# Example Delta Lake

## Import Libraries

In [21]:
import os
from pyspark.sql import SparkSession, DataFrame, functions as F
from delta import *

## Build a Spark Session with Delta

In [22]:
# Spark session builder configured with the Delta Lake SQL extension and catalog.
builder = (
    SparkSession.builder.appName("Delta-App")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # Table-property default: the change data feed read later in this notebook
    # relies on this being enabled when the Delta table is created.
    .config(
        "spark.databricks.delta.properties.defaults.enableChangeDataFeed",
        "true",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# expanduser("~") resolves to $HOME when it is set, but still yields a real
# home directory when it is not; os.getenv("HOME") would return None there and
# the f-string would silently produce the relative path "None/work".
WORK_DIR = f'{os.path.expanduser("~")}/work'

## Create Functions

In [23]:
def create_cols_partition_YMD(df: DataFrame, col_name: str) -> DataFrame:
    """Add ``year``, ``month`` and ``day`` columns split out of ``col_name``.

    The value of ``col_name`` is split on "/" and the first three pieces
    become ``year``, ``month`` and ``day`` respectively (a value such as
    "2023/02/03" gives "2023", "02" and "03").

    Args:
        df: DataFrame that contains the column ``col_name``.
        col_name: Name of the column whose value is split.

    Returns:
        A DataFrame with the ``year``, ``month`` and ``day`` columns set.
    """
    result = df
    for position, part_name in enumerate(("year", "month", "day")):
        result = result.withColumn(
            part_name, F.split(F.col(col_name), "/").getItem(position)
        )
    return result

## First Time

### Read the Raw Data

In [24]:
# Directories used below, all rooted at WORK_DIR:
# `raw` is read from and `trusted` is written to.
datalake = WORK_DIR + "/datalake"
raw = datalake + "/raw"
trusted = datalake + "/trusted"

In [25]:
# Load the raw parquet data, keeping only rows whose year/month/day
# columns are 2023/02/04.
df = (
    spark.read
    .parquet(f"{raw}/data")
    .where("year = 2023 and month = 02 and day = 04")
)

In [26]:
# Discard the year/month/day columns that came with the raw data and rebuild
# them from the "created" column.
df = create_cols_partition_YMD(
    df.drop("year", "month", "day"),
    "created",
)

In [27]:
# Convert to pandas so the notebook renders the rows (and the row count) as a table.
df.toPandas()

Unnamed: 0,created,id,updated,value,year,month,day
0,2023/02/03,0,2023/02/04,morango,2023,02,03
1,2023/02/03,2,2023/02/04,morango,2023,02,03
2,2023/02/03,3,2023/02/04,limão,2023,02,03
3,2023/02/03,4,2023/02/04,banana,2023,02,03
4,2023/02/03,5,2023/02/04,morango,2023,02,03
...,...,...,...,...,...,...,...
3148,2023/02/03,4990,2023/02/04,abacaxi,2023,02,03
3149,2023/02/03,4991,2023/02/04,maça,2023,02,03
3150,2023/02/03,4992,2023/02/04,banana,2023,02,03
3151,2023/02/03,4993,2023/02/04,limão,2023,02,03


The output above shows a sample of the data and the number of rows that will be inserted into the destination.

### Write

In [28]:
# First load: write `df` in Delta format, partitioned by year/month/day.
# mode("overwrite") replaces any data already present at the destination path.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .save(f"{trusted}/delta/")
)

This code loads the data into the destination for the first time. Note that the format is set to <b>delta</b>: this setting is what makes the destination a <b>Delta table</b>.

## Upsert 

In [29]:
# Handle to the Delta table stored under the trusted zone; it is the target
# of the merge below.
delta_destination = DeltaTable.forPath(
    spark,
    f"{trusted}/delta/",
)

In [30]:
# Load the next batch (rows whose year/month/day columns are 2023/02/05) ...
df = (
    spark.read
    .parquet(f"{raw}/data")
    .where("year = 2023 and month = 02 and day = 05")
)
# ... and rebuild year/month/day from the "created" column.
df = create_cols_partition_YMD(
    df.drop("year", "month", "day"),
    "created",
)

In [31]:
# Convert to pandas so the notebook renders the rows of the new batch as a table.
df.toPandas()

Unnamed: 0,created,id,updated,value,year,month,day
0,2023/02/03,0,2023/02/05,banana,2023,02,03
1,2023/02/03,1,2023/02/05,morango,2023,02,03
2,2023/02/03,8,2023/02/05,limão,2023,02,03
3,2023/02/03,9,2023/02/05,maça,2023,02,03
4,2023/02/03,10,2023/02/05,maça,2023,02,03
...,...,...,...,...,...,...,...
3185,2023/02/03,4994,2023/02/05,abacaxi,2023,02,03
3186,2023/02/03,4996,2023/02/05,maça,2023,02,03
3187,2023/02/03,4997,2023/02/05,banana,2023,02,03
3188,2023/02/03,4998,2023/02/05,morango,2023,02,03


In [32]:
# Upsert `df` into the Delta table: rows are paired on `id`; matched rows get
# every column updated, unmatched source rows are inserted.
upsert = delta_destination.alias("destination").merge(
    df.alias("source"),
    "destination.id = source.id",
)
upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

This is a simple upsert. The <b>merge</b> method starts the operation, and you declare what to update or insert with the <b>whenMatchedUpdate</b> and <b>whenNotMatchedInsert</b> methods. In this case I used the <b>All</b> variant of both (<b>whenMatchedUpdateAll</b> and <b>whenNotMatchedInsertAll</b>) because I don't want to specify which columns to change.

## Read Trusted with Delta

In [33]:
# Re-open the Delta table and expose its contents as a DataFrame.
df_destination = (
    DeltaTable
    .forPath(spark, f"{trusted}/delta/")
    .toDF()
)

In [34]:
# Convert to pandas so the notebook renders the table contents after the merge.
df_destination.toPandas()

Unnamed: 0,created,id,updated,value,year,month,day
0,2023/02/03,0,2023/02/05,banana,2023,02,03
1,2023/02/03,1,2023/02/05,morango,2023,02,03
2,2023/02/03,2,2023/02/04,morango,2023,02,03
3,2023/02/03,3,2023/02/04,limão,2023,02,03
4,2023/02/03,4,2023/02/04,banana,2023,02,03
...,...,...,...,...,...,...,...
4308,2023/02/03,4995,2023/02/04,maça,2023,02,03
4309,2023/02/03,4996,2023/02/05,maça,2023,02,03
4310,2023/02/03,4997,2023/02/05,banana,2023,02,03
4311,2023/02/03,4998,2023/02/05,morango,2023,02,03


## Comparing Results

In [35]:
# Number of rows per "updated" date in the destination table.
(
    df_destination
    .groupBy("updated")
    .count()
    .toPandas()
)

Unnamed: 0,updated,count
0,2023/02/04,1123
1,2023/02/05,3190


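The counts above show that, after the upsert, 3190 rows carry the 2023/02/05 update while 1123 rows still carry the 2023/02/04 one. All of them remain in the day 03 partition, because the destination is partitioned by the "created" date.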

## Incremental Data

In [52]:
# Commit history of the Delta table (one row per table version).
history_query = f"DESCRIBE HISTORY '{trusted}/delta/'"
df_describe = spark.sql(history_query)

In [59]:
# Convert to pandas so the notebook renders the table history.
df_describe.toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,1,2023-02-05 23:34:39.697,,,MERGE,"{'matchedPredicates': '[{""actionType"":""update""...",,,,0.0,Serializable,False,"{'numOutputRows': '4313', 'numTargetRowsInsert...",,Apache-Spark/3.3.1 Delta-Lake/2.2.0
1,0,2023-02-05 23:34:37.733,,,WRITE,"{'mode': 'Overwrite', 'partitionBy': '[""year"",...",,,,,Serializable,False,"{'numOutputRows': '3153', 'numOutputBytes': '1...",,Apache-Spark/3.3.1 Delta-Lake/2.2.0


In [57]:
# Highest version number present in the table history.
version = df_describe.agg(F.max("version")).first()[0]

In [36]:
# Read the table's change data feed starting at `version`, dropping the
# "update_preimage" rows.
df_increment = (
    spark.read.format("delta")
    .options(readChangeFeed="true", startingVersion=version)
    .load(f"{trusted}/delta/")
    .where(F.col("_change_type") != "update_preimage")
)

In [37]:
# Number of changed rows per change type.
(
    df_increment
    .groupBy("_change_type")
    .count()
    .toPandas()
)

Unnamed: 0,_change_type,count
0,update_postimage,2030
1,insert,1160


In [38]:
# Convert to pandas so the notebook renders the changed rows with their change-feed columns.
df_increment.toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,created,id,updated,value,year,month,day,_change_type,_commit_version,_commit_timestamp
0,2023/02/03,0,2023/02/05,banana,2023,02,03,update_postimage,1,2023-02-05 23:34:39.697
1,2023/02/03,1,2023/02/05,morango,2023,02,03,insert,1,2023-02-05 23:34:39.697
2,2023/02/03,8,2023/02/05,limão,2023,02,03,update_postimage,1,2023-02-05 23:34:39.697
3,2023/02/03,9,2023/02/05,maça,2023,02,03,update_postimage,1,2023-02-05 23:34:39.697
4,2023/02/03,10,2023/02/05,maça,2023,02,03,insert,1,2023-02-05 23:34:39.697
...,...,...,...,...,...,...,...,...,...,...
3185,2023/02/03,4994,2023/02/05,abacaxi,2023,02,03,insert,1,2023-02-05 23:34:39.697
3186,2023/02/03,4996,2023/02/05,maça,2023,02,03,insert,1,2023-02-05 23:34:39.697
3187,2023/02/03,4997,2023/02/05,banana,2023,02,03,insert,1,2023-02-05 23:34:39.697
3188,2023/02/03,4998,2023/02/05,morango,2023,02,03,insert,1,2023-02-05 23:34:39.697


With Change Data Feed enabled, the columns <b>_change_type, _commit_version, _commit_timestamp</b> are included in the result; with these columns it is possible to make an incremental query.

## References

https://docs.delta.io/latest/index.html

https://jupyter-docker-stacks.readthedocs.io/en/latest/using/recipes.html#enable-delta-lake-in-spark-notebooks