In [7]:
import pyspark
from delta import *
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, FloatType
from pyspark.sql.functions import to_date
from datetime import date


In [2]:
builder=(pyspark.sql.SparkSession.builder.appName("TimeTravel")
         .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
         )

In [3]:
spark=configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/vk/Library/Python/3.9/lib/python/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/vk/.ivy2/cache
The jars for the packages stored in: /Users/vk/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b91d631a-7c8b-472d-a9bb-b82ce9df2270;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 91ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   

23/05/04 09:09:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Delta Lake table called 'sales' that tracks the daily sales of products in your store.

In [9]:

schema = StructType([
    StructField("date", DateType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("revenue", FloatType(), True)
])

sample_data = [
    (date(2023,1,1), 101, 5, 150.0),
    (date(2023,1,1), 102, 3, 90.0),
    (date(2023,1,2), 101, 7, 210.0),
    (date(2023,1,2), 102, 2, 60.0)
]

sales_df = spark.createDataFrame(sample_data, schema=schema)

In [12]:
#Write the data to Deltalake table
sales_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/tmp/sales")


23/05/04 09:33:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [16]:
sales=spark.read.format("delta").load('/tmp/sales')
sales.show()

+----------+----------+--------+-------+
|      date|product_id|quantity|revenue|
+----------+----------+--------+-------+
|2023-01-01|       101|       5|  150.0|
|2023-01-02|       101|       7|  210.0|
|2023-01-02|       102|       2|   60.0|
|2023-01-01|       102|       3|   90.0|
+----------+----------+--------+-------+



# We will now create an updated sales dataset with new and modified records

In [62]:
schema = StructType([
    StructField("date", DateType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("revenue", FloatType(), True)
])

updated_data = [
    (date(2023, 1, 1), 101, 6, 180.0),  # Updated record
    (date(2023, 1, 1), 102, 4, 120.0),  # Updated record
    (date(2023, 1, 3), 101, 8, 240.0),  # New record
    (date(2023, 1, 3), 102, 5, 150.0)   # New record
]

updated_sales_df = spark.createDataFrame(updated_data, schema=schema)

# Save the DataFrame to a 
updated_sales_df.write \
    .option("dateFormat","yyyy-MM-dd")\
    .csv("updated_sales.csv", mode="overwrite", header=True)
    
#updated_sales_df.show()


+----------+----------+--------+-------+
|      date|product_id|quantity|revenue|
+----------+----------+--------+-------+
|2023-01-01|       101|       6|  180.0|
|2023-01-01|       102|       4|  120.0|
|2023-01-03|       101|       8|  240.0|
|2023-01-03|       102|       5|  150.0|
+----------+----------+--------+-------+



In [18]:
delta_table = DeltaTable.forPath(spark, "/tmp/sales")

# Now we receive an updated sales dataset containing new and modified sales records that you need to merge into the 'sales' table.

In [67]:
# Read the target 'sales' table
target = spark.read.format("delta").load("/tmp/sales")

# Read the source dataset (assuming it's in CSV format)
updated_sales_df_src = spark.read.csv("updated_sales.csv", header=True, inferSchema=True)

#Parse the date values, discarding the timestamp portion:
# Define the date format pattern
date_format = "yyyy-MM-dd HH:mm:ss"

# Convert the date column to the correct format using the to_date function
source = updated_sales_df_src.withColumn("date", to_date("date", date_format))

# Create a Deltatable object 
delta_table = DeltaTable.forPath(spark, "/tmp/sales")

# Define the Merge Condition
merge_condition = "target.date = source.date AND target.product_id = source.product_id"

# Perform the Merge Operation
delta_table.alias("target")\
    .merge(source.alias("source"),merge_condition)\
    .whenMatchedUpdate(set={"quantity": "source.quantity", "revenue": "source.revenue"})\
    .whenNotMatchedInsert(values={"date": "source.date", "product_id": "source.product_id", "quantity": "source.quantity", "revenue": "source.revenue"}) \
    .execute()



23/05/04 11:27:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/05/04 11:27:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/05/04 11:27:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.


# Verify the Merge Results

In [68]:
# Read the updated 'sales' table
updated_sales = spark.read.format("delta").load("/tmp/sales")

# Display the updated 'sales' table
updated_sales.show()

+----------+----------+--------+-------+
|      date|product_id|quantity|revenue|
+----------+----------+--------+-------+
|2023-01-01|       101|       6|  180.0|
|2023-01-01|       102|       4|  120.0|
|2023-01-02|       101|       7|  210.0|
|2023-01-02|       102|       2|   60.0|
|2023-01-03|       101|       8|  240.0|
|2023-01-03|       102|       5|  150.0|
+----------+----------+--------+-------+

