In [0]:
# What is Delta Lake?
# Delta Lake is a storage layer that adds reliability, transactions, and versioning on top of Parquet files.
# Delta Lake does NOT replace Parquet.
# It uses Parquet internally.

# ACID transactions ensure that data operations are reliable, consistent, and safe even when failures or concurrent operations occur.

# | Feature            | Parquet | Delta Lake |
# | ------------------ | ------- | ---------- |
# | Schema check       | ❌ No    | ✅ Yes      |
# | Wrong data allowed | ✅       | ❌          |
# | Data safety        | ❌       | ✅          |

# Can wrong data enter a Delta table? -> ❌ No (product_id column will not take ABC as a value in Delta)
# Can wrong data enter a Parquet file? -> ✅ Yes (product_id column will take ABC as a value in Parquet)


**loaded CSV data**

In [0]:
df_oct = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv", header=True, inferSchema=True)
df_nov = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)
df_both = df_oct.union(df_nov)
df_both.printSchema()
print(f'There are {df_both.count()} records in the combined dataset')

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

There are 109950743 records in the combined dataset


**CSV to Delta format**

In [0]:

df_both.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/combined_data")



**Delta table Created**

In [0]:
df_both.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("combined_data_as_delta_table")


In [0]:
# Verified that the table created.
spark.sql("SHOW TABLES").show()


+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|       brand_revenue|      false|
| default|combined_data_as_...|      false|
+--------+--------------------+-----------+



In [0]:
spark.read.table("combined_data_as_delta_table").printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:
# combined_data_as_delta_table is a Delta table.
# Delta Lake enforces schema, so it does not allow inserting values with incorrect data types.
# event_time`column expects a timestamp, and I am inserting a string value, it will result in an error.


spark.sql("""
INSERT INTO combined_data_as_delta_table
VALUES ('ABC', 'Iphone', 1001, 1, 'aa', 'bb', 'DEF', 24,'djflkd')
""")


[0;31m---------------------------------------------------------------------------[0m
[0;31mDateTimeException[0m                         Traceback (most recent call last)
File [0;32m<command-4665876802857002>, line 1[0m
[0;32m----> 1[0m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m      2[0m [38;5;124mINSERT INTO combined_data_as_delta_table[39m
[1;32m      3[0m [38;5;124mVALUES ([39m[38;5;124m'[39m[38;5;124mABC[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mIphone[39m[38;5;124m'[39m[38;5;124m, 1001, 1, [39m[38;5;124m'[39m[38;5;124maa[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mbb[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mDEF[39m[38;5;124m'[39m[38;5;124m, 24,[39m[38;5;124m'[39m[38;5;124mdjflkd[39m[38;5;124m'[39m[38;5;124m)[39m
[1;32m      4[0m [38;5;124m"""[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/session.py:875[0m, in [0;3

**Handling duplicate inserts**

In [0]:
from pyspark.sql.functions import count

# Checking the duplicated records
spark.table("combined_data_as_delta_table") \
    .groupBy("product_id", "user_id", "event_time") \
    .agg(count("*").alias("cnt")) \
    .filter("cnt > 1") \
    .show()


+----------+---------+-------------------+---+
|product_id|  user_id|         event_time|cnt|
+----------+---------+-------------------+---+
|   1801690|567017558|2019-11-17 09:00:34|  2|
|  22700129|542344512|2019-11-17 10:07:59|  2|
|  12720549|567747678|2019-11-17 10:32:22|  2|
|  44600105|522071512|2019-11-17 11:06:31|  2|
|  19300096|515132319|2019-11-17 11:24:23|  2|
|   1004873|558296315|2019-11-17 07:26:04|  2|
|  26601230|571691165|2019-11-17 07:49:28|  2|
|   3600990|516805522|2019-11-17 08:24:52|  3|
|  26402310|549034602|2019-11-17 13:51:25|  3|
|   4804055|519231750|2019-11-17 13:58:34|  2|
|  27700722|553358531|2019-11-17 14:54:27|  4|
|  15201070|512756298|2019-11-17 11:35:12|  2|
|   1004856|515271727|2019-11-16 16:26:47|  2|
|   1801689|565379445|2019-11-16 16:33:51|  2|
|  12718062|564283211|2019-11-16 17:00:43|  2|
|   1801806|572636943|2019-11-17 15:42:54|  2|
|   1801908|546529427|2019-11-17 16:30:27|  2|
|   1004768|572659827|2019-11-17 17:03:50|  2|
| 100006625|5

In [0]:
# Removing the duplicated records
dup_Removed = spark.table("combined_data_as_delta_table") \
            .dropDuplicates(["product_id", "user_id", "event_time"])

In [0]:
# Saving back to the table after removing the duplicated records
dup_Removed.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("combined_data_as_delta_table")

In [0]:
# Verifying the table after removing the duplicated records

spark.table("combined_data_as_delta_table") \
    .groupBy("product_id", "user_id", "event_time") \
    .agg(count("*").alias("cnt")) \
    .filter("cnt > 1") \
    .show()

+----------+-------+----------+---+
|product_id|user_id|event_time|cnt|
+----------+-------+----------+---+
+----------+-------+----------+---+



In [0]:
# Number of rows in the table after removing the duplicated records

spark.table("combined_data_as_delta_table").count()

109759528