# Delta Lake Tutorial


**Loading a CSV into a Spark DataFrame and writing it as a Delta table**

In [0]:
# Read a CSV file from the Azure storage container
df = spark.read.option("header","true").csv("abfss://deltasource@rxdbstorageaccount.dfs.core.windows.net/customers.csv")
df.display()
# Write the DataFrame as a Delta table to the Azure storage container
df.write.format("delta").mode("overwrite").save("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
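
Every cell in this tutorial repeats the same ADLS Gen2 URI. As a convenience, the path can be built once from its parts. The sketch below is an assumption and not part of the original notebook: `abfss_uri`, `SOURCE_CSV`, and `DELTA_PATH` are hypothetical names, and the helper only formats the string; it does not check that the container or storage account exists.

```python
def abfss_uri(container: str, account: str, relative_path: str = "") -> str:
    """Build an abfss:// URI for an ADLS Gen2 container (hypothetical helper)."""
    base = f"abfss://{container}@{account}.dfs.core.windows.net/"
    # Strip leading slashes so the result never has "//" after the host name.
    return base + relative_path.lstrip("/")


# The two locations used throughout this tutorial.
SOURCE_CSV = abfss_uri("deltasource", "rxdbstorageaccount", "customers.csv")
DELTA_PATH = abfss_uri("deltastaging", "rxdbstorageaccount")
print(SOURCE_CSV)
print(DELTA_PATH)
```

With these in place, a read would become `spark.read.format("delta").load(DELTA_PATH)`.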

**Reading Delta Tables**

In [0]:
# Read the Delta table from Azure storage
df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df.display()

**Appending to Delta tables**

In [0]:
# Option 1: read data from the existing Delta table for appending
#df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
# Option 2: read data from the CSV file for appending
df = spark.read.option("header","true").csv("abfss://deltasource@rxdbstorageaccount.dfs.core.windows.net/customers.csv")
df.write.mode("append").format("delta").save("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")


In [0]:
# Verify the append by reading the table back
df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df.display()

**Schema Enforcement & Schema evolution**

In [0]:
from pyspark.sql.functions import lit
# Add a column the table does not have, to see how Delta Lake handles a schema mismatch
df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df = df.withColumn("new_column", lit("new_column"))
# Appending without mergeSchema is rejected by schema enforcement (uncomment to see the error)
#df.write.mode("append").format("delta").save("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
# Schema evolution: mergeSchema adds the new column to the table schema
df.write.mode("append").option("mergeSchema", "true").format("delta").save("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
#read the new table
df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df.display()
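
Before appending, it can help to see which columns differ between the table and the incoming DataFrame, since schema enforcement rejects an append that carries columns the table lacks unless `mergeSchema` is set. The sketch below is an assumption, not part of the original notebook: `column_diff` is a hypothetical helper that compares plain lists of column names (what `df.columns` returns). It assumes the customers table has the five columns used later in this tutorial, and it ignores data types and name casing, both of which can also matter.

```python
def column_diff(table_columns, incoming_columns):
    """Return (extra, missing) column names of an incoming DataFrame relative to the table."""
    table_set = set(table_columns)
    incoming_set = set(incoming_columns)
    # Keep the original ordering so the result is stable and easy to read.
    extra = [c for c in incoming_columns if c not in table_set]
    missing = [c for c in table_columns if c not in incoming_set]
    return extra, missing


# Column lists assumed for illustration; in the notebook use df.columns instead.
table_cols = ["customer_id", "first_name", "last_name", "email", "country"]
incoming_cols = table_cols + ["new_column"]
extra, missing = column_diff(table_cols, incoming_cols)
print("extra:", extra, "missing:", missing)
```

A non-empty `extra` list signals that the append needs `mergeSchema`, or that the extra columns should be dropped first.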


**Time Travel & History**

In [0]:
%sql
-- Describe the history of the Delta table
DESCRIBE HISTORY 'abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/'

**Read previous versions**

In [0]:
# Read a specific version of the Delta table and display it
# (version 3 must exist in the history shown above)
df_v3 = spark.read.format("delta").option("versionAsOf", 3).load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df_v3.display()
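
Delta readers accept either `versionAsOf` or `timestampAsOf` for time travel. The sketch below is an assumption rather than part of the original notebook: `time_travel_options` is a hypothetical helper that builds the reader options and guards against passing both or neither, and the timestamp shown is purely illustrative.

```python
def time_travel_options(version=None, timestamp=None):
    """Return Delta reader options for exactly one of version or timestamp."""
    # Exactly one of the two arguments must be supplied.
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        if version < 0:
            raise ValueError("version must be non-negative")
        return {"versionAsOf": version}
    return {"timestampAsOf": timestamp}


# Illustrative values only; take real ones from DESCRIBE HISTORY.
print(time_travel_options(version=3))
print(time_travel_options(timestamp="2024-01-01 00:00:00"))
```

In the notebook the result would be passed to the reader, for example `spark.read.format("delta").options(**time_travel_options(version=3)).load(path)`.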

In [0]:
from delta.tables import DeltaTable
# Load the Delta table as a DeltaTable object
delta_table = DeltaTable.forPath(spark, "abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
# Restore the table to version 0; the restore is recorded as a new commit in the history
delta_table.restoreToVersion(0)
display(delta_table.toDF())

In [0]:
# Verify the restore by reading the table back
df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df.display()

**Merge operations: update and insert (upsert)**

In [0]:
from delta.tables import DeltaTable
# Load the CSV file with updates
update_df = spark.read.option("header","true").csv("abfss://deltasource@rxdbstorageaccount.dfs.core.windows.net/updates.csv")
# Drop the extra column to match the Delta table schema
update_df = update_df.drop("new_column")
update_df.display()
# Load and display the Delta table
delta_table = DeltaTable.forPath(spark, "abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
display(delta_table.toDF())
# Merge: update rows whose customer_id matches, insert the rest
(
    delta_table.alias("target")
    .merge(source=update_df.alias("source"), condition="target.customer_id = source.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
# Read the data after the merge operation
df = spark.read.format("delta").load("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
df.display()
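
The merge condition above joins on a single key. When a table is keyed on several columns, the condition string can be generated instead of typed by hand. This is a minimal sketch under stated assumptions: `merge_condition` is a hypothetical helper, and the composite key in the second call is invented for illustration only.

```python
def merge_condition(key_columns, target_alias="target", source_alias="source"):
    """Build an equality join condition over one or more key columns."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{col} = {source_alias}.{col}" for col in key_columns
    )


# Single key, matching the merge in this tutorial.
print(merge_condition(["customer_id"]))
# Composite key (hypothetical: assumes email is also part of the key).
print(merge_condition(["customer_id", "email"]))
```

The returned string can be passed as the `condition` argument of `merge`. The aliases must match the ones given to `alias(...)` on the table and the source DataFrame.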


**Delete and update operations**

In [0]:
delta_table = DeltaTable.forPath(spark, "abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
#delta_table.history().display()
display(delta_table.toDF())
# Delete every row whose last_name is 'Clark'
delta_table.delete("last_name == 'Clark'")
display(delta_table.toDF())
# Update country from 'IN' to 'NZ'; values in `set` are SQL expressions, hence the inner quotes
delta_table.update(condition="country == 'IN'", set={"country": "'NZ'"})
display(delta_table.toDF())
# Insert a new record by appending a one-row DataFrame
new_record = spark.createDataFrame([("101", "Sarah", "Wilson", "sarah.wilson@example.com", "UK")], ["customer_id", "first_name", "last_name", "email", "country"])
new_record.write.format("delta").mode("append").save("abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/")
display(delta_table.toDF())

### Performance and maintenance

**Optimize**

In [0]:
display(delta_table.toDF())
# Compact small files into larger ones; the table contents do not change
delta_table.optimize().executeCompaction()
display(delta_table.toDF())


**Vacuum**

In [0]:
# Remove data files that are no longer referenced by the table and are older than 200 hours
delta_table.vacuum(retentionHours=200)
display(delta_table.toDF())
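
The 200-hour value sits above Delta's default retention threshold of 7 days (168 hours). Vacuuming with a shorter retention is refused unless the retention duration safety check is disabled, and files removed by vacuum are no longer available for time travel. The sketch below is an assumption, not part of the original notebook: `check_retention` is a hypothetical guard that assumes the table keeps the default threshold.

```python
DEFAULT_RETENTION_HOURS = 7 * 24  # Delta's default retention threshold (7 days)


def check_retention(hours, minimum_hours=DEFAULT_RETENTION_HOURS):
    """Return hours unchanged, or raise if it is below the minimum threshold."""
    if hours < minimum_hours:
        raise ValueError(
            f"retention of {hours}h is below the {minimum_hours}h threshold"
        )
    return hours


# The value used in this tutorial passes the check.
print(check_retention(200))
```

In the notebook this could wrap the argument, for example `delta_table.vacuum(retentionHours=check_retention(200))`.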

**Enable Change Data Feed**

In [0]:
delta_path = "abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/"

spark.sql(f"""
          ALTER TABLE delta.`{delta_path}`
          SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
          """)

**Query change feed**

In [0]:
new_record = spark.createDataFrame([
    ("101", "Sarah", "Wilson", "sarah.wilson@example.com", "UK"),
    ("102", "John", "Doe", "john.doe@example.com", "USA"),
    ("103", "Emily", "Chen", "emily.chen@example.com", "Canada"),
    ("104", "Raj", "Kumar", "raj.kumar@example.com", "India"),
    ("105", "Anna", "Garcia", "anna.garcia@example.com", "Spain")
], ["customer_id", "first_name", "last_name", "email", "country"])

delta_path = "abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/"
new_record.write.format("delta").mode("append").save(delta_path)
# startingVersion must not be earlier than the version at which the change data feed was enabled;
# check DESCRIBE HISTORY for the right value
change_df = spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", 25).load(delta_path)
change_df.display()
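
Each row returned by the change feed carries a `_change_type` column (`insert`, `update_preimage`, `update_postimage`, or `delete`) alongside `_commit_version` and `_commit_timestamp`. A quick way to summarize a feed is to count rows per change type. The sketch below is an assumption, not part of the original notebook: `count_change_types` is a hypothetical helper, and `sample_rows` is made-up stand-in data rather than output from this table.

```python
from collections import Counter


def count_change_types(rows):
    """Count change-feed rows per _change_type value."""
    # Works with Spark Row objects and plain dicts, since both support row["name"].
    return Counter(row["_change_type"] for row in rows)


# Made-up stand-in rows; in the notebook pass
# change_df.select("_change_type").collect() instead.
sample_rows = [
    {"_change_type": "insert"},
    {"_change_type": "insert"},
    {"_change_type": "update_preimage"},
    {"_change_type": "update_postimage"},
    {"_change_type": "delete"},
]
counts = count_change_types(sample_rows)
print(counts)
```

For large feeds, `change_df.groupBy("_change_type").count()` does the same aggregation inside Spark without collecting rows to the driver.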

In [0]:
%sql
-- Describe the history of the Delta table
DESCRIBE HISTORY delta.`abfss://deltastaging@rxdbstorageaccount.dfs.core.windows.net/`

### Partitioning

**partitionBy**

In [0]:
df = spark.read.option("header","true").csv("abfss://deltasource@rxdbstorageaccount.dfs.core.windows.net/customers_evolve.csv")
df.display()
df.write \
  .format("delta") \
  .mode("overwrite") \
  .partitionBy("country") \
  .save("abfss://destinationparti@rxdbstorageaccount.dfs.core.windows.net/partitions/delta_table_part")
df = spark.read.format("delta").load("abfss://destinationparti@rxdbstorageaccount.dfs.core.windows.net/partitions/delta_table_part")
df.display()
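
Partitioning pays off when reads and writes filter on the partition column, because only the matching partitions need to be touched. The sketch below is an assumption, not part of the original notebook: `partition_predicate` is a hypothetical helper that builds a simple equality predicate for a string partition column, and the value `UK` is assumed to occur in the data. It refuses values containing quotes or backslashes rather than trying to escape them.

```python
def partition_predicate(column, value):
    """Build a `column = 'value'` predicate for a string partition column."""
    # Refuse characters that would need escaping inside a SQL string literal.
    if "'" in value or "\\" in value:
        raise ValueError("value must not contain quotes or backslashes")
    return f"{column} = '{value}'"


# Assumed example value; use a country that exists in your data.
predicate = partition_predicate("country", "UK")
print(predicate)
```

The predicate can be used for a filtered read, `df.filter(predicate)`, or passed as the `replaceWhere` option together with `mode("overwrite")` to rewrite only the matching partition.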

