* Master DAC - BDLE
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr
* October 2023

## Outline

Documentation

* Delta https://docs.delta.io/0.4.0/delta-intro.html
* API Reference https://docs.delta.io/0.4.0/api/python/index.html

Organization
* Demo 1 to 3 and additional material: illustrate the concepts of the lecture
* Use cases: the exercise(s) to solve








## Prerequisite

In [1]:
!pip install --upgrade -q pyspark

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
delta-spark 2.4.0 requires pyspark<3.5.0,>=3.4.0, but you have pyspark 3.5.0 which is incompatible.

In [2]:
!pip install --upgrade -q delta-spark

In [3]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

local = "local[*]"
appName = "DeltaLake"
localConfig = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension").\
  set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").\
  set("spark.jars.packages","io.delta:delta-core_2.12:2.4.0").\
  set("spark.databricks.delta.schema.autoMerge.enabled","true")


spark = SparkSession.builder.config(conf = localConfig).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

### imports

In [4]:
from delta.tables import *
from pyspark.sql.functions import *


## Demo1

### load the data into delta

In [5]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

AnalysisException: ignored

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

### update the data
#### overwrite

In [None]:
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

#### conditional overwrite

In [None]:
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })


In [None]:
deltaTable.toDF().show()

In [None]:
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

deltaTable.toDF().show()

In [None]:
# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

### view history

In [None]:
history = deltaTable.history()
history.printSchema()
history.show()

In [None]:
history.select("version","operation","operationMetrics").show(truncate=False)

In [None]:
!ls -al /tmp/delta-table/_delta_log

In [None]:
!cat /tmp/delta-table/_delta_log/00000000000000000000.json

In [None]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

In [None]:
df = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta-table")
df.show()
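
The `_delta_log` files listed above are what make `versionAsOf` possible: each commit is a file of JSON actions, one per line, and a table version is obtained by replaying the commits up to that version. The plain-Python sketch below illustrates the idea on a made-up log. The file names and commit contents are hypothetical, and real commits carry more fields (`commitInfo`, `metaData`, `protocol`, statistics) that are omitted here.

```python
import json

# Hypothetical, heavily simplified commits: one list entry per version,
# each holding newline-delimited JSON actions like a real _delta_log file.
commits = [
    '{"add": {"path": "part-0000.parquet"}}\n{"add": {"path": "part-0001.parquet"}}',
    '{"remove": {"path": "part-0000.parquet"}}\n{"remove": {"path": "part-0001.parquet"}}\n'
    '{"add": {"path": "part-0002.parquet"}}',
]


def active_files(commits, version):
    """Replay commits 0..version and return the set of live data files."""
    files = set()
    for commit in commits[: version + 1]:
        for line in commit.splitlines():
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


print(active_files(commits, 0))  # files visible with versionAsOf = 0
print(active_files(commits, 1))  # files visible after the overwrite
```

In this model the overwrite does not erase the old files from disk; it only marks them as removed in a later commit, which is why an earlier version stays readable.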

## Datagen


### Persons

In [None]:
# create sample data
data = [("12345", "Alice", 25, "123 Main St"),
        ("67890", "Bob", 30, "456 Oak Ave"),
        ("24680", "Charlie", 35, "789 Elm St")]

# create a DataFrame from the sample data
df = spark.createDataFrame(data, ["serial", "name", "age", "address"])

# write the DataFrame to Delta format
df.write.format("delta").save("/tmp/persons")

In [None]:
newdata = [("78120", "Dan", 42, "432 Holly Rd"),
        ("97362", "Lorry", 40, "290 Wise Ave")]


# create a DataFrame
newPersons = spark.createDataFrame(newdata, ["serial", "name", "age", "address"])

### Salaries

In [None]:
salaries = [("12345", 45000),
        ("67890", 52000),
        ("24680", 36000),
        ("78120", 60000),
        ("97362",38000)]

# create a DataFrame from the sample data
df = spark.createDataFrame(salaries, ["serial", "salary"])

# write the DataFrame to Delta format
df.write.format("delta").save("/tmp/salaries")

In [None]:
new_salaries = [("12345", 47000),
        ("67890", 50000),
        ("24680", 46000),
        ("78120", 61000),
        ("97362",39000)]

# create a DataFrame
newSalaries = spark.createDataFrame(new_salaries, ["serial", "salary"])

### Sales

In [None]:
sales = [("CHA_2",2,60),("BED_4",1,300),("SHO_15",2,60)]

# create a DataFrame from the sample data
df = spark.createDataFrame(sales, ["product_id", "quantity", "totalprice"])
df.write.format("delta").save("/tmp/sales")

In [None]:
new_sales = [("SHO_15",3,90),("CHA_2",1,30),("BED_6",1,200)]

newSales = spark.createDataFrame(new_sales, ["product_id", "quantity", "totalprice"])

### Products

In [None]:
products_list = [("CHA_2","Furniture","blue"),("BED_4","Furniture","brown"),("SHO_15","Cloth","black")]
products = spark.createDataFrame(products_list, ["product_id", "category", "color"])

## Demo2

### Q1. Adding new tuples
Consider the Delta table `persons` with the following columns: serial, name, age, and address. You have a new dataset `newPersons` with the same columns, containing additional records. Write a merge statement that adds the new records to the Delta table.


In [None]:

#load the persons table
delta_persons = DeltaTable.forPath(spark, "/tmp/persons")

# Define the merge condition
merge_condition = "target.serial = source.serial"

# Define the merge statement
delta_persons.alias("target").merge(
    newPersons.alias("source"), merge_condition
).whenNotMatchedInsertAll().execute()



#### verification

In [None]:
delta_persons.toDF().show()

In [None]:
delta_persons.history().select("version","operation","operationParameters").show(truncate=False)

### Q2: updating existing tuples
Assume you have a Delta table `salaries` with columns serial and salary. You want to update the salary of the employees who earn less than 50,000. You have a new dataset, `newSalaries` with the same columns but with updated salary information. Write a merge statement to update the `salaries` table with the new salary information.


In [None]:
# load the salaries table
delta_salaries = DeltaTable.forPath(spark, "/tmp/salaries")

# Define the merge condition and the update expression
merge_condition = "target.serial = source.serial and target.salary<50000"
update_expression = { "salary": "source.salary" }

# merge statement
delta_salaries.alias("target") \
  .merge(
    newSalaries.alias("source"), merge_condition ) \
  .whenMatchedUpdate(set = update_expression) \
  .execute()
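
The expected outcome of this merge can be worked out by hand. The following plain-Python sketch mimics the matched-update logic on the same values as the Datagen section. The dictionaries and the helper `merge_update_low_salaries` are illustrative names, not part of the Delta API.

```python
# Same values as the `salaries` and `newSalaries` DataFrames from Datagen,
# keyed by serial (illustrative names, not used elsewhere in the notebook).
salaries_by_serial = {"12345": 45000, "67890": 52000, "24680": 36000,
                      "78120": 60000, "97362": 38000}
new_salaries_by_serial = {"12345": 47000, "67890": 50000, "24680": 46000,
                          "78120": 61000, "97362": 39000}


def merge_update_low_salaries(target, source, threshold=50000):
    """Mimic whenMatchedUpdate under the condition
    target.serial = source.serial AND target.salary < threshold."""
    result = dict(target)
    for serial, salary in target.items():
        if serial in source and salary < threshold:
            result[serial] = source[serial]
    return result


expected = merge_update_low_salaries(salaries_by_serial, new_salaries_by_serial)
print(expected)
```

Only the three employees earning less than 50,000 receive their new salary. Putting `target.salary<50000` in the merge condition is harmless here because there is no not-matched clause; if an insert clause were added, the filter would probably be safer as the `condition` argument of `whenMatchedUpdate`, so that higher earners are not treated as unmatched rows.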


#### verification

In [None]:
delta_salaries.toDF().show()


In [None]:
delta_salaries.history().select("version","operation","operationParameters").show(truncate=False)

### Q3: adding new tuples and updating existing ones
You have a Delta table `sales` with columns `product_id`, `quantity`, and `totalprice`. Write a merge statement that inserts the new products from a DataFrame `newSales` into `sales` and, for products that already exist, adds the new `quantity` and `totalprice` to the stored values.


In [None]:
#load the sales table
delta_sales = DeltaTable.forPath(spark, "/tmp/sales")

In [None]:
delta_sales.toDF().show()

In [None]:
newSales.show()

In [None]:
# Define the merge condition and the update expression
merge_condition = "target.product_id = source.product_id"
update_expression = { "quantity": "target.quantity+source.quantity",  "totalprice": "target.totalprice+source.totalprice"}

# merge statement
delta_sales.alias("target") \
  .merge(
    newSales.alias("source"), merge_condition ) \
  .whenMatchedUpdate(set = update_expression) \
  .whenNotMatchedInsertAll()\
  .execute()
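
As a cross-check for the verification step, the upsert can be simulated in plain Python on the same values as the Datagen section. The dictionaries and the helper `upsert_with_sum` are illustrative names only.

```python
# Same values as the `sales` and `newSales` DataFrames from Datagen:
# product_id -> (quantity, totalprice). Names are illustrative.
sales_rows = {"CHA_2": (2, 60), "BED_4": (1, 300), "SHO_15": (2, 60)}
new_sales_rows = {"SHO_15": (3, 90), "CHA_2": (1, 30), "BED_6": (1, 200)}


def upsert_with_sum(target, source):
    """Matched rows: add quantity and totalprice. Unmatched rows: insert as-is."""
    result = dict(target)
    for product_id, (quantity, totalprice) in source.items():
        if product_id in result:
            old_quantity, old_totalprice = result[product_id]
            result[product_id] = (old_quantity + quantity, old_totalprice + totalprice)
        else:
            result[product_id] = (quantity, totalprice)
    return result


merged = upsert_with_sum(sales_rows, new_sales_rows)
for product_id, values in sorted(merged.items()):
    print(product_id, values)
```

`BED_4` is untouched because it has no counterpart in the new data, and `BED_6` is the only inserted row.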

#### verification

In [None]:
delta_sales.toDF().show()

In [None]:
delta_sales.history().select("version","operation","operationParameters").show(truncate=False)

### Q4: Merge tables with different schemas
You have a Delta table `sales` with a column `product_id`, among others. Write a merge statement to update `sales` with information about products, when available, using the dataset `products`, which contains the columns `product_id`, `category` and `color`.

In [None]:
products.createOrReplaceTempView("products")

spark.sql("""MERGE INTO delta.`/tmp/sales` t
USING products s
ON t.product_id = s.product_id
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *""")

In [None]:
spark.sql("""select * from  delta.`/tmp/sales` """).show()

#### verification

In [None]:
delta_sales.history().select("version","operation","operationParameters").show(truncate=False)

## Demo 3: Attaching constraints

### Not-null constraint

In [None]:
spark.sql("""
CREATE TABLE default.persons (
    serial INT NOT NULL,
    name STRING,
    birthDate TIMESTAMP,
    address STRING
  ) USING DELTA;

""")

In [None]:
spark.sql(""" DESCRIBE DETAIL default.persons """).show()

In [None]:
!ls /content/spark-warehouse/persons

In [None]:
spark.sql("""select * from default.persons """).show()


In [None]:
spark.sql("""insert into default.persons values (12345, "Alice","2000-02-01" ,"123 Main St") """)

In [None]:
spark.sql("""select * from default.persons """).show()


Can we run the following statement?

No: `serial` is declared `INT NOT NULL`, so inserting a row with a null `serial` violates the constraint.

In [None]:
#spark.sql("""insert into default.persons values (null, "Bob","1996-03-14" ,"456 Oak Ave") """)


### Predicate constraint

In [None]:
spark.sql(""" ALTER TABLE default.persons ADD CONSTRAINT birthdate CHECK (birthDate > '2000-01-01'); """)

In [None]:
spark.sql("""SHOW TBLPROPERTIES default.persons""").show(truncate=False)

Can we run the following statement?

In [None]:
spark.sql("""insert into default.persons values (47962, "Bob","2003-03-14" ,"456 Oak Ave") """)

In [None]:
#spark.sql("""insert into default.persons values (47962, "Bob","1999-03-14" ,"456 Oak Ave") """)

## Use case 1

### Data import

In [None]:
! wget https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar -O /tmp/sales.tar

In [None]:
!mkdir /tmp/delta

In [None]:
! tar xvf /tmp/sales.tar -C /tmp/delta

In [None]:
!ls /tmp/delta/sales

In [None]:
# original
orginal_sales = spark.read.csv("/tmp/delta/sales/salesOriginal.csv", header=True, inferSchema=True)
orginal_sales.write.format("delta").partitionBy("category").save("/tmp/delta/deltaSales")

In [None]:
print("count: %d \n schema: " % orginal_sales.count())
orginal_sales.dtypes

In [None]:
# march 2023 sales
march23_sales = spark.read.csv("/tmp/delta/sales/march23_sales.csv", header=True, inferSchema=True)
print("count: %d \n schema: " % march23_sales.count())
march23_sales.dtypes

### Load the delta table

In [None]:
deltaSales = DeltaTable.forPath(spark, "/tmp/delta/deltaSales")

In [None]:
deltaSales.detail().show()

In [None]:
deltaSales.toDF().show()

In [None]:
march23_sales.show()

In [None]:
deltaSales.history().select("version","operation","operationParameters").show(truncate=False)

### Adding new tuples
Write a merge statement to include the March 2023 records into `deltaSales`.

In [None]:
merge_condition = "target.saleid = source.saleid"

# merge statement
deltaSales.alias("target").merge(march23_sales.alias("source"), merge_condition ).whenNotMatchedInsertAll().execute()


In [None]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

In [None]:
deltaSales.toDF().count()

### Updating tuples
Write an update statement that increases the prices of products sold in 2023, based on their category, as follows: Furniture -> 5%, others -> 10%.

In [None]:
deltaSales.toDF().where("saledate >= '2023-01-01' and category='Furniture'").count()

In [None]:
deltaSales.toDF().show()

In [None]:
deltaSales.update(condition = expr("saledate >= '2023-01-01' and category='Furniture'"), set = { "unitprice": expr("unitprice + 5*unitprice/100") })

In [None]:
deltaSales.history().select("version","operation","operationParameters").show()

In [None]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

In [None]:
deltaSales.toDF().where("saledate >= '2023-01-01' and category!='Furniture'").count()

In [None]:
deltaSales.update(condition = expr("saledate >= '2023-01-01' and category!='Furniture'"),set = { "unitprice": expr("unitprice + 10*unitprice/100") })
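
The two updates above apply one pricing rule in two passes. The following plain-Python sketch states the same rule as a single function, which can help when checking a few rows by hand. It assumes that `saledate` is given as an ISO `YYYY-MM-DD` string and that `category` is never null (with a null category, neither SQL condition would match). The function name and sample values are hypothetical.

```python
def updated_unitprice(unitprice, category, saledate):
    """Single-pass version of the two updates: +5% for Furniture, +10% otherwise,
    only for sales dated 2023-01-01 or later (ISO date strings compare correctly)."""
    if saledate < "2023-01-01":
        return unitprice
    rate = 5 if category == "Furniture" else 10
    return unitprice + rate * unitprice / 100


# Hypothetical rows: (unitprice, category, saledate)
print(updated_unitprice(100.0, "Furniture", "2023-03-05"))
print(updated_unitprice(20.0, "Cloth", "2023-03-05"))
print(updated_unitprice(100.0, "Furniture", "2022-12-31"))
```

Because the two conditions are disjoint, the order in which the updates run does not change the result.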

In [None]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

### Removing old records
Remove all sales older than 01-Jan-2023. How many records remain?

In [None]:
deltaSales.toDF().where("saledate < '2023-01-01'").count()

In [None]:
deltaSales.delete("saledate < '2023-01-01'")


In [None]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

In [None]:
deltaSales.toDF().count()

### History viewing
Show the records that have been deleted, using the table history (metadata) and DataFrame operators.

In [None]:
deltaSales.toDF().where("saledate < '2023-01-01'").count()

In [None]:
spark.read.format("delta").option("versionAsOf", 3).load("/tmp/delta/deltaSales").where("saledate < '2023-01-01'").count()


### Vacuuming old records
Permanently remove the deleted records using `vacuum`. Check the history again and make sure that the removal has been performed.

In [None]:
deltaSales.vacuum()
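
Vacuum only deletes data files that are no longer referenced by the latest table version and that are older than a retention threshold, which defaults to 7 days. The plain-Python sketch below models that rule; the file list, the `age_hours` field and the helper name are hypothetical simplifications of what Delta tracks internally.

```python
DEFAULT_RETENTION_HOURS = 7 * 24  # Delta's default retention threshold (7 days)


def files_removed_by_vacuum(files, retention_hours=DEFAULT_RETENTION_HOURS):
    """Return the paths a vacuum would delete: files no longer referenced by the
    latest table version whose age exceeds the retention threshold."""
    return [
        f["path"]
        for f in files
        if not f["referenced"] and f["age_hours"] > retention_hours
    ]


# Hypothetical data files: the first one was dropped by a delete minutes ago.
files = [
    {"path": "part-0000.parquet", "referenced": False, "age_hours": 0.1},
    {"path": "part-0001.parquet", "referenced": True, "age_hours": 0.1},
    {"path": "part-0002.parquet", "referenced": False, "age_hours": 200},
]

print(files_removed_by_vacuum(files))                     # default threshold
print(files_removed_by_vacuum(files, retention_hours=0))  # no retention
```

With the default threshold, files dropped a few minutes ago are kept, so a directory listing taken right after `deltaSales.vacuum()` may still show them. `vacuum` accepts a retention in hours, for example `deltaSales.vacuum(0)`, but Delta rejects values below the default unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`.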

In [None]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

In [None]:
! ls -rhl /tmp/delta/deltaSales

In [None]:
deltaSales.toDF().count()

Count the rows at versions 0, 3 and 5 and analyse the results.

In [None]:
history = deltaSales.history()
history.printSchema()
history.show()
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/deltaSales")
print(df.count())

In [None]:
df = spark.read.format("delta").option("versionAsOf", 3).load("/tmp/delta/deltaSales")
df.count()

In [None]:
df = spark.read.format("delta").option("versionAsOf", 5).load("/tmp/delta/deltaSales")
df.count()

## Additional material

### Generated columns
Consider the sales data from the use case. Create a Delta table called `deltaSalesDate` with three additional columns `year`, `month` and `day` derived from the `saledate` column of the original data.

In [None]:
orginal_sales = spark.read.csv("/tmp/delta/sales/salesOriginal.csv", header=True, inferSchema=True)
orginal_sales.printSchema()

#### option1: augment `orginal_sales` with the three columns

In [None]:
from pyspark.sql.functions import *

In [None]:
sales_with_date_components = orginal_sales.withColumn('year',year(col("saledate")))\
.withColumn('month',month(col("saledate")))\
.withColumn('day',dayofmonth(col("saledate")))
sales_with_date_components.show(10)

In [None]:
sales_with_date_components.write.format("delta").partitionBy("month").save("/tmp/delta/deltaSalesDate")

In [None]:
!ls -R /tmp/delta/deltaSalesDate

In [None]:
sales_with_date_components.write.format("delta").partitionBy("year","month").save("/tmp/delta/deltaSalesDateBis")

In [None]:
!ls -R /tmp/delta/deltaSalesDateBis

#### option2: create a delta table with a predefined schema
https://docs.delta.io/latest/delta-batch.html#use-generated-columns

In [None]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.sales") \
  .addColumn("saleid", "STRING") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("quantity", "INT") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)") \
  .addColumn("day", "INT", generatedAlwaysAs="DAYOFMONTH(saledate)") \
  .partitionedBy("year", "month") \
  .execute()
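
According to the Delta documentation on generated columns, Delta fills in these columns when a write omits them and rejects a write whose explicit values disagree with the generation expression. The plain-Python sketch below imitates both behaviours for `year`, `month` and `day`; the helper names are hypothetical and only illustrate what the expressions compute.

```python
from datetime import datetime


def generated_date_columns(saledate):
    """Compute what YEAR, MONTH and DAYOFMONTH would yield for a timestamp."""
    return {"year": saledate.year, "month": saledate.month, "day": saledate.day}


def check_explicit_values(saledate, provided):
    """Return True when explicitly provided values agree with the generated ones."""
    return provided == generated_date_columns(saledate)


saledate = datetime(2023, 2, 26)
print(generated_date_columns(saledate))
print(check_explicit_values(saledate, {"year": 2023, "month": 2, "day": 26}))
print(check_explicit_values(saledate, {"year": 2023, "month": 3, "day": 26}))
```

The last call models an insert that supplies a `month` inconsistent with `saledate`, the kind of row the table is expected to refuse.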

In [None]:
! ls -R spark-warehouse/sales/


In [None]:
spark.sql(""" DESCRIBE DETAIL default.sales """).show()

In [None]:
spark.sql(""" select * from default.sales """).show()

In [None]:
spark.sql(""" insert into default.sales
            values ('S000000124','2023-02-26 00:00:00',2.0,2023,02,26)  """).show()

In [None]:
spark.sql(""" select * from default.sales """).show()

Run a query that aggregates a measure, such as the sum of `unitprice`, by `month` and observe the plan.

In [None]:
spark.sql(""" DESCRIBE DETAIL delta.`/tmp/delta/deltaSalesDate` """).show()

In [None]:
spark.sql(""" select * from delta.`/tmp/delta/deltaSalesDate` """).show(2)

In [None]:
spark.sql(""" select month, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by month """).show()

In [None]:
spark.conf.set("spark.sql.adaptive.enabled",False)

In [None]:
spark.sql(""" select month, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by month """).explain()

Run a different query that aggregates a measure, such as the sum of `unitprice`, by a column the table is not partitioned on (here `year`) and compare the plan with the previous one.

In [None]:
spark.sql(""" select year, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by year """).show()

In [None]:
spark.sql(""" select year, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by year """).explain()

### Restore a Delta table to a previous state


In [None]:
spark.sql("""DESCRIBE HISTORY  default.sales """).show()


In [None]:
spark.sql("""SELECT * FROM default.sales VERSION AS OF 1;""").show()


## Use case 2


### Table creation

#### sales table
Create a Delta table `default.sales` with the following schema (saleid: string, saledate: timestamp, productid: string, quantity: int, shopid: string).

In [None]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.sales") \
  .addColumn("saleid", "STRING") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("productid", "String") \
  .addColumn("quantity", "INT") \
  .addColumn("shopid", "string") \
  .execute()

In [None]:
spark.sql(""" DESCRIBE default.sales """).show()

Load the `/tmp/delta/sales/march23_sales.csv` data into `default.sales`, selecting only the required columns.

In [None]:
spark.sql(""" select * from default.sales """).show()
march23_sales.show()
df.show()

In [None]:
columns_to_select = ["saleid", "saledate", "productid", "quantity", "shopid"]
march23_sales = march23_sales.select(columns_to_select)
march23_sales.write.insertInto("default.sales")

see the result

In [None]:
spark.sql(""" select count (*) from default.sales """).show()

#### dates table
Create a Delta table `default.dates` with the following schema (saledate: timestamp, year: int, month: int), ensuring that `year` and `month` are extracted from `saledate`.

In [None]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.dates") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)") \
  .execute()

In [None]:
spark.sql(""" DESCRIBE default.dates""").show()

populate `default.dates` by inserting dates from `default.sales`

In [None]:
df = spark.sql(""" SELECT saledate FROM default.sales """)
df = df.withColumn('year',year(col("saledate"))).withColumn('month',month(col("saledate")))
df.write.insertInto("default.dates")

see the result

In [None]:
spark.sql(""" select * from default.dates limit 10 """).show()

#### products table
Create a Delta table `default.products` with the following schema (productid: string, unitprice: double, category: string, subcategory: string, size: string, color: string, brand: string) by extracting data from `salesOriginal.csv`.

In [None]:

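# Define the schema for the Delta table
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col

schema = StructType([
    StructField("productid", StringType(), True),
    StructField("unitprice", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("subcategory", StringType(), True),
    StructField("size", StringType(), True),
    StructField("color", StringType(), True),
    StructField("brand", StringType(), True)
])

# Read only the original sales file; a schema passed to the reader is applied
# by position, so the columns are selected by name from the header instead
csv_file_path = "/tmp/delta/sales/salesOriginal.csv"
sales_df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Keep the product columns, cast them to the target types, drop duplicate products
df = sales_df.select([col(f.name).cast(f.dataType) for f in schema.fields]).dropDuplicates()

# Write the DataFrame to a Delta Lake table
df.write.format("delta").mode("overwrite").saveAsTable("default.products")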


In [None]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.product") \
  .addColumn("productid", "STRING") \
  .addColumn("unitprice", "DOUBLE") \
  .addColumn("category", "STRING") \
  .addColumn("subcategory", "STRING") \
  .addColumn("size", "STRING") \
  .addColumn("color", "STRING") \
  .addColumn("brand", "STRING") \
  .execute()

see the result

In [None]:
spark.sql(""" select * from default.products limit 10 """).show()