Documentation

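* Delta Lake introduction: https://docs.delta.io/0.4.0/delta-intro.html (these are the 0.4.0 docs; this notebook uses Delta 2.4.0 — current documentation is at https://docs.delta.io/latest/)
* Python API reference: https://docs.delta.io/0.4.0/api/python/index.html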










## Prerequisite

In [None]:
!pip list|grep pyspark

In [None]:
!pip install --upgrade -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
!pip list|grep delta-spark

In [None]:
!pip install --upgrade -q delta-spark

In [None]:
from delta import configure_spark_with_delta_pip
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Local Spark session with Delta Lake enabled.
local = "local[*]"
appName = "DeltaLake"

# Session settings. `spark.jars.packages` is intentionally not set here:
# configure_spark_with_delta_pip() below derives the delta-core Maven coordinate
# from the installed delta-spark pip package, so the JVM jar always matches the
# Python `delta` module instead of being a separately hard-coded version.
localConfig = (
    SparkConf()
    .setAppName(appName)
    .setMaster(local)
    .set("spark.executor.memory", "6G")
    .set("spark.driver.memory", "6G")
    .set("spark.sql.catalogImplementation", "in-memory")
    # Register Delta's SQL extensions and catalog so DeltaTable / DESCRIBE HISTORY work.
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Allow writes/MERGE to evolve the table schema automatically.
    .set("spark.databricks.delta.schema.autoMerge.enabled", "true")
)

builder = SparkSession.builder.config(conf=localConfig)
# getOrCreate() reuses an already-running session, so changes here only take
# effect after a kernel restart.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

### imports

In [None]:
from delta.tables import *
from pyspark.sql.functions import *


In [None]:
import pandas as pd

### data loading

In [None]:
! wget https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar -O /tmp/sales.tar

--2023-07-14 22:43:57--  https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar
Resolving nuage.lip6.fr (nuage.lip6.fr)... 132.227.201.11
Connecting to nuage.lip6.fr (nuage.lip6.fr)|132.227.201.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 548352 (536K) [application/x-tar]
Saving to: ‘/tmp/sales.tar’


2023-07-14 22:43:58 (1.16 MB/s) - ‘/tmp/sales.tar’ saved [548352/548352]



In [None]:
! tar xvf /tmp/sales.tar -C /tmp

sales/
sales/._salesOriginal.csv
sales/salesOriginal.csv
sales/march23_sales.csv


In [None]:
!ls /tmp/sales

march23_sales.csv  salesOriginal.csv


## Questions

### Table creation


#### sales table
Create a Delta table `default.sales` with the following schema: (saleid: string, saledate: timestamp, productid: string, quantity: int, shopid: string).

In [None]:
# Define the (empty) sales fact table; createOrReplace makes the cell re-runnable.
(
    DeltaTable.createOrReplace(spark)
    .tableName("default.sales")
    .addColumn("saleid", "STRING")
    .addColumn("saledate", "TIMESTAMP")
    .addColumn("productid", "STRING")
    .addColumn("quantity", "INT")
    .addColumn("shopid", "STRING")
    .execute()
)

<delta.tables.DeltaTable at 0x7ca2d8bf2110>

In [None]:
# Confirm the column names/types of default.sales.
describe_sales_sql = """ DESCRIBE default.sales """
spark.sql(describe_sales_sql).show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|   saleid|   string|   null|
| saledate|timestamp|   null|
|productid|   string|   null|
| quantity|      int|   null|
|   shopid|   string|   null|
+---------+---------+-------+



Load the `/tmp/sales/salesOriginal.csv` data into `default.sales`, keeping only the required columns (the March file `march23_sales.csv` is merged in later, in Q1).

In [None]:
# Raw sales extract: first line is a header, column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/tmp/sales/salesOriginal.csv')
)
df.show(5)

+----------+----------+--------+---------+------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+-----+
|    saleid|  saledate|quantity|unitprice|shopid|         city|     state|country|shopsize|productid| category|subcategory|  size|purchaseprice|color|brand|
+----------+----------+--------+---------+------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+-----+
|S000000124|2023-02-26|     2.0|     60.0|shop_4|San Francisco|California|    USA|   small|    CHA_2|Furniture|      Chair|  null|         48.0| blue|Basic|
|S000000125|2023-02-25|     1.0|    150.0|shop_5|      Houston|     Texas|    USA|   small|    BED_3|Furniture|        Bed|Single|        127.0|  red| Mega|
|S000000126|2023-02-24|     1.0|    300.0|shop_6|  San Antonio|     Texas|    USA|   small|    BED_4|Furniture|        Bed|Double|        252.0|brown|Basic|
|S000000127|2023-02-23|     1.0|    395.0|shop_7|      Chi

In [None]:
# Inspect the inferred schema: saledate is read as date and quantity as double,
# which is why both are cast before loading into default.sales.
df.printSchema()

root
 |-- saleid: string (nullable = true)
 |-- saledate: date (nullable = true)
 |-- quantity: double (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- shopid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- shopsize: string (nullable = true)
 |-- productid: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- size: string (nullable = true)
 |-- purchaseprice: double (nullable = true)
 |-- color: string (nullable = true)
 |-- brand: string (nullable = true)



In [None]:
# Shape the raw extract like default.sales: saledate -> TIMESTAMP,
# quantity -> int (e.g. 2.0 becomes 2), keeping only the five table columns in table order.
sales_with_correct_schema = df.select(
    "saleid",
    col("saledate").cast("TIMESTAMP").alias("saledate"),
    "productid",
    col("quantity").cast("int").alias("quantity"),
    "shopid",
)
sales_with_correct_schema.show(10)

+----------+-------------------+---------+--------+-------+
|    saleid|           saledate|productid|quantity| shopid|
+----------+-------------------+---------+--------+-------+
|S000000124|2023-02-26 00:00:00|    CHA_2|       2| shop_4|
|S000000125|2023-02-25 00:00:00|    BED_3|       1| shop_5|
|S000000126|2023-02-24 00:00:00|    BED_4|       1| shop_6|
|S000000127|2023-02-23 00:00:00|    BED_5|       1| shop_7|
|S000000128|2023-02-22 00:00:00|    BED_6|       4| shop_8|
|S000000129|2023-02-21 00:00:00|    TSH_7|       1| shop_9|
|S000000130|2023-02-20 00:00:00|    TSH_8|       1|shop_10|
|S000000131|2023-02-19 00:00:00|    TSH_9|       1|shop_11|
|S000000132|2023-02-18 00:00:00|   TSH_10|       2|shop_12|
|S000000133|2023-02-17 00:00:00|   TSH_11|       1|shop_13|
+----------+-------------------+---------+--------+-------+
only showing top 10 rows



In [None]:
# Replace the contents of default.sales with the cleaned extract.
(
    sales_with_correct_schema.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("default.sales")
)

In [None]:
# Spot-check the loaded rows.
preview_sales_sql = """ select * from default.sales limit 10 """
spark.sql(preview_sales_sql).show()

+----------+-------------------+---------+--------+-------+
|    saleid|           saledate|productid|quantity| shopid|
+----------+-------------------+---------+--------+-------+
|S000000124|2023-02-26 00:00:00|    CHA_2|       2| shop_4|
|S000000125|2023-02-25 00:00:00|    BED_3|       1| shop_5|
|S000000126|2023-02-24 00:00:00|    BED_4|       1| shop_6|
|S000000127|2023-02-23 00:00:00|    BED_5|       1| shop_7|
|S000000128|2023-02-22 00:00:00|    BED_6|       4| shop_8|
|S000000129|2023-02-21 00:00:00|    TSH_7|       1| shop_9|
|S000000130|2023-02-20 00:00:00|    TSH_8|       1|shop_10|
|S000000131|2023-02-19 00:00:00|    TSH_9|       1|shop_11|
|S000000132|2023-02-18 00:00:00|   TSH_10|       2|shop_12|
|S000000133|2023-02-17 00:00:00|   TSH_11|       1|shop_13|
+----------+-------------------+---------+--------+-------+



In [None]:
# Row count of default.sales after the initial load.
count_sales_sql = """ select count(*) from default.sales """
spark.sql(count_sales_sql).show()

+--------+
|count(1)|
+--------+
|    4916|
+--------+



#### dates table
Create a Delta table `default.dates` with the schema (saledate: timestamp, year: int, month: int), where `year` and `month` are generated columns extracted from `saledate`.

In [None]:
# Date dimension: year and month are generated columns computed from saledate.
(
    DeltaTable.createOrReplace(spark)
    .tableName("default.dates")
    .addColumn("saledate", "TIMESTAMP")
    .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)")
    .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)")
    .execute()
)

<delta.tables.DeltaTable at 0x7ca314b4c6a0>

In [None]:
# Confirm the column names/types of default.dates.
describe_dates_sql = """ DESCRIBE default.dates """
spark.sql(describe_dates_sql).show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|saledate|timestamp|   null|
|    year|      int|   null|
|   month|      int|   null|
+--------+---------+-------+



Populate `default.dates` by inserting the distinct sale dates from `default.sales`.

In [None]:
# Populate default.dates with one row per distinct sale date. year/month are
# generated columns (see the CREATE cell); the values supplied here use the same
# YEAR()/MONTH() expressions.
# INSERT is a command, so there is no result set to .show(); the trailing ';'
# suppresses the empty DataFrame repr.
# Not idempotent: re-running only this cell appends the same dates again —
# re-run the createOrReplace cell above first.
spark.sql(""" INSERT INTO default.dates SELECT distinct saledate, year(saledate), month(saledate) FROM default.sales """);

++
||
++
++



In [None]:
# Spot-check the generated year/month values.
preview_dates_sql = """ select * from default.dates limit 10 """
spark.sql(preview_dates_sql).show()

+-------------------+----+-----+
|           saledate|year|month|
+-------------------+----+-----+
|2023-01-29 00:00:00|2023|    1|
|2022-12-12 00:00:00|2022|   12|
|2023-01-04 00:00:00|2023|    1|
|2023-02-26 00:00:00|2023|    2|
|2023-01-12 00:00:00|2023|    1|
|2023-02-21 00:00:00|2023|    2|
|2023-01-15 00:00:00|2023|    1|
|2023-01-18 00:00:00|2023|    1|
|2023-02-07 00:00:00|2023|    2|
|2022-11-12 00:00:00|2022|   11|
+-------------------+----+-----+



In [None]:
# Number of distinct sale dates inserted.
count_dates_sql = """ select count(*) from default.dates """
spark.sql(count_dates_sql).show()

+--------+
|count(1)|
+--------+
|     118|
+--------+



#### products table
Create a Delta table `default.products` with the schema (productid: string, unitprice: double, category: string, subcategory: string, size: string, color: string, brand: string) by extracting the data from `salesOriginal.csv`.

In [None]:
# Product dimension extracted from the raw sales extract.
# `df` has one row per *sale* (keyed by saleid), so selecting the product columns
# alone repeats every product once per sale. distinct() keeps one row per product
# attribute combination. Without it, default.products would be inflated, and so would
# the number of "modified" product rows reported by the Q2 MERGE.
products = (
    df.select('productid', 'unitprice', 'category', 'subcategory', 'size', 'color', 'brand')
    .distinct()
)
products.show(5)

+---------+---------+---------+-----------+------+-----+-----+
|productid|unitprice| category|subcategory|  size|color|brand|
+---------+---------+---------+-----------+------+-----+-----+
|    CHA_2|     60.0|Furniture|      Chair|  null| blue|Basic|
|    BED_3|    150.0|Furniture|        Bed|Single|  red| Mega|
|    BED_4|    300.0|Furniture|        Bed|Double|brown|Basic|
|    BED_5|    395.0|Furniture|        Bed| Queen|black| Mega|
|    BED_6|    440.0|Furniture|        Bed|  King|white|Basic|
+---------+---------+---------+-----------+------+-----+-----+
only showing top 5 rows



In [None]:
# Define the (empty) product dimension table.
(
    DeltaTable.createOrReplace(spark)
    .tableName("default.products")
    .addColumn("productid", "STRING")
    .addColumn("unitprice", "DOUBLE")
    .addColumn("category", "STRING")
    .addColumn("subcategory", "STRING")
    .addColumn("size", "STRING")
    .addColumn("color", "STRING")
    .addColumn("brand", "STRING")
    .execute()
)

<delta.tables.DeltaTable at 0x7ca2ffdc6050>

In [None]:
# Confirm the column names/types of default.products.
describe_products_sql = """ DESCRIBE default.products """
spark.sql(describe_products_sql).show()

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|  productid|   string|   null|
|  unitprice|   double|   null|
|   category|   string|   null|
|subcategory|   string|   null|
|       size|   string|   null|
|      color|   string|   null|
|      brand|   string|   null|
+-----------+---------+-------+



In [None]:
# Replace the contents of default.products with the extracted product rows.
(
    products.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.products")
)

In [None]:
# Spot-check the loaded products.
preview_products_sql = """ select * from default.products limit 10 """
spark.sql(preview_products_sql).show()

+---------+---------+---------+-----------+------+-----+--------+
|productid|unitprice| category|subcategory|  size|color|   brand|
+---------+---------+---------+-----------+------+-----+--------+
|    CHA_2|     60.0|Furniture|      Chair|  null| blue|   Basic|
|    BED_3|    150.0|Furniture|        Bed|Single|  red|    Mega|
|    BED_4|    300.0|Furniture|        Bed|Double|brown|   Basic|
|    BED_5|    395.0|Furniture|        Bed| Queen|black|    Mega|
|    BED_6|    440.0|Furniture|        Bed|  King|white|   Basic|
|    TSH_7|     20.0|    Cloth|     Tshirt|    XS| blue|NewBrand|
|    TSH_8|     20.0|    Cloth|     Tshirt|     S|  red|   Basic|
|    TSH_9|     22.0|    Cloth|     Tshirt|     M|brown|    Mega|
|   TSH_10|     24.0|    Cloth|     Tshirt|     L|black|    Over|
|   TSH_11|     24.0|    Cloth|     Tshirt|    XL|white|NewBrand|
+---------+---------+---------+-----------+------+-----+--------+



### Delta operation


#### Q1
Using the dataframe `newsales`, insert each sale that does not yet exist in `default.sales`; for a sale that already exists (same `saleid`), add its quantity to the existing sale's quantity.

In [None]:
# March 2023 sales, shaped exactly like default.sales (same casts, same column order).
newsales = (
    spark.read.csv('/tmp/sales/march23_sales.csv', header=True, inferSchema=True)
    .select(
        'saleid',
        col("saledate").cast("TIMESTAMP").alias("saledate"),
        'productid',
        col("quantity").cast("int").alias("quantity"),
        'shopid',
    )
)
newsales.count()


84

In [None]:
# Peek at the first few March sales.
newsales.show(n=6)

+----------+-------------------+---------+--------+------+
|    saleid|           saledate|productid|quantity|shopid|
+----------+-------------------+---------+--------+------+
|S000000000|2023-03-02 00:00:00|    TAB_0|       1|shop_0|
|S000000001|2023-03-01 00:00:00|    TAB_1|       1|shop_1|
|S000000120|2023-03-02 00:00:00|   SHO_59|       1|shop_0|
|S000000121|2023-03-01 00:00:00|   SHO_60|       3|shop_1|
|S000000240|2023-03-02 00:00:00|   SHO_57|       2|shop_0|
|S000000241|2023-03-01 00:00:00|   SHO_58|       2|shop_1|
+----------+-------------------+---------+--------+------+
only showing top 6 rows



In [None]:
# DeltaTable handle on default.sales, used for the Q1 MERGE and the Q3 DELETE.
default_sales_delta = DeltaTable.forName(spark, "default.sales")

In [None]:
# Q1 upsert keyed on saleid:
#   - saleid already present -> accumulate: old quantity + new quantity
#   - saleid not present     -> insert the whole newsales row
quantity_update = {"quantity": col("oldSales.quantity") + col("newSales.quantity")}
(
    default_sales_delta.alias("oldSales")
    .merge(newsales.alias("newSales"), "oldSales.saleid = newSales.saleid")
    .whenMatchedUpdate(set=quantity_update)
    .whenNotMatchedInsertAll()
    .execute()
)


Verification

In [None]:
# Row count after the MERGE; compare with the count before it to get the inserted rows.
count_after_merge_sql = """ select count(*) from default.sales """
spark.sql(count_after_merge_sql).show()

+--------+
|count(1)|
+--------+
|    5000|
+--------+



In [None]:
# Commit log of default.sales (one row per table version).
sales_history_sql = """DESCRIBE HISTORY default.sales"""
history_sales = spark.sql(sales_history_sql)

In [None]:
# operationMetrics holds the per-commit counters (e.g. rows inserted/updated by MERGE).
history_columns = ["version", "operation", "operationMetrics"]
history_sales.select(*history_columns).show(truncate=False)

+-------+---------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation                        |operationMetrics                                                                                                                                                                                                                                                                                                                                                                                        

How many tuples have been modified and how many have been inserted?

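Answer: 0 updated, 84 inserted. The row count went from 4916 to 5000, which is exactly the 84 rows of `newsales`, so no `saleid` matched an existing sale.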


#### Q2
Apply a 10% increase to the `unitprice` of the `default.products` rows whose total sold quantity exceeds 130, according to the `salesFact` aggregate (total quantity per product computed from `default.sales`).

In [None]:
# Total quantity sold per product (a few rows, to sanity-check the Q2 threshold).
quantity_per_product_sql = """ select sum(quantity) as quantity, productid from default.sales group by productid"""
spark.sql(quantity_per_product_sql).show(5)

+--------+---------+
|quantity|productid|
+--------+---------+
|     135|   HOO_54|
|     130|   HOO_13|
|     130|   BED_48|
|     132|   TAB_43|
|     135|   TSH_49|
+--------+---------+
only showing top 5 rows



In [None]:
# Define
products_delta = DeltaTable.forName(spark, "default.products")
salesFact = spark.sql(""" select sum(quantity) as quantity, productid from default.sales group by productid""")
merge_condition = "sales.productid = products.productid and sales.quantity > 130"
update_expression = { "unitprice": expr("unitprice * 1.1") }


# merge statement
products_delta.alias("products") \
  .merge(
    salesFact.alias("sales"), merge_condition ) \
  .whenMatchedUpdate(set = update_expression) \
  .execute()


In [None]:
# Commit log of default.products; the MERGE row's operationMetrics gives the updated-row count.
products_history_sql = """DESCRIBE HISTORY default.products"""
history_products = spark.sql(products_history_sql)
history_products.select("version", "operation", "operationMetrics").show(truncate=False)

+-------+---------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation                        |operationMetrics                                                                                                                                                                                                                                                                                                                                                                            

How many tuples have been modified?

Answer: 4111

#### Q3
Delete from `default.sales` the sales that are more than 2 months older than March 1st 2023, i.e. every sale dated before January 1st 2023.

In [None]:
# Q3: delete sales older than RETENTION_MONTHS months relative to REFERENCE_DATE.
# With 2023-03-01 and 2 months, the cutoff is 2023-01-01: sales dated before it
# (e.g. the November/December 2022 ones) are deleted, while January/February 2023
# and the March rows merged in Q1 are kept. The previous cutoff ('2023-03-01')
# deleted every sale before March instead of only those older than 2 months.
REFERENCE_DATE = "2023-03-01"
RETENTION_MONTHS = 2
retention_cutoff = add_months(to_date(lit(REFERENCE_DATE)), -RETENTION_MONTHS)
default_sales_delta.delete(condition=col("saledate") < retention_cutoff)


In [None]:
# Re-read the commit log of default.sales; the DELETE row's operationMetrics gives the deleted-row count.
sales_history_sql = """DESCRIBE HISTORY default.sales"""
history_sales = spark.sql(sales_history_sql)
history_sales.select("version", "operation", "operationMetrics").show(truncate=False)

+-------+---------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation                        |operationMetrics                                                                                                                                                                                                                                                                                                                                                                                        

How many tuples have been deleted?

Answer: 4916