In [0]:
%sql
-- Create the ecommerce schema inside the workspace catalog unless it already exists.
CREATE SCHEMA IF NOT EXISTS workspace.ecommerce;
-- Switch the session's current catalog and schema to workspace.ecommerce.
USE CATALOG workspace;
USE SCHEMA ecommerce;

In [0]:
# Load the raw e-commerce event CSV data from the volume path.
# The first line of each file is treated as a header, and column types are
# inferred from the data rather than declared up front.
csv_reader_options = {"header": True, "inferSchema": True}
df = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data", **csv_reader_options
)

# Quick sanity check: inferred schema plus the first five rows.
df.printSchema()
df.show(5)


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|53

In [0]:
from pyspark.sql import functions as F

# Top five products by revenue, built in three named stages:
# 1) keep only purchase events, 2) sum price per (product_id, brand),
# 3) rank by the summed price and keep the five largest.
purchase_events = df.filter(F.col("event_type") == "purchase")

revenue_per_product = purchase_events.groupBy("product_id", "brand").agg(
    F.sum("price").alias("total_revenue")
)

top_revenue_products = revenue_per_product.orderBy(F.desc("total_revenue")).limit(5)

top_revenue_products.show()


+----------+-------+--------------------+
|product_id|  brand|       total_revenue|
+----------+-------+--------------------+
|   1005115|  apple|3.3032381669999924E7|
|   1005105|  apple| 2.168460337000001E7|
|   1004249|  apple|1.3545407540000014E7|
|   1005135|  apple|1.2654328769999998E7|
|   1004767|samsung|1.1004748489999998E7|
+----------+-------+--------------------+



In [0]:
from pyspark.sql.window import Window

# Per-user running event count, ordered by event time.
#
# The frame is set explicitly to ROWS (start of partition .. current row).
# Without it, an ordered window uses a RANGE frame, which treats every row
# sharing the current row's event_time as a peer: two events of one user in
# the same second would both report the same count and the sequence would
# skip values instead of increasing by one per event.
window_spec = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_with_running_total = df.withColumn(
    "cumulative_events",
    F.count("*").over(window_spec)
)

# Preview only; the order among rows with identical event_time is not fixed.
df_with_running_total.select(
    "user_id", "event_time", "cumulative_events"
).show(5)


+--------+-------------------+-----------------+
| user_id|         event_time|cumulative_events|
+--------+-------------------+-----------------+
|65800726|2019-11-27 04:33:16|                1|
|65800726|2019-11-27 04:35:24|                2|
|81255481|2019-11-08 07:44:45|                1|
|81255481|2019-11-21 14:11:26|                2|
|82079354|2019-11-28 04:58:01|                1|
+--------+-------------------+-----------------+
only showing top 5 rows


In [0]:
# View-to-purchase conversion rate (in percent) per category_code.
#
# views / purchases are counted by summing a 1/0 indicator per event type.
# The rate is only computed when the category has at least one view; with
# zero views the unguarded division would divide by zero (NULL or a runtime
# error, depending on the session's ANSI setting). F.when without
# .otherwise yields NULL for those categories.
conversion = (
    df.groupBy("category_code")
      .agg(
          F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("views"),
          F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases")
      )
      .withColumn(
          "conversion_rate",
          F.when(
              F.col("views") > 0,
              (F.col("purchases") / F.col("views")) * 100
          )
      )
)

conversion.show()


+--------------------+--------+---------+-------------------+
|       category_code|   views|purchases|    conversion_rate|
+--------------------+--------+---------+-------------------+
| stationery.cartrige|   19323|      325| 1.6819334471872898|
|electronics.video.tv| 3127266|    51839| 1.6576460077268773|
|  accessories.wallet|  112191|      676| 0.6025438760684904|
|appliances.kitche...|  118134|     1287| 1.0894408045101325|
|                NULL|34073918|   407643| 1.1963490667553993|
|construction.tool...|  233215|     2200| 0.9433355487425765|
|appliances.enviro...|  422673|     6066| 1.4351519969337998|
|country_yard.furn...|    2785|        4| 0.1436265709156194|
|       apparel.shoes| 2596322|    14395|  0.554438162908915|
|electronics.audio...|   73039|      919| 1.2582319035036076|
|appliances.kitche...|  238132|     1659| 0.6966724337762249|
|electronics.audio...|  615955|     6478| 1.0517002053721458|
|country_yard.lawn...|   11580|       82| 0.7081174438687392|
|furnitu

In [0]:
# Save the events DataFrame as the Delta table workspace.ecommerce.events_delta.
# "overwrite" mode replaces whatever the table currently holds.
(
    df.write
      .format("delta")
      .mode("overwrite")
      .saveAsTable("workspace.ecommerce.events_delta")
)


In [0]:
%sql
-- Inspect table-level metadata of the Delta table (format, file count, size, properties, ...).
DESCRIBE DETAIL workspace.ecommerce.events_delta;


format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,e25176cf-ea06-4746-b507-c4b1078a8d04,workspace.ecommerce.events_delta,,,2026-01-12T16:40:34.072Z,2026-01-12T16:41:39.000Z,List(),List(),111,1916063378,"Map(delta.parquet.compression.codec -> zstd, delta.enableDeletionVectors -> true)",3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


In [0]:
# Demonstrate Delta schema enforcement: appending a frame whose columns
# (x, y, z) are not part of the target table is expected to be rejected.
wrong_schema_df = spark.createDataFrame(
    [("a", "b", "c")],
    ["x", "y", "z"]
)

try:
    (
        wrong_schema_df.write
        .format("delta")
        .mode("append")
        .saveAsTable("workspace.ecommerce.events_delta")
    )
except Exception as e:
    # Only a schema-mismatch rejection demonstrates enforcement. Any other
    # failure (missing table, permissions, ...) must not be reported as
    # success, so it is re-raised unchanged.
    if "schema mismatch" not in str(e).lower():
        raise
    print("Schema enforcement works:")
    print(e)
else:
    # The append went through, so the mismatching row is now in the table.
    print(
        "WARNING: append with a mismatching schema succeeded; "
        "schema enforcement did NOT reject the write."
    )


Schema enforcement works:
[_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: e25176cf-ea06-4746-b507-c4b1078a8d04).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- event_time: timestamp (nullable = true)
-- event_type: string (nullable = true)
-- product_id: integer (nullable = true)
-- category_id: long (nullable = true)
-- category_code: string (nullable = true)
-- brand: string (nullable = true)
-- price: double (nullable = true)
-- user_id: integer (nullable = true)
-- user_session: string (nullable = true)


Data schema:
root
-- x: string (nullable = true)
-- y: string (nullable = true)
-- z: string (nullable = true)

         
Table ACLs are enabled in this cluster, so au

In [0]:
from pyspark.sql.functions import row_number

# Keep exactly one row per (user_id, product_id, event_time).
#
# The previous row_number() window ordered by event_time, which is itself a
# partition column, so the ordering had no effect and the surviving row was
# arbitrary. dropDuplicates on the same key expresses that directly without
# the window, the helper column and the filter.
# NOTE(review): event_type is not part of the key, so different event types
# for the same user/product/second collapse into one row; confirm that this
# is intended.
dedup_keys = ["user_id", "product_id", "event_time"]

dedup_df = df.dropDuplicates(dedup_keys)

dedup_df.show(5)


+-------------------+----------+----------+-------------------+------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|     category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+------------------+--------+------+---------+--------------------+
|2019-11-29 04:24:47|      view|   5100644|2053013553341792533|electronics.clocks|  garmin|159.58| 96041329|3c3bb8ed-14c0-41f...|
|2019-11-26 07:38:59|      view|  34800084|2062461754293617058|              NULL|  cantra| 128.7|105201424|c9aadb18-c9c7-425...|
|2019-11-28 08:25:09|      view|  12702958|2053013553559896355|              NULL|cordiant| 42.47|122384079|6cee7edb-68ae-4be...|
|2019-11-14 02:36:19|      view|  12720549|2053013553559896355|              NULL|cordiant| 37.32|149382035|360ac34e-7942-450...|
|2019-11-21 11:07:21|      view|  13100014|2053013553526341921|              NULL|    skad

In [0]:
%sql
-- Insert-only merge keyed on (user_id, product_id, event_time).
-- The key columns are nullable, so the null-safe operator <=> is used: with
-- plain "=", a row whose key contains NULL never matches (not even itself)
-- and would be inserted again on every run.
-- NOTE(review): source and target are the same table, so every source row
-- matches and nothing can be inserted; presumably the source should be the
-- new or deduplicated data. Confirm.
MERGE INTO workspace.ecommerce.events_delta t
USING workspace.ecommerce.events_delta s
ON t.user_id <=> s.user_id
AND t.product_id <=> s.product_id
AND t.event_time <=> s.event_time
WHEN NOT MATCHED THEN INSERT *


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0
