# Delta Lake fundamentals

This notebook was built to be run on the following docker image: `jupyter/pyspark-notebook:spark-3.3.1`

https://towardsdatascience.com/hands-on-introduction-to-delta-lake-with-py-spark-b39460a4b1ae

https://github.com/jaumpedro214/posts/tree/main


## Connect to Spark

In [1]:
# !pip install pyspark
# !pip install delta-spark

In [11]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
# import pyspark.sql.functions as F

import pyspark
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
from delta import *
from pyspark.sql import functions as F

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

## Delta Lake fundamentals

### 1. Create a Delta Table

Let's load the data

In [16]:
SCHEMA = StructType(
    [
        StructField('id', StringType(), True), 
        StructField('data_inversa', StringType(), True), 
        StructField('dia_semana', StringType(), True), 
        StructField('horario', StringType(), True), 
        StructField('uf', StringType(), True), 
        StructField('br', StringType(), True), 
        StructField('km', StringType(), True), 
        StructField('municipio', StringType(), True), 
        StructField('causa_acidente', StringType(), True), 
        StructField('tipo_acidente', StringType(), True), 
        StructField('classificacao_acidente', StringType(), True), 
        StructField('fase_dia', StringType(), True), 
        StructField('sentido_via', StringType(), True), 
        StructField('condicao_metereologica', StringType(), True), 
        StructField('tipo_pista', StringType(), True), 
        StructField('tracado_via', StringType(), True), 
        StructField('uso_solo', StringType(), True), 
        StructField('pessoas', IntegerType(), True), 
        StructField('mortos', IntegerType(), True), 
        StructField('feridos_leves', IntegerType(), True), 
        StructField('feridos_graves', IntegerType(), True), 
        StructField('ilesos', IntegerType(), True), 
        StructField('ignorados', IntegerType(), True), 
        StructField('feridos', IntegerType(), True), 
        StructField('veiculos', StringType(), True), 
        StructField('latitude', DoubleType(), True), 
        StructField('longitude', DoubleType(), True), 
        StructField('regional', StringType(), True), 
        StructField('delegacia', StringType(), True), 
        StructField('uop', StringType(), True)
    ]
)

In [112]:
df_acidentes = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("data/acidentes/datatran2020.csv")
)

df_acidentes.show(5)

+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+---------+-----------+----------------------+----------+-----------+--------+-------+------+-------------+--------------+------+---------+-------+--------+--------+---------+--------+---------+--------------+
|    id|data_inversa|  dia_semana| horario| uf| br|   km|           municipio|      causa_acidente|       tipo_acidente|classificacao_acidente| fase_dia|sentido_via|condicao_metereologica|tipo_pista|tracado_via|uso_solo|pessoas|mortos|feridos_leves|feridos_graves|ilesos|ignorados|feridos|veiculos|latitude|longitude|regional|delegacia|           uop|
+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+---------+-----------+----------------------+----------+-----------+--------+-------+------+-------------+--------------+------+---------+-------+-------

Write a Delta Table is simple

In [115]:
df_acidentes\
    .write.format("delta")\
    .mode("overwrite")\
    .save("data_lake/delta/acidentes/")

_Note: If you are having trouble in writing the Delta Table, remember to check the permissions of the folder, including the newly created._

### 2. Read from a Delta Table

In [119]:
df_acidentes_delta = (
    spark
    .read.format("delta")
    .load("data_lake/delta/acidentes/")
)

And we can execute queries as usual

In [122]:
df_acidentes_delta.select(["id", "data_inversa", "dia_semana", "horario", "uf"]).show(5)

+------+------------+------------+--------+---+
|    id|data_inversa|  dia_semana| horario| uf|
+------+------------+------------+--------+---+
|260068|  2020-01-01|quarta-feira|05:40:00| PA|
|260073|  2020-01-01|quarta-feira|06:00:00| MG|
|260087|  2020-01-01|quarta-feira|06:00:00| BA|
|260116|  2020-01-01|quarta-feira|10:08:00| SP|
|260129|  2020-01-01|quarta-feira|12:10:00| MG|
+------+------------+------------+--------+---+
only showing top 5 rows



In [124]:
df_acidentes_delta.count()

63585

### 3. Add new data to the Delta Table

Adding new data is just a matter of appending the new data to the Delta Table.

In [128]:
df_acidentes_2019 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .schema(SCHEMA)
    .load("data/acidentes/datatran2019.csv")
)

In [130]:
df_acidentes_2019\
    .write.format("delta")\
    .mode("append")\
    .save("data_lake/delta/acidentes/")

Let's check the number of rows in the Delta Table

In [132]:
df_acidentes_delta.count()

131143

### 4. View the history (logs) of the Delta Table

The Log of the Delta Table is a record of all the operations that have been performed on the table. It contains a detailed description of each operation performed, including all the metadata about the operation.

To read the log, we can use a special python object called `DeltaTable`.

In [137]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "data_lake/delta/acidentes/")

delta_table.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|     10|2024-12-24 18:43:...|  NULL|    NULL|    WRITE|{mode -> Append, ...|NULL|    NULL|     NULL|          9|  Serializable|         true|{numFiles -> 5, n...|        NULL|Apache-Spark/3.5....|
|      9|2024-12-24 18:42:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|          8|  Serializable|        false|{numFiles -> 5, n...|        NULL|Apache-Spark/3.5....|
|      8|2

In [139]:
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation|operationParameters                                                                                                                                                                                                          |
+-------+-----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|10     |2024-12-24 18:43:10.595|WRITE    |{mode -> Append, partitionBy -> []}                                                                                                                            

### 5. Read a specific version of the Delta Table

If nothing is specified, the Spark will read the latest version of the Delta Table.

In [143]:
df_acidentes_latest = (
    spark
    .read.format("delta")
    .load("data_lake/delta/acidentes/")
)

df_acidentes_latest.count()

131143

But it's also possible to read a specific version

In [146]:
df_acidentes_version_0 = (
    spark
    .read.format("delta")
    .option("versionAsOf", 0)
    .load("data_lake/delta/acidentes/")
)

df_acidentes_version_0.count()

63585

As the version 0 was specified, it only contains the data from the first operation.

### 6. Revert to a previous version

This is another operation performed by the `DeltaTable` object. [Link](https://delta.io/blog/2022-10-03-rollback-delta-lake-restore/)

In [151]:
delta_table.restoreToVersion(0)

24/12/24 18:43:15 WARN DAGScheduler: Broadcasting large task binary with size 1079.8 KiB


DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

Now, the latest version doesn't contain the new data.

In [153]:
df_acidentes_latest.count()

63585

The **RESTORE** operation is also stored in the log.

In [155]:
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation|operationParameters                                                                                                                                                                                                          |
+-------+-----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|11     |2024-12-24 18:43:17.158|RESTORE  |{version -> 0, timestamp -> NULL}                                                                                                                              

So, in practice, no information is lost.

Let's restore back to the version with the data from 2020 and 2019.

In [82]:
delta_table.restoreToVersion(1)

24/12/24 17:55:38 WARN DAGScheduler: Broadcasting large task binary with size 1079.8 KiB


DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

### 7. Update 

The update operation is also done by the `DeltaTable` object. But we will use the `update` method with the SQL syntax.

First, let's write the data from 2016 to the delta table. This data contains the "data_inversa" columns wrongly formatted.

In [86]:
df_acidentes_2016 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("data/acidentes/datatran2016.csv")
)

df_acidentes_2016.select("data_inversa").show(5)

+------------+
|data_inversa|
+------------+
|    10/06/16|
|    01/01/16|
|    01/01/16|
|    01/01/16|
|    01/01/16|
+------------+
only showing top 5 rows



In [88]:
df_acidentes_2016\
    .write.format("delta")\
    .mode("append")\
    .save("data_lake/delta/acidentes/")

24/12/24 17:58:25 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 25, schema size: 30
CSV file: file:///Users/prabhakarapelluru/prabhakara/data_science/code/04-datascience-spark-python-2024/00-prabha-delta-lake-examples/01-delta-lake-fundamentals/data/acidentes/datatran2016.csv
                                                                                

In [90]:
df_acidentes_latest.count()

159948

In [92]:
df_acidentes_latest.createOrReplaceTempView("acidentes_latest")

spark.sql(
    """
    UPDATE acidentes_latest
    SET data_inversa = CAST( TO_DATE(data_inversa, 'dd/MM/yy') AS STRING)
    WHERE data_inversa LIKE '%/16'
    """
)

                                                                                

DataFrame[num_affected_rows: bigint]

In [94]:
df_acidentes_latest.filter( F.col("data_inversa").like("%/16") ).count()

0

### 8. Merge 

The merge operation, also known as upsert, is a mix of insert and update. It's also done by the `DeltaTable` object. But we will use the `merge` method with the SQL syntax.

To simulate the utility of this method, we'll simulate a scenario where we have a data source that is constantly updating the data with new counts.

In [99]:
df_acidentes_2018 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("data/acidentes/datatran2018.csv")
)

df_acidentes_2018_zero = df_acidentes_2018.withColumn("pessoas", F.lit(0))

In [101]:
df_acidentes_2018_zero\
    .write.format("delta")\
    .mode("append")\
    .save("data_lake/delta/acidentes/")

In [103]:
df_acidentes_latest.count()

229281

In [105]:
df_acidentes_latest.filter( F.col("data_inversa").like("2018-%") ).select(["data_inversa", "pessoas"]).show(5)

+------------+-------+
|data_inversa|pessoas|
+------------+-------+
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
+------------+-------+
only showing top 5 rows



Now, let's merge the data with the correct counts.

In [108]:
df_acidentes_latest.createOrReplaceTempView("acidentes_latest")
df_acidentes_2018.createOrReplaceTempView("acidentes_2018_new_counts")

spark.sql(
    """
    MERGE INTO acidentes_latest
    USING acidentes_2018_new_counts

    ON acidentes_latest.id = acidentes_2018_new_counts.id
    AND acidentes_latest.data_inversa = acidentes_2018_new_counts.data_inversa
    
    WHEN MATCHED THEN
        UPDATE SET pessoas = acidentes_latest.pessoas + acidentes_2018_new_counts.pessoas
    
    WHEN NOT MATCHED THEN
        INSERT *
    """
)

24/12/24 18:07:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [110]:
df_acidentes_latest.filter( F.col("data_inversa").like("2018-%") ).select(["data_inversa", "pessoas"]).show(5)

+------------+-------+
|data_inversa|pessoas|
+------------+-------+
|  2018-01-01|      2|
|  2018-01-01|      4|
|  2018-01-01|      2|
|  2018-01-01|      2|
|  2018-01-01|      1|
+------------+-------+
only showing top 5 rows

