* Master DAC - BDLE
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr
* October 2023

## Outline

Documentation

* Delta https://docs.delta.io/0.4.0/delta-intro.html
* API Reference https://docs.delta.io/0.4.0/api/python/index.html

Organization
* Demo1 ->3 and additional material: illustrate concepts of the lecture
* Use cases: the exercice(s) to solve








## Prerequisite

In [1]:
!pip install --upgrade -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
!pip install --upgrade -q delta-spark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

local = "local[*]"
appName = "DeltaLake"
localConfig = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension").\
  set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").\
  set("spark.jars.packages","io.delta:delta-core_2.12:2.4.0").\
  set("spark.databricks.delta.schema.autoMerge.enabled","true")


spark = SparkSession.builder.config(conf = localConfig).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

### imports

In [4]:
from delta.tables import *
from pyspark.sql.functions import *


## Demo1

### load the data into delta

In [5]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

In [6]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+



### update the data
#### overwrite

In [7]:
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

In [8]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+



#### conditional overwrite

In [93]:
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })


In [10]:
deltaTable.toDF().show()

+---+
| id|
+---+
|  7|
|108|
|  9|
|  5|
|106|
+---+



In [11]:
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

deltaTable.toDF().show()

+---+
| id|
+---+
|  7|
|  9|
|  5|
+---+



In [12]:
# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



### view history

In [13]:
history = deltaTable.history()
history.printSchema()
history.show()

root
 |-- version: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- userId: string (nullable = true)
 |-- userName: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- operationParameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- job: struct (nullable = true)
 |    |-- jobId: string (nullable = true)
 |    |-- jobName: string (nullable = true)
 |    |-- runId: string (nullable = true)
 |    |-- jobOwnerId: string (nullable = true)
 |    |-- triggerType: string (nullable = true)
 |-- notebook: struct (nullable = true)
 |    |-- notebookId: string (nullable = true)
 |-- clusterId: string (nullable = true)
 |-- readVersion: long (nullable = true)
 |-- isolationLevel: string (nullable = true)
 |-- isBlindAppend: boolean (nullable = true)
 |-- operationMetrics: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- userMetadata: string (nullable =

In [16]:
history.select("version","operation","operationMetrics").show(truncate=False)

+-------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                                                                                                                                                                                                                                                                                       

In [15]:
!ls -al /tmp/delta-table/_delta_log

total 48
drwxr-xr-x 2 root root 4096 Oct 13 07:00 .
drwxr-xr-x 3 root root 4096 Oct 13 07:00 ..
-rw-r--r-- 1 root root 1259 Oct 13 06:59 00000000000000000000.json
-rw-r--r-- 1 root root   20 Oct 13 06:59 .00000000000000000000.json.crc
-rw-r--r-- 1 root root 1324 Oct 13 06:59 00000000000000000001.json
-rw-r--r-- 1 root root   20 Oct 13 06:59 .00000000000000000001.json.crc
-rw-r--r-- 1 root root 1496 Oct 13 07:00 00000000000000000002.json
-rw-r--r-- 1 root root   20 Oct 13 07:00 .00000000000000000002.json.crc
-rw-r--r-- 1 root root 1492 Oct 13 07:00 00000000000000000003.json
-rw-r--r-- 1 root root   20 Oct 13 07:00 .00000000000000000003.json.crc
-rw-r--r-- 1 root root 1662 Oct 13 07:00 00000000000000000004.json
-rw-r--r-- 1 root root   24 Oct 13 07:00 .00000000000000000004.json.crc


In [18]:
!cat /tmp/delta-table/_delta_log/00000000000000000000.json

{"commitInfo":{"timestamp":1697180368473,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"5","numOutputBytes":"977"},"engineInfo":"Apache-Spark/3.4.1 Delta-Lake/2.4.0","txnId":"f40f1b0f-1632-4293-bb77-612d2f1f37f3"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"f5f84f73-35ea-4962-b804-65aa5acf4a19","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1697180360562}}
{"add":{"path":"part-00000-b261b3ed-0e88-4340-babe-1d4647858f97-c000.snappy.parquet","partitionValues":{},"size":486,"modificationTime":1697180368012,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":

In [19]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+



In [20]:
df = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+



## Datagen


### Persons

In [23]:
# create sample data
data = [("12345", "Alice", 25, "123 Main St"),
        ("67890", "Bob", 30, "456 Oak Ave"),
        ("24680", "Charlie", 35, "789 Elm St")]

# create a DataFrame from the sample data
df = spark.createDataFrame(data, ["serial", "name", "age", "address"])

# write the DataFrame to Delta format
df.write.format("delta").save("/tmp/persons")

In [24]:
newdata = [("78120", "Dan", 42, "432 Holly Rd"),
        ("97362", "Lorry", 40, "290 Wise Ave")]


# create a DataFrame
newPersons = spark.createDataFrame(newdata, ["serial", "name", "age", "address"])

In [25]:
df.show()

+------+-------+---+-----------+
|serial|   name|age|    address|
+------+-------+---+-----------+
| 12345|  Alice| 25|123 Main St|
| 67890|    Bob| 30|456 Oak Ave|
| 24680|Charlie| 35| 789 Elm St|
+------+-------+---+-----------+



In [26]:
newPersons.show()

+------+-----+---+------------+
|serial| name|age|     address|
+------+-----+---+------------+
| 78120|  Dan| 42|432 Holly Rd|
| 97362|Lorry| 40|290 Wise Ave|
+------+-----+---+------------+



### Salaries

In [27]:
salaries = [("12345", 45000),
        ("67890", 52000),
        ("24680", 36000),
        ("78120", 60000),
        ("97362",38000)]

# create a DataFrame from the sample data
df = spark.createDataFrame(salaries, ["serial", "salary"])

# write the DataFrame to Delta format
df.write.format("delta").save("/tmp/salaries")

In [28]:
new_salaries = [("12345", 47000),
        ("67890", 50000),
        ("24680", 46000),
        ("78120", 61000),
        ("97362",39000)]

# create a DataFrame
newSalaries = spark.createDataFrame(new_salaries, ["serial", "salary"])

### Sales

In [29]:
sales = [("CHA_2",2,60),("BED_4",1,300),("SHO_15",2,60)]

# create a DataFrame from the sample data
df = spark.createDataFrame(sales, ["product_id", "quantity", "totalprice"])
df.write.format("delta").save("/tmp/sales")

In [30]:
new_sales = [("SHO_15",3,90),("CHA_2",1,30),("BED_6",1,200)]

newSales = spark.createDataFrame(new_sales, ["product_id", "quantity", "totalprice"])

### Products

In [31]:
products_list = [("CHA_2","Furniture","blue"),("BED_4","Furniture","brown"),("SHO_15","Cloth","black")]
products = spark.createDataFrame(products_list, ["product_id", "category", "color"])

## Demo2

### Q1. Adding new tuples
Consider the Delta table `person` with the following columns: serial, name, age, and address. You have a new dataset `newPersons` with the same columns, but with additional records. Write a merge statement to update the Delta table with the new records.


In [32]:

#load the persons table
delta_persons = DeltaTable.forPath(spark, "/tmp/persons")

# Define the merge condition
merge_condition = "target.serial = source.serial"

# Define the merge statement
delta_persons.alias("target").merge(
    newPersons.alias("source"), merge_condition
).whenNotMatchedInsertAll().execute()



#### verification

In [33]:
delta_persons.toDF().show()

+------+-------+---+------------+
|serial|   name|age|     address|
+------+-------+---+------------+
| 97362|  Lorry| 40|290 Wise Ave|
| 12345|  Alice| 25| 123 Main St|
| 78120|    Dan| 42|432 Holly Rd|
| 67890|    Bob| 30| 456 Oak Ave|
| 24680|Charlie| 35|  789 Elm St|
+------+-------+---+------------+



In [34]:
delta_persons.history().select("version","operation","operationParameters").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationParameters                                                                                                                                           |
+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |MERGE    |{predicate -> ["(serial#8521 = serial#7336)"], matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}], notMatchedBySourcePredicates -> []}|
|0      |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}                                                                                                                    |
+-------+---------+--------------------------------------------------------------------------------------

### Q2: updating existing tuples
Assume you have a Delta table `salaries` with columns serial and salary. You want to update the salary of the employees who earn less than 50,000. You have a new dataset, `newSalaries` with the same columns but with updated salary information. Write a merge statement to update the `salaries` table with the new salary information.


In [92]:
#load the persons table
delta_salaries = DeltaTable.forPath(spark, "/tmp/salaries")

# Define
merge_condition = "target.serial = source.serial and target.salary<50000"
update_expression = { "salary": "source.salary" }

# merge statement
delta_salaries.alias("target") \
  .merge(
    newSalaries.alias("source"), merge_condition ) \
  .whenMatchedUpdate(set = update_expression) \
  .execute()


#### verification

In [48]:
delta_salaries.toDF().show()


+------+------+
|serial|salary|
+------+------+
| 78120| 60000|
| 97362| 39000|
| 24680| 46000|
| 12345| 47000|
| 67890| 52000|
+------+------+



In [37]:
delta_persons.history().select("version","operation","operationParameters").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationParameters                                                                                                                                           |
+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |MERGE    |{predicate -> ["(serial#8521 = serial#7336)"], matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}], notMatchedBySourcePredicates -> []}|
|0      |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}                                                                                                                    |
+-------+---------+--------------------------------------------------------------------------------------

### Q3: adding new tuples and updating existing ones
You have a Delta table `sales` with columns `product_id`, `quantity`, and `totalprice`. Write a merge statement to insert the new products from a dataframe `newSales` into `sales` and to make sure that, for existing products `sales` has the sum of the quantity and totalprice.


In [49]:
#load the sales table
delta_sales = DeltaTable.forPath(spark, "/tmp/sales")

In [50]:
delta_sales.toDF().show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       1|       200|     null| null|
|     CHA_2|       3|        90|Furniture| blue|
|    SHO_15|       5|       150|    Cloth|black|
+----------+--------+----------+---------+-----+



In [51]:
newSales.show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|    SHO_15|       3|        90|
|     CHA_2|       1|        30|
|     BED_6|       1|       200|
+----------+--------+----------+



In [53]:
# Define
merge_condition = "target.product_id = source.product_id"
update_expression = { "quantity": "target.quantity+source.quantity",  "totalprice": "target.totalprice+source.totalprice"}

# merge statement
delta_sales.alias("target") \
  .merge(
    newSales.alias("source"), merge_condition ) \
  .whenMatchedUpdate(set = update_expression) \
  .whenNotMatchedInsertAll()\
  .execute()

#### verification

In [54]:
delta_sales.toDF().show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       2|       400|     null| null|
|     CHA_2|       4|       120|Furniture| blue|
|    SHO_15|       8|       240|    Cloth|black|
+----------+--------+----------+---------+-----+



In [55]:
delta_persons.history().select("version","operation","operationParameters").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationParameters                                                                                                                                           |
+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |MERGE    |{predicate -> ["(serial#8521 = serial#7336)"], matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}], notMatchedBySourcePredicates -> []}|
|0      |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}                                                                                                                    |
+-------+---------+--------------------------------------------------------------------------------------

### Q4: Merge tables with different schemas
You have a Delta table `sales` with a column `product_id`, among other.  Write a merge statement to update `sales` with  information about products using a dataset `productInfo` which contains the columns `product_id`, `category` and `color`, when available.

In [58]:
products.createOrReplaceTempView("products")

spark.sql("""MERGE INTO delta.`/tmp/sales` t
USING products s
ON t.product_id = s.product_id
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [59]:
spark.sql("""select * from  delta.`/tmp/sales` """).show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       2|       400|     null| null|
|     CHA_2|       4|       120|Furniture| blue|
|    SHO_15|       8|       240|    Cloth|black|
+----------+--------+----------+---------+-----+



#### verification

In [60]:
delta_persons.history().select("version","operation","operationParameters").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationParameters                                                                                                                                           |
+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |MERGE    |{predicate -> ["(serial#8521 = serial#7336)"], matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}], notMatchedBySourcePredicates -> []}|
|0      |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}                                                                                                                    |
+-------+---------+--------------------------------------------------------------------------------------

## Demo 3: Attaching constraints

### Not-null constraint

In [61]:
spark.sql("""
CREATE TABLE default.persons (
    serial INT NOT NULL,
    name STRING,
    birthDate TIMESTAMP,
    address STRING
  ) USING DELTA;

""")

DataFrame[]

In [62]:
spark.sql(""" DESCRIBE DETAIL default.persons """).show()

+------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
|format|                  id|                name|description|            location|           createdAt|        lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|       tableFeatures|
+------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
| delta|38cf5c54-59a4-44b...|spark_catalog.def...|       null|file:/content/spa...|2023-10-13 07:24:...|2023-10-13 07:24:...|              []|       0|          0|        {}|               1|               2|[appendOnly, inva...|
+------+--------------------+--------------------+-----------+------------------

In [63]:
!ls /content/spark-warehouse/persons

_delta_log


In [64]:
spark.sql("""select * from default.persons """).show()


+------+----+---------+-------+
|serial|name|birthDate|address|
+------+----+---------+-------+
+------+----+---------+-------+



In [65]:
spark.sql("""insert into default.persons values (12345, "Alice","2000-02-01" ,"123 Main St") """)

DataFrame[]

In [66]:
spark.sql("""select * from default.persons """).show()


+------+-----+-------------------+-----------+
|serial| name|          birthDate|    address|
+------+-----+-------------------+-----------+
| 12345|Alice|2000-02-01 00:00:00|123 Main St|
+------+-----+-------------------+-----------+



Can we run the following statement? No because serial not null

In [73]:
#spark.sql("""insert into default.persons values (null, "Bob","1996-03-14" ,"456 Oak Ave") """)

### Predicate constraint

In [68]:
spark.sql(""" ALTER TABLE default.persons ADD CONSTRAINT birthdate CHECK (birthDate > '2000-01-01'); """)

DataFrame[]

In [69]:
spark.sql("""SHOW TBLPROPERTIES default.persons""").show(truncate=False)

+---------------------------+------------------------+
|key                        |value                   |
+---------------------------+------------------------+
|delta.constraints.birthdate|birthDate > '2000-01-01'|
|delta.minReaderVersion     |1                       |
|delta.minWriterVersion     |3                       |
+---------------------------+------------------------+



Can we run the following statement?

*  yes
*  no because birthDate must be > '2000-01-01é'


In [70]:
spark.sql("""insert into default.persons values (47962, "Bob","2003-03-14" ,"456 Oak Ave") """)

DataFrame[]

In [71]:
# spark.sql("""insert into default.persons values (47962, "Bob","1999-03-14" ,"456 Oak Ave") """)

## Use case 1

### Data import

In [74]:
! wget https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar -O /tmp/sales.tar

--2023-10-13 07:29:51--  https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar
Resolving nuage.lip6.fr (nuage.lip6.fr)... 132.227.201.11
Connecting to nuage.lip6.fr (nuage.lip6.fr)|132.227.201.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 548352 (536K) [application/x-tar]
Saving to: ‘/tmp/sales.tar’


2023-10-13 07:29:53 (1.02 MB/s) - ‘/tmp/sales.tar’ saved [548352/548352]



In [75]:
!mkdir /tmp/delta

In [76]:
! tar xvf /tmp/sales.tar -C /tmp/delta

sales/
sales/._salesOriginal.csv
sales/salesOriginal.csv
sales/march23_sales.csv


In [77]:
!ls /tmp/delta/sales

march23_sales.csv  salesOriginal.csv


In [78]:
# original
orginal_sales = spark.read.csv("/tmp/delta/sales/salesOriginal.csv", header=True, inferSchema=True)
orginal_sales.write.format("delta").partitionBy("category").save("/tmp/delta/deltaSales")

In [79]:
print("count: %d \n schema: " % orginal_sales.count())
orginal_sales.dtypes

count: 4916 
 schema: 


[('saleid', 'string'),
 ('saledate', 'date'),
 ('quantity', 'double'),
 ('unitprice', 'double'),
 ('shopid', 'string'),
 ('city', 'string'),
 ('state', 'string'),
 ('country', 'string'),
 ('shopsize', 'string'),
 ('productid', 'string'),
 ('category', 'string'),
 ('subcategory', 'string'),
 ('size', 'string'),
 ('purchaseprice', 'double'),
 ('color', 'string'),
 ('brand', 'string')]

In [80]:
# march 2023 sales
march23_sales = spark.read.csv("/tmp/delta/sales/march23_sales.csv", header=True, inferSchema=True)
print("count: %d \n schema: " % march23_sales.count())
march23_sales.dtypes

count: 84 
 schema: 


[('saleid', 'string'),
 ('saledate', 'date'),
 ('quantity', 'double'),
 ('unitprice', 'double'),
 ('shopid', 'string'),
 ('city', 'string'),
 ('state', 'string'),
 ('country', 'string'),
 ('shopsize', 'string'),
 ('productid', 'string'),
 ('category', 'string'),
 ('subcategory', 'string'),
 ('size', 'string'),
 ('purchaseprice', 'double'),
 ('color', 'string'),
 ('brand', 'string')]

### Load the delta table

In [81]:
deltaSales = DeltaTable.forPath(spark, "/tmp/delta/deltaSales")

In [82]:
deltaSales.detail().show()

+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
|format|                  id|name|description|            location|           createdAt|        lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|       tableFeatures|
+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
| delta|a1853f95-7df3-4fe...|null|       null|file:/tmp/delta/d...|2023-10-13 07:30:...|2023-10-13 07:30:...|      [category]|       2|      50284|        {}|               1|               2|[appendOnly, inva...|
+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+---------

In [83]:
deltaSales.history().select("version","operation","operationParameters").show(truncate=False)

+-------+---------+----------------------------------------------------+
|version|operation|operationParameters                                 |
+-------+---------+----------------------------------------------------+
|0      |WRITE    |{mode -> ErrorIfExists, partitionBy -> ["category"]}|
+-------+---------+----------------------------------------------------+



### Adding new tuples
Write a merge statement to include the march 2023 records into `deltaSales`

In [None]:
# Define
merge_condition = "target.product_id = source.product_id"
update_expression = { "quantity": "target.quantity+source.quantity",  "totalprice": "target.totalprice+source.totalprice"}

# merge statement
delta_sales.alias("target") \
  .merge(
    newSales.alias("source"), merge_condition ) \
  .whenMatchedUpdate(set = update_expression) \
  .whenNotMatchedInsertAll()\
  .execute()

In [88]:
# merge

# Define the merge condition
merge_condition = "target.saleid = source.saleid"

# Define the merge statement
deltaSales.alias("target")\
  .merge(
      march23_sales.alias("source"), merge_condition)\
      .whenNotMatchedInsertAll()\
      .execute()


In [89]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                                                                                                                                                                                                                                                                                          

In [90]:
deltaSales.toDF().count()

5000

### Updating tuples
Write an update statement that increases the prices of products sold on 2023, based on their category, as follows: furniture -> 05%, others -> 10%

In [91]:
deltaSales.toDF().where("saledate >= '2023-01-01' and category='Furniture'").count()

882

In [96]:
# update

deltaSales.update(
    condition = expr("saledate >= '2023-01-01' and category='Furniture'"),
    set = { "unitprice" : expr("unitprice * 1.05")}
)


In [97]:
deltaSales.history().select("version","operation","operationParameters").show()

+-------+---------+--------------------+
|version|operation| operationParameters|
+-------+---------+--------------------+
|      2|   UPDATE|{predicate -> ["(...|
|      1|    MERGE|{predicate -> ["(...|
|      0|    WRITE|{mode -> ErrorIfE...|
+-------+---------+--------------------+



In [98]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                                                                                                                                                                                                                                                                                          

In [99]:
deltaSales.toDF().where("saledate >= '2023-01-01' and category!='Furniture'").count()

1680

In [100]:
deltaSales.update(
    condition = expr("saledate >= '2023-01-01' and category!='Furniture'"),
    set = { "unitprice" : expr("unitprice * 1.1")}
)

In [101]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                                                                                                                                                                                                                                                                                          

### Removing old records
remove all sales older than 01-Jan-2023. How many records remain?

In [102]:
deltaSales.toDF().where("saledate < '2023-01-01'").count()

2438

In [103]:
# delete

deltaSales.delete(
    condition = expr("saledate < '2023-01-01'")
)


In [104]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

+-------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                                                                                                                                                                                                                                                                                          

In [105]:
deltaSales.toDF().count()

2562

### History viewing
Show the records that have been deleted. Use the metadata information and use dataframe operators.  

In [106]:
deltaSales.toDF().where("saledate < '2023-01-01'").count()

0

In [110]:
spark.read.format("delta").option("versionAsOf", 3).load("/tmp/delta/deltaSales").where("saledate < '2023-01-01'").count()


2438

### Vacuuming old records
Permanently remove the deleted records using `vacuum`. Check the history again and make sure that the removal has been performed.

In [108]:
deltaSales.vacuum()

DataFrame[]

In [109]:
deltaSales.history().select("version","operation","operationMetrics").show(truncate=False)

+-------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation   |operationMetrics                                                                                                                                                                                                                                                                                                                                                                                                                                    

In [111]:
! ls -rhl /tmp/delta/deltaSales

total 12K
drwxr-xr-x 2 root root 4.0K Oct 13 07:57  _delta_log
drwxr-xr-x 2 root root 4.0K Oct 13 07:54 'category=Furniture'
drwxr-xr-x 2 root root 4.0K Oct 13 07:54 'category=Cloth'


In [112]:
deltaSales.toDF().count()

2562

count the rows at version 0, 3 and 5 and analyse

In [116]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/deltaSales")
df.count()

4916

In [117]:
df = spark.read.format("delta").option("versionAsOf", 3).load("/tmp/delta/deltaSales")
df.count()

5000

In [118]:
df = spark.read.format("delta").option("versionAsOf", 5).load("/tmp/delta/deltaSales")
df.count()

2562

Vacuum supprime les fichiers obsolètes.  Ici rien n'a été effacé. Quoi qu'il arrive on conserve l'historique des opérations.


## Additional material

### Generated columns
Consider the sales data from the use case, create a delta table called `deltaSalesDate` with three additional columns `year`, `month` and `day` derived from the `saledate` column of the original data.

In [119]:
orginal_sales = spark.read.csv("/tmp/delta/sales/salesOriginal.csv", header=True, inferSchema=True)
orginal_sales.printSchema()

root
 |-- saleid: string (nullable = true)
 |-- saledate: date (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- shopid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- shopsize: string (nullable = true)
 |-- productid: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- size: string (nullable = true)
 |-- purchaseprice: double (nullable = true)
 |-- color: string (nullable = true)
 |-- brand: string (nullable = true)



#### option1: augment `orginal_sales` with the three columns

In [120]:
from pyspark.sql.functions import *

In [121]:
sales_with_date_components = orginal_sales.withColumn('year',year(col("saledate")))\
.withColumn('month',month(col("saledate")))\
.withColumn('day',dayofmonth(col("saledate")))
sales_with_date_components.show(10)

+----------+----------+--------+---------+-------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+--------+----+-----+---+
|    saleid|  saledate|quantity|unitprice| shopid|         city|     state|country|shopsize|productid| category|subcategory|  size|purchaseprice|color|   brand|year|month|day|
+----------+----------+--------+---------+-------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+--------+----+-----+---+
|S000000124|2023-02-26|     2.0|     60.0| shop_4|San Francisco|California|    USA|   small|    CHA_2|Furniture|      Chair|  null|         48.0| blue|   Basic|2023|    2| 26|
|S000000125|2023-02-25|     1.0|    150.0| shop_5|      Houston|     Texas|    USA|   small|    BED_3|Furniture|        Bed|Single|        127.0|  red|    Mega|2023|    2| 25|
|S000000126|2023-02-24|     1.0|    300.0| shop_6|  San Antonio|     Texas|    USA|   small|    BED_4|Furniture|        

In [122]:
sales_with_date_components.write.format("delta").partitionBy("month").save("/tmp/delta/deltaSalesDate")

In [123]:
!ls -R /tmp/delta/deltaSalesDate

/tmp/delta/deltaSalesDate:
 _delta_log  'month=1'	'month=11'  'month=12'	'month=2'

/tmp/delta/deltaSalesDate/_delta_log:
00000000000000000000.json

'/tmp/delta/deltaSalesDate/month=1':
part-00000-d016ad82-b630-421a-bdce-dd24a58b0821.c000.snappy.parquet

'/tmp/delta/deltaSalesDate/month=11':
part-00000-4a52cfdd-9bcc-4b63-9ad1-5044cc675731.c000.snappy.parquet

'/tmp/delta/deltaSalesDate/month=12':
part-00000-563cbb02-ad0c-4d9a-b41a-25d30e7aed29.c000.snappy.parquet

'/tmp/delta/deltaSalesDate/month=2':
part-00000-c98985d6-3464-4735-9acc-2fc15c4b7416.c000.snappy.parquet


In [124]:
sales_with_date_components.write.format("delta").partitionBy("year","month").save("/tmp/delta/deltaSalesDateBis")

In [125]:
!ls -R /tmp/delta/deltaSalesDateBis

/tmp/delta/deltaSalesDateBis:
 _delta_log  'year=2022'  'year=2023'

/tmp/delta/deltaSalesDateBis/_delta_log:
00000000000000000000.json

'/tmp/delta/deltaSalesDateBis/year=2022':
'month=11'  'month=12'

'/tmp/delta/deltaSalesDateBis/year=2022/month=11':
part-00000-37518ba6-2ec1-4275-8285-64f8562b6b63.c000.snappy.parquet

'/tmp/delta/deltaSalesDateBis/year=2022/month=12':
part-00000-4dcecce8-8096-4822-affe-03dec60b32ce.c000.snappy.parquet

'/tmp/delta/deltaSalesDateBis/year=2023':
'month=1'  'month=2'

'/tmp/delta/deltaSalesDateBis/year=2023/month=1':
part-00000-aee6ed25-6b87-4671-a46d-aa8bfb74b0aa.c000.snappy.parquet

'/tmp/delta/deltaSalesDateBis/year=2023/month=2':
part-00000-2808d55e-e7c8-4b50-a10e-56acec0137b0.c000.snappy.parquet


#### option2: create a delta table with a predefined schema
https://docs.delta.io/latest/delta-batch.html#use-generated-columns

In [126]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.sales") \
  .addColumn("saleid", "STRING") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("quantity", "INT") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)") \
  .addColumn("day", "INT", generatedAlwaysAs="DAYOFMONTH(saledate)") \
  .partitionedBy("year", "month") \
  .execute()

<delta.tables.DeltaTable at 0x7b687588f370>

In [127]:
! ls -R spark-warehouse/sales/


spark-warehouse/sales/:
_delta_log

spark-warehouse/sales/_delta_log:
00000000000000000000.json


In [128]:
spark.sql(""" DESCRIBE DETAIL default.sales """).show()

+------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
|format|                  id|                name|description|            location|           createdAt|        lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|       tableFeatures|
+------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
| delta|55cf8b4c-5fbd-4c3...|spark_catalog.def...|       null|file:/content/spa...|2023-10-13 08:08:...|2023-10-13 08:08:...|   [year, month]|       0|          0|        {}|               1|               4|[appendOnly, chan...|
+------+--------------------+--------------------+-----------+------------------

In [129]:
spark.sql(""" select * from default.sales """).show()

+------+--------+--------+----+-----+---+
|saleid|saledate|quantity|year|month|day|
+------+--------+--------+----+-----+---+
+------+--------+--------+----+-----+---+



In [130]:
spark.sql(""" insert into default.sales
            values ('S000000124','2023-02-26 00:00:00',2.0,2023,02,26)  """).show()

++
||
++
++



In [131]:
spark.sql(""" select * from default.sales """).show()

+----------+-------------------+--------+----+-----+---+
|    saleid|           saledate|quantity|year|month|day|
+----------+-------------------+--------+----+-----+---+
|S000000124|2023-02-26 00:00:00|       2|2023|    2| 26|
+----------+-------------------+--------+----+-----+---+



if so run a query that aggregates on some measure like sum of `unitprice` based on `month` and observe the plan

In [132]:
spark.sql(""" DESCRIBE DETAIL delta.`/tmp/delta/deltaSalesDate` """).show()

+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
|format|                  id|name|description|            location|           createdAt|        lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|       tableFeatures|
+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+--------------------+
| delta|2b03ed1e-38db-444...|null|       null|file:/tmp/delta/d...|2023-10-13 08:06:...|2023-10-13 08:06:...|         [month]|       4|      65127|        {}|               1|               2|[appendOnly, inva...|
+------+--------------------+----+-----------+--------------------+--------------------+--------------------+----------------+--------+---------

In [133]:
spark.sql(""" select * from delta.`/tmp/delta/deltaSalesDate` """).show(2)

+----------+----------+--------+---------+-------+-------+-------+-------+--------+---------+--------+-----------+----+-------------+-----+-----+----+-----+---+
|    saleid|  saledate|quantity|unitprice| shopid|   city|  state|country|shopsize|productid|category|subcategory|size|purchaseprice|color|brand|year|month|day|
+----------+----------+--------+---------+-------+-------+-------+-------+--------+---------+--------+-----------+----+-------------+-----+-----+----+-----+---+
|S000000212|2022-11-30|     3.0|     20.0|shop_12| Chieti|Abruzzo|  Italy|  medium|   TSH_29|   Cloth|     Tshirt|   S|         18.0|brown| Mega|2022|   11| 30|
|S000000213|2022-11-29|     1.0|     22.0|shop_13|Pescara|Abruzzo|  Italy|   small|   TSH_30|   Cloth|     Tshirt|   M|         19.0|black| Over|2022|   11| 29|
+----------+----------+--------+---------+-------+-------+-------+-------+--------+---------+--------+-----------+----+-------------+-----+-----+----+-----+---+
only showing top 2 rows



In [134]:
spark.sql(""" select month, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by month """).show()

+-----+--------------+
|month|sum(unitprice)|
+-----+--------------+
|   12|      154648.0|
|    1|      154013.0|
|   11|      136472.0|
|    2|      141531.0|
+-----+--------------+



In [135]:
spark.conf.set("spark.sql.adaptive.enabled",False)

In [136]:
spark.sql(""" select month, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by month """).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[month#36128], functions=[sum(unitprice#36114)])
+- Exchange hashpartitioning(month#36128, 200), ENSURE_REQUIREMENTS, [plan_id=14772]
   +- *(1) HashAggregate(keys=[month#36128], functions=[partial_sum(unitprice#36114)])
      +- *(1) ColumnarToRow
         +- FileScan parquet [unitprice#36114,month#36128] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[file:/tmp/delta/deltaSalesDate], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<unitprice:double>




Run a different query that aggregates on some measure like sum of `unitprice` based on  `saledate` and compare the plan with the previous one

In [137]:
spark.sql(""" select year, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by year """).show()

+----+--------------+
|year|sum(unitprice)|
+----+--------------+
|2023|      295544.0|
|2022|      291120.0|
+----+--------------+



In [138]:
spark.sql(""" select year, sum(unitprice) from delta.`/tmp/delta/deltaSalesDate` group by year """).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[year#36348], functions=[sum(unitprice#36335)])
+- Exchange hashpartitioning(year#36348, 200), ENSURE_REQUIREMENTS, [plan_id=14908]
   +- *(1) HashAggregate(keys=[year#36348], functions=[partial_sum(unitprice#36335)])
      +- *(1) Project [unitprice#36335, year#36348]
         +- *(1) ColumnarToRow
            +- FileScan parquet [unitprice#36335,year#36348,month#36349] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[file:/tmp/delta/deltaSalesDate], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<unitprice:double,year:int>




### Retore a delta table to a previous state


In [139]:
spark.sql("""DESCRIBE HISTORY  default.sales """).show()


+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      1|2023-10-13 08:10:...|  null|    null|               WRITE|{mode -> Append, ...|null|    null|     null|          0|  Serializable|         true|{numFiles -> 1, n...|        null|Apache-Spark/3.4....|
|      0|2023-10-13 08:08:...|  null|    null|CREATE OR REPLACE...|{isManaged -> tru...|null|    null|     null|       null|  Serializable|         true|           

In [140]:
spark.sql("""SELECT * FROM default.sales VERSION AS OF 1;""").show()


+----------+-------------------+--------+----+-----+---+
|    saleid|           saledate|quantity|year|month|day|
+----------+-------------------+--------+----+-----+---+
|S000000124|2023-02-26 00:00:00|       2|2023|    2| 26|
+----------+-------------------+--------+----+-----+---+



## Use case2


### Table creation

#### sales table
create a delta table `default.sales` with the following schema (saleid : String, saledate : Timestamp, productid: String, quantity : int, shopid : string)

In [208]:
spark.sql("""
CREATE TABLE default.sales1 (
    saleid STRING NOT NULL,
    saledate TIMESTAMP,
    productid STRING,
    quantity INT,
    shopid STRING
  ) USING DELTA;

""").show()

AnalysisException: ignored

load the `/tmp/sales/march23_sales.csv` data into `default.sales` by selecting only the required columns

In [202]:
spark.sql("""DESCRIBE default.sales1""").show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|   saleid|   string|   null|
| saledate|timestamp|   null|
|productid|   string|   null|
| quantity|      int|   null|
|   shopid|   string|   null|
+---------+---------+-------+



In [231]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType

delta_schema = StructType([
    StructField("saleid", StringType(), True),
    StructField("saledate", TimestampType(), True),
    StructField("productid", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("shopid", StringType(), True)
])

march23_sales = spark.read.csv("/tmp/delta/sales/march23_sales.csv", header=True, schema=delta_schema)
march23_sales.show(10)

+----------+-------------------+---------+--------+------+
|    saleid|           saledate|productid|quantity|shopid|
+----------+-------------------+---------+--------+------+
|S000000000|2023-03-02 00:00:00|      1.0|    null|shop_0|
|S000000001|2023-03-01 00:00:00|      1.0|    null|shop_1|
|S000000120|2023-03-02 00:00:00|      1.0|    null|shop_0|
|S000000121|2023-03-01 00:00:00|      3.0|    null|shop_1|
|S000000240|2023-03-02 00:00:00|      2.0|    null|shop_0|
|S000000241|2023-03-01 00:00:00|      2.0|    null|shop_1|
|S000000360|2023-03-02 00:00:00|      1.0|    null|shop_0|
|S000000361|2023-03-01 00:00:00|      1.0|    null|shop_1|
|S000000480|2023-03-02 00:00:00|      1.0|    null|shop_0|
|S000000481|2023-03-01 00:00:00|      1.0|    null|shop_1|
+----------+-------------------+---------+--------+------+
only showing top 10 rows



In [204]:
march23_sales.write.format("delta").mode("overwrite").saveAsTable("default.sales1")


see the result

In [207]:
spark.sql(""" select count(*) from default.sales1 """).show()

+--------+
|count(1)|
+--------+
|      84|
+--------+



#### dates table
create a delta table `default.dates` with the following schema (saledate: timestamp, year: int, month: int) by ensuring that year and month are extracted from saledate

In [213]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.dates") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)") \
  .execute()

spark.sql("""DESCRIBE default.dates""").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|saledate|timestamp|   null|
|    year|      int|   null|
|   month|      int|   null|
+--------+---------+-------+



populate `default.dates` by inserting dates from `default.sales`

In [214]:
spark.sql("""
INSERT INTO default.dates
SELECT saledate, YEAR(saledate) AS year, MONTH(saledate) AS month
FROM default.sales1
""")


DataFrame[]

see the result

In [215]:
spark.sql(""" select * from default.dates limit 10 """).show()

+-------------------+----+-----+
|           saledate|year|month|
+-------------------+----+-----+
|2023-03-02 00:00:00|2023|    3|
|2023-03-01 00:00:00|2023|    3|
|2023-03-02 00:00:00|2023|    3|
|2023-03-01 00:00:00|2023|    3|
|2023-03-02 00:00:00|2023|    3|
|2023-03-01 00:00:00|2023|    3|
|2023-03-02 00:00:00|2023|    3|
|2023-03-01 00:00:00|2023|    3|
|2023-03-02 00:00:00|2023|    3|
|2023-03-01 00:00:00|2023|    3|
+-------------------+----+-----+



#### products table
create a delta table default.products with the following schema (productid: string, unitprice: double, category: string, subcategory: string, size: string, color: string,
 brand: string) by extracting data from `originalSales.csv`

In [218]:
DeltaTable.createOrReplace(spark) \
  .tableName("default.products") \
  .addColumn("productid", "STRING") \
  .addColumn("unitprice", "DOUBLE") \
  .addColumn("category", "STRING") \
  .addColumn("size", "STRING")\
  .addColumn("color", "STRING")\
  .addColumn("brand", "STRING")\
  .execute()

spark.sql("""DESCRIBE default.products""").show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|productid|   string|   null|
|unitprice|   double|   null|
| category|   string|   null|
|     size|   string|   null|
|    color|   string|   null|
|    brand|   string|   null|
+---------+---------+-------+



In [235]:
original_sales = spark.read.csv("/tmp/delta/sales/salesOriginal.csv", header=True, inferSchema = True)

spark.sql("""MERGE INTO default.products t
  USING original_sales s
    ON t.productid = s.productid
    WHEN MATCHED
      THEN UPDATE SET *
    WHEN NOT MATCHED
      THEN INSERT *""")

AnalysisException: ignored

In [233]:
delta_schema = StructType([
    StructField("productid", StringType(), True),
    StructField("unitprice", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("size", StringType(), True),
    StructField("color", StringType(), True),
    StructField("brand", StringType(), True)
])
original_sales = spark.read.csv("/tmp/delta/sales/salesOriginal.csv", header=True)
original_sales.show(10)

+----------+----------+--------+---------+-------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+--------+
|    saleid|  saledate|quantity|unitprice| shopid|         city|     state|country|shopsize|productid| category|subcategory|  size|purchaseprice|color|   brand|
+----------+----------+--------+---------+-------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+--------+
|S000000124|2023-02-26|     2.0|     60.0| shop_4|San Francisco|California|    USA|   small|    CHA_2|Furniture|      Chair|  null|         48.0| blue|   Basic|
|S000000125|2023-02-25|     1.0|    150.0| shop_5|      Houston|     Texas|    USA|   small|    BED_3|Furniture|        Bed|Single|        127.0|  red|    Mega|
|S000000126|2023-02-24|     1.0|    300.0| shop_6|  San Antonio|     Texas|    USA|   small|    BED_4|Furniture|        Bed|Double|        252.0|brown|   Basic|
|S000000127|2023-02-23|     1.0|  

In [228]:
original_sales.write.format("delta").mode("overwrite").saveAsTable("default.product")


see the result

In [227]:
spark.sql(""" select * from default.products limit 10 """).show()

+---------+---------+--------+----+-----+-----+
|productid|unitprice|category|size|color|brand|
+---------+---------+--------+----+-----+-----+
+---------+---------+--------+----+-----+-----+

