# Delta Lake fundamentals

This notebook was built to run on the following Docker image: `jupyter/pyspark-notebook:spark-3.3.1`

## Connect to Spark

In [1]:
!pip install delta-spark



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
import pyspark.sql.functions as F

from delta.pip_utils import configure_spark_with_delta_pip

spark = (
    SparkSession
    .builder.master("spark://spark:7077")
    .appName("DeltaLakeFundamentals")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(spark).getOrCreate()

## Delta Lake fundamentals

### 1. Create a Delta Table

Let's load the data, starting with an explicit schema for the CSV files.

In [2]:
SCHEMA = StructType(
    [
        StructField('id', StringType(), True), 
        StructField('data_inversa', StringType(), True), 
        StructField('dia_semana', StringType(), True), 
        StructField('horario', StringType(), True), 
        StructField('uf', StringType(), True), 
        StructField('br', StringType(), True), 
        StructField('km', StringType(), True), 
        StructField('municipio', StringType(), True), 
        StructField('causa_acidente', StringType(), True), 
        StructField('tipo_acidente', StringType(), True), 
        StructField('classificacao_acidente', StringType(), True), 
        StructField('fase_dia', StringType(), True), 
        StructField('sentido_via', StringType(), True), 
        StructField('condicao_metereologica', StringType(), True), 
        StructField('tipo_pista', StringType(), True), 
        StructField('tracado_via', StringType(), True), 
        StructField('uso_solo', StringType(), True), 
        StructField('pessoas', IntegerType(), True), 
        StructField('mortos', IntegerType(), True), 
        StructField('feridos_leves', IntegerType(), True), 
        StructField('feridos_graves', IntegerType(), True), 
        StructField('ilesos', IntegerType(), True), 
        StructField('ignorados', IntegerType(), True), 
        StructField('feridos', IntegerType(), True), 
        StructField('veiculos', StringType(), True), 
        StructField('latitude', DoubleType(), True), 
        StructField('longitude', DoubleType(), True), 
        StructField('regional', StringType(), True), 
        StructField('delegacia', StringType(), True), 
        StructField('uop', StringType(), True)
    ]
)

In [16]:
df_acidentes = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("/data/acidentes/datatran2020.csv")
)

df_acidentes.show(5)

+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+---------+-----------+----------------------+----------+-----------+--------+-------+------+-------------+--------------+------+---------+-------+--------+------------+------------+--------+---------+--------------+
|    id|data_inversa|  dia_semana| horario| uf| br|   km|           municipio|      causa_acidente|       tipo_acidente|classificacao_acidente| fase_dia|sentido_via|condicao_metereologica|tipo_pista|tracado_via|uso_solo|pessoas|mortos|feridos_leves|feridos_graves|ilesos|ignorados|feridos|veiculos|    latitude|   longitude|regional|delegacia|           uop|
+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+---------+-----------+----------------------+----------+-----------+--------+-------+------+-------------+--------------+------+---------+-

Writing a Delta table is simple:

In [19]:
df_acidentes\
    .write.format("delta")\
    .mode("overwrite")\
    .save("/data/delta/acidentes/")

_Note: If you have trouble writing the Delta table, check the permissions of the target folder, including any newly created subfolders._
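To see what the write produced, it can help to look at the folder itself. A Delta table is a directory of Parquet data files plus a `_delta_log` subdirectory holding one JSON commit file per table version. The sketch below uses only the standard library; `summarize_delta_dir` is a hypothetical helper name, and it assumes an unpartitioned table on a local filesystem, as in this notebook.

```python
from pathlib import Path


def summarize_delta_dir(table_path):
    """Count the Parquet data files and list the commit files of a Delta table folder."""
    root = Path(table_path)
    # Data files sit in the table root (this sketch ignores partition subfolders).
    data_files = sorted(p.name for p in root.glob("*.parquet"))
    # Each commit is a zero-padded JSON file, e.g. 00000000000000000000.json for version 0.
    commits = sorted(p.name for p in (root / "_delta_log").glob("*.json"))
    return {"num_data_files": len(data_files), "commits": commits}
```

Calling `summarize_delta_dir("/data/delta/acidentes/")` right after the first write should list a single commit file, the one for version 0.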

### 2. Read from a Delta Table

In [20]:
df_acidentes_delta = (
    spark
    .read.format("delta")
    .load("/data/delta/acidentes/")
)

We can then run queries as usual.

In [21]:
df_acidentes_delta.select(["id", "data_inversa", "dia_semana", "horario", "uf"]).show(5)

+------+------------+----------+--------+---+
|    id|data_inversa|dia_semana| horario| uf|
+------+------------+----------+--------+---+
|263804|  2020-01-19|   domingo|12:30:00| SC|
|263806|  2020-01-19|   domingo|14:50:00| SC|
|263807|  2020-01-19|   domingo|14:00:00| RJ|
|263809|  2020-01-19|   domingo|15:30:00| RS|
|263812|  2020-01-19|   domingo|15:15:00| GO|
+------+------------+----------+--------+---+
only showing top 5 rows



In [23]:
df_acidentes_delta.count()

63576
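The same table can also be queried through Spark SQL without registering it first, using the ``delta.`<path>` `` syntax. A minimal sketch, assuming the session was created with the Delta extensions as above; `count_rows_sql` is a hypothetical helper name.

```python
def count_rows_sql(spark, table_path):
    """Count the rows of a path-based Delta table through Spark SQL."""
    # delta.`<path>` tells Spark SQL to treat the folder as a Delta table.
    query = f"SELECT COUNT(*) AS n FROM delta.`{table_path}`"
    return spark.sql(query).first()["n"]
```

`count_rows_sql(spark, "/data/delta/acidentes/")` should return the same number as `df_acidentes_delta.count()`.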

### 3. Add new data to the Delta Table

Adding new data is just a matter of appending it to the Delta table.

In [24]:
df_acidentes_2019 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")  # same encoding as the other yearly files
    .schema(SCHEMA)
    .load("/data/acidentes/datatran2019.csv")
)

In [25]:
df_acidentes_2019\
    .write.format("delta")\
    .mode("append")\
    .save("/data/delta/acidentes/")

Let's check the number of rows in the Delta Table

In [26]:
df_acidentes_delta.count()

131132

### 4. View the history (logs) of the Delta Table

The log of a Delta table records every operation performed on it, along with metadata about each operation, such as its timestamp, type, and parameters.

To read the log, we can use a Python class called `DeltaTable`.

In [29]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/data/delta/acidentes/")

delta_table.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      1|2023-02-14 01:12:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          0|  Serializable|         true|{numFiles -> 4, n...|        null|Apache-Spark/3.3....|
|      0|2023-02-14 01:11:...|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|  Serializable|        false|{numFiles -> 4, n...|        null|Apache-Spark/3.3....|
+-------+-

In [33]:
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+--------------------------------------+
|version|timestamp              |operation|operationParameters                   |
+-------+-----------------------+---------+--------------------------------------+
|1      |2023-02-14 01:12:04.86 |WRITE    |{mode -> Append, partitionBy -> []}   |
|0      |2023-02-14 01:11:28.625|WRITE    |{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+
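The history shown above is backed by the JSON commit files in the table's `_delta_log` folder, where each line of a file is one action (for example `commitInfo`, `add`, or `remove`). The sketch below reads the operation names directly with the standard library. `read_commit_operations` is a hypothetical helper, and it assumes the JSON commits are still present (no log cleanup has run) and that the writer recorded a `commitInfo` action, as Spark does.

```python
import json
from pathlib import Path


def read_commit_operations(table_path):
    """Return (version, operation) pairs read straight from the _delta_log folder."""
    operations = []
    for commit_file in sorted(Path(table_path, "_delta_log").glob("*.json")):
        version = int(commit_file.stem)  # the file name is the zero-padded version number
        with commit_file.open() as fh:
            for line in fh:  # one JSON action per line
                if not line.strip():
                    continue
                action = json.loads(line)
                if "commitInfo" in action:
                    operations.append((version, action["commitInfo"].get("operation")))
    return operations
```

For everyday use `delta_table.history()` remains the more convenient option; reading the files by hand is mainly useful for understanding how the log is stored.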



### 5. Read a specific version of the Delta Table

If no version is specified, Spark reads the latest version of the Delta table.

In [34]:
df_acidentes_latest = (
    spark
    .read.format("delta")
    .load("/data/delta/acidentes/")
)

df_acidentes_latest.count()

131132

It is also possible to read a specific version with the `versionAsOf` option:

In [36]:
df_acidentes_version_0 = (
    spark
    .read.format("delta")
    .option("versionAsOf", 0)
    .load("/data/delta/acidentes/")
)

df_acidentes_version_0.count()

63576

Because version 0 was specified, the DataFrame only contains the data from the first write.
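Besides a version number, the Delta reader also accepts a `timestampAsOf` option. The following is a small sketch that wraps both options; `read_delta_as_of` is a hypothetical helper name, and the timestamp is assumed to fall within the table's history.

```python
def read_delta_as_of(spark, table_path, version=None, timestamp=None):
    """Read a Delta table at a given version or timestamp (latest if neither is given)."""
    if version is not None and timestamp is not None:
        raise ValueError("Pass either version or timestamp, not both")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        # A timestamp string within the table's history, e.g. "2023-02-14 01:12:00".
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(table_path)
```

With this helper, `read_delta_as_of(spark, "/data/delta/acidentes/", version=0)` is equivalent to the cell above.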

### 6. Revert to a previous version

This is another operation performed by the `DeltaTable` object. For details, see the Delta Lake blog post on [rolling back with restore](https://delta.io/blog/2022-10-03-rollback-delta-lake-restore/).

In [37]:
delta_table.restoreToVersion(0)

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]
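`restoreToVersion` returns a one-row DataFrame of metrics (the columns listed in the output above) rather than printing them. A minimal sketch for reading those metrics as a dictionary follows; `restore_and_report` is a hypothetical helper, and it assumes `delta_table` is a `DeltaTable` like the one created earlier.

```python
def restore_and_report(delta_table, version):
    """Restore the table to `version` and return the restore metrics as a dict."""
    metrics_df = delta_table.restoreToVersion(version)
    # The result holds a single row with columns such as num_restored_files.
    return metrics_df.first().asDict()
```

Note that calling the helper performs the restore, so it replaces the plain `restoreToVersion` call rather than complementing it.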

Now, the latest version doesn't contain the new data.

In [38]:
df_acidentes_latest.count()

63576

The **RESTORE** operation is also stored in the log.

In [39]:
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+--------------------------------------+
|version|timestamp              |operation|operationParameters                   |
+-------+-----------------------+---------+--------------------------------------+
|2      |2023-02-14 01:33:27.373|RESTORE  |{version -> 0, timestamp -> null}     |
|1      |2023-02-14 01:12:04.86 |WRITE    |{mode -> Append, partitionBy -> []}   |
|0      |2023-02-14 01:11:28.625|WRITE    |{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+



So, in practice, no information is lost: the restore is recorded as a new version (2), and versions 0 and 1 remain in the history.

Let's restore the version that contains both the 2020 and 2019 data.

In [40]:
delta_table.restoreToVersion(1)

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

### 7. Update 

The update operation can also be done through the `DeltaTable` object, but here we use the SQL `UPDATE` statement.

First, let's append the 2016 data to the Delta table. In this file, the `data_inversa` column is formatted as `dd/MM/yy` instead of `yyyy-MM-dd`.

In [43]:
df_acidentes_2016 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("/data/acidentes/datatran2016.csv")
)

df_acidentes_2016.select("data_inversa").show(5)

+------------+
|data_inversa|
+------------+
|    10/06/16|
|    01/01/16|
|    01/01/16|
|    01/01/16|
|    01/01/16|
+------------+
only showing top 5 rows
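The 2020 rows shown earlier store `data_inversa` as `yyyy-MM-dd` (for example `2020-01-19`), while the 2016 file uses `dd/MM/yy`. As a plain-Python illustration of the conversion we need, here is a small sketch; `normalize_date` is a hypothetical helper and is not part of the Spark job.

```python
from datetime import datetime


def normalize_date(value):
    """Convert a 'dd/MM/yy' string such as '10/06/16' to ISO 'yyyy-MM-dd'."""
    return datetime.strptime(value, "%d/%m/%y").date().isoformat()


print(normalize_date("10/06/16"))  # prints 2016-06-10
```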



In [44]:
df_acidentes_2016\
    .write.format("delta")\
    .mode("append")\
    .save("/data/delta/acidentes/")

In [50]:
df_acidentes_latest.count()

227495

In [63]:
df_acidentes_latest.createOrReplaceTempView("acidentes_latest")

spark.sql(
    """
    UPDATE acidentes_latest
    SET data_inversa = CAST( TO_DATE(data_inversa, 'dd/MM/yy') AS STRING)
    WHERE data_inversa LIKE '%/16'
    """
)

DataFrame[num_affected_rows: bigint]

In [66]:
df_acidentes_latest.filter( F.col("data_inversa").like("%/16") ).count()

0
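The same change can also be expressed with the `update` method of the `DeltaTable` object instead of SQL. A minimal sketch, assuming `delta_table` is the `DeltaTable` created earlier with `DeltaTable.forPath`; `fix_2016_dates` is a hypothetical helper name.

```python
def fix_2016_dates(delta_table):
    """Rewrite 'dd/MM/yy' dates to 'yyyy-MM-dd' using the DeltaTable Python API."""
    delta_table.update(
        condition="data_inversa LIKE '%/16'",
        # String values in `set` are parsed as SQL expressions.
        set={"data_inversa": "CAST(TO_DATE(data_inversa, 'dd/MM/yy') AS STRING)"},
    )
```

Like the SQL version, this records an UPDATE operation in the table history.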

### 8. Merge 

The merge operation, also known as upsert, combines insert and update: rows that match a condition are updated, and rows that don't are inserted. It can also be done through the `DeltaTable` object, but here we use the SQL `MERGE INTO` statement.

To illustrate its usefulness, we'll simulate a data source that keeps sending updated counts for existing records.

In [72]:
df_acidentes_2018 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("/data/acidentes/datatran2018.csv")
)

df_acidentes_2018_zero = df_acidentes_2018.withColumn("pessoas", F.lit(0))

In [73]:
df_acidentes_2018_zero\
    .write.format("delta")\
    .mode("append")\
    .save("/data/delta/acidentes/")

In [74]:
df_acidentes_latest.count()

296827

In [77]:
df_acidentes_latest.filter( F.col("data_inversa").like("2018-%") ).select(["data_inversa", "pessoas"]).show(5)

+------------+-------+
|data_inversa|pessoas|
+------------+-------+
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
+------------+-------+
only showing top 5 rows



Now, let's merge the data with the correct counts.

In [79]:
df_acidentes_latest.createOrReplaceTempView("acidentes_latest")
df_acidentes_2018.createOrReplaceTempView("acidentes_2018_new_counts")

spark.sql(
    """
    MERGE INTO acidentes_latest
    USING acidentes_2018_new_counts

    ON acidentes_latest.id = acidentes_2018_new_counts.id
    AND acidentes_latest.data_inversa = acidentes_2018_new_counts.data_inversa
    
    WHEN MATCHED THEN
        UPDATE SET pessoas = acidentes_latest.pessoas + acidentes_2018_new_counts.pessoas
    
    WHEN NOT MATCHED THEN
        INSERT *
    """
)
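To make the semantics of the statement above concrete, here is a toy model in plain Python. It is only an illustration, not how Delta Lake executes a merge: rows are reduced to a `pessoas` count keyed by `(id, data_inversa)`, and the sample keys and counts are made up.

```python
def upsert(target, source):
    """Toy model of MERGE: add counts for matching keys, insert the rest."""
    merged = dict(target)
    for key, pessoas in source.items():
        if key in merged:
            merged[key] = merged[key] + pessoas  # WHEN MATCHED THEN UPDATE
        else:
            merged[key] = pessoas  # WHEN NOT MATCHED THEN INSERT
    return merged


# Hypothetical rows: two already in the table (with zeroed counts), two incoming.
target = {("1", "2018-01-01"): 0, ("2", "2018-01-01"): 0}
source = {("2", "2018-01-01"): 3, ("3", "2018-01-02"): 5}
result = upsert(target, source)
```

Here `result` keeps row `1` untouched, updates row `2` to `3`, and gains row `3` with a count of `5`.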

In [None]:
df_acidentes_latest.filter( F.col("data_inversa").like("2018-%") ).select(["data_inversa", "pessoas"]).show(5)
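For comparison, the same merge can be written with the `DeltaTable` Python API instead of SQL. The following is a sketch, assuming `delta_table` is the `DeltaTable` created earlier and `source_df` is a DataFrame such as `df_acidentes_2018`; `merge_new_counts` is a hypothetical helper name.

```python
def merge_new_counts(delta_table, source_df):
    """Upsert source_df into the Delta table with the DeltaTable merge builder."""
    (
        delta_table.alias("target")
        .merge(
            source_df.alias("source"),
            "target.id = source.id AND target.data_inversa = source.data_inversa",
        )
        # Matched rows: add the incoming count to the stored one.
        .whenMatchedUpdate(set={"pessoas": "target.pessoas + source.pessoas"})
        # Unmatched rows: insert every column from the source.
        .whenNotMatchedInsertAll()
        .execute()
    )
```

Nothing is written until `execute()` is called; the preceding calls only build up the merge definition.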