In [0]:

# Loading Data
from delta.tables import DeltaTable

# Path of the Delta table that every later cell reads from or writes to.
delta_path = "/Volumes/workspace/ecommerce/ecommerce_data/delta/events_oct"

# Table handle; the merge cell further down uses it to apply the upsert.
deltaTable = DeltaTable.forPath(spark, delta_path)

# Row count of the table as it currently stands (shown as the cell output).
(
    spark.read
    .format("delta")
    .load(delta_path)
    .count()
)

42448764

In [0]:
 #Defining Schema
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType, DoubleType

# Explicit schema for the sample rows created below; every column is nullable.
schema = (
    StructType()
    .add("event_time", TimestampType(), True)
    .add("event_type", StringType(), True)
    .add("product_id", LongType(), True)
    .add("category_id", LongType(), True)
    .add("category_code", StringType(), True)
    .add("brand", StringType(), True)
    .add("price", DoubleType(), True)
    .add("user_id", LongType(), True)
    .add("user_session", StringType(), True)
)
     


In [0]:
# creating sample dataframe
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType, DoubleType
from datetime import datetime

# Two hand-written events (one purchase, one view); tuple positions follow
# the field order declared in `schema`.
sample_rows = [
    (datetime(2019, 10, 1, 0, 0, 0), "purchase", 999999, 12345,
     "electronics.smartphone", "TestBrand", 499.0, 888888, "session_test_1"),
    (datetime(2019, 10, 1, 0, 1, 0), "view", 999998, 12346,
     "electronics.audio", "TestBrand2", 99.0, 777777, "session_test_2"),
]

updates = spark.createDataFrame(sample_rows, schema=schema)
updates.show()

+-------------------+----------+----------+-----------+--------------------+----------+-----+-------+--------------+
|         event_time|event_type|product_id|category_id|       category_code|     brand|price|user_id|  user_session|
+-------------------+----------+----------+-----------+--------------------+----------+-----+-------+--------------+
|2019-10-01 00:00:00|  purchase|    999999|      12345|electronics.smart...| TestBrand|499.0| 888888|session_test_1|
|2019-10-01 00:01:00|      view|    999998|      12346|   electronics.audio|TestBrand2| 99.0| 777777|session_test_2|
+-------------------+----------+----------+-----------+--------------------+----------+-----+-------+--------------+



In [0]:

# Upsert `updates` (alias "s") into the Delta table (alias "t").
# Rows are paired on (user_session, event_time): a matching target row is
# overwritten with all source columns, an unmatched source row is inserted.
merge_condition = "t.user_session = s.user_session AND t.event_time = s.event_time"

(
    deltaTable.alias("t")
    .merge(updates.alias("s"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
     


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
 # Verifying the merge
# Re-read the table from disk and show only the rows whose session id
# starts with "session_test_" (the prefix used by the sample rows).
(
    spark.read
    .format("delta")
    .load(delta_path)
    .filter("user_session LIKE 'session_test_%'")
    .show()
)
     


+-------------------+----------+----------+-----------+--------------------+----------+-----+-------+--------------+
|         event_time|event_type|product_id|category_id|       category_code|     brand|price|user_id|  user_session|
+-------------------+----------+----------+-----------+--------------------+----------+-----+-------+--------------+
|2019-10-01 00:00:00|  purchase|    999999|      12345|electronics.smart...| TestBrand|499.0| 888888|session_test_1|
|2019-10-01 00:01:00|      view|    999998|      12346|   electronics.audio|TestBrand2| 99.0| 777777|session_test_2|
+-------------------+----------+----------+-----------+--------------------+----------+-----+-------+--------------+



In [0]:
# Checking version history: compare the latest snapshot with version 0.

# Row count of the latest snapshot.
current = (
    spark.read
    .format("delta")
    .load(delta_path)
    .count()
)

# Time-travel read pinned to version 0; this DataFrame is only counted
# inside the print call below.
v0 = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(delta_path)
)

print("Current:", current)
print("Version 0:", v0.count())
     


Current: 42448766
Version 0: 42448764


In [0]:

# Running optimization on the table.
# Compact and Z-order the path-based Delta table that the merge cell
# modified, through the existing `deltaTable` handle. The previous version
# targeted the catalog name `events_table`, which is not defined in any cell
# shown here and resolves against whichever catalog/schema is current, so it
# could fail on a fresh run or optimize a different table.
# Z-order columns: event_type and user_id.
deltaTable.optimize().executeZOrderBy("event_type", "user_id")
     

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:

# Performing cleanup.
# Vacuum the path-based Delta table through the existing `deltaTable` handle
# so the cleanup applies to the table this notebook merged into. The previous
# version targeted the catalog name `events_table`, which is not defined in
# any cell shown here.
RETENTION_HOURS = 168  # 7 days; files newer than this are kept

deltaTable.vacuum(RETENTION_HOURS)

DataFrame[path: string]