In [0]:
# Load a small sample of the e-commerce events export.
# NOTE(review): "/path/to/sample.csv" is a placeholder -- point it at a real file before running.
events = spark.read.csv("/path/to/sample.csv", header=True, inferSchema=True)

# Basic operations.
# The real-data cells below select category_id/category_code instead of product_name,
# which suggests the export has no product_name column (selecting it would raise).
events.select("event_type", "category_id", "category_code", "price").show(10)
# Only a cell's last expression is rendered, so print the count instead of discarding it.
print(f"Events with price > 100: {events.filter('price > 100').count()}")
events.groupBy("event_type").count().show()

# Five most frequent brands by event count.
top_brands = events.groupBy("brand").count().orderBy("count", ascending=False).limit(5)
top_brands.show()

In [0]:
events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv", header=True, inferSchema=True)

# Basic operations on the October 2019 events.
events.select("event_type", "category_id", "category_code", "price").show(10)
# Only a cell's last expression is rendered, so print the count instead of discarding it.
print(f"Events with price > 100: {events.filter('price > 100').count()}")
events.groupBy("event_type").count().show()
events.orderBy("price", ascending=False).show(10)

# Five most frequent brands by event count.
top_brands = events.groupBy("brand").count().orderBy("count", ascending=False).limit(5)
top_brands.show()

In [0]:
%python

events.write.mode("overwrite").option("header", "true").csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct-export.csv")

In [0]:
%python
events.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct-export")

In [0]:
events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

# Basic operations on the November 2019 events.
events.select("event_type", "category_id", "category_code", "price").show(10)
# Only a cell's last expression is rendered, so print the count instead of discarding it.
print(f"Events with price > 100: {events.filter('price > 100').count()}")
events.groupBy("event_type").count().show()
events.orderBy("price", ascending=False).show(10)

# Five most frequent brands by event count.
top_brands = events.groupBy("brand").count().orderBy("count", ascending=False).limit(5)
top_brands.show()

# Single-file CSV export: coalesce(1) yields one part file inside the output directory.
events.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov-export")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

# Basic operations
events.select("event_type", "category_id", "category_code", "price").show(10)
# Only a cell's last expression is rendered, so print the count instead of discarding it.
print(f"Events with price > 100: {events.filter('price > 100').count()}")
events.groupBy("event_type").count().show()
events.orderBy("price", ascending=False).show(10)
top_brands = events.groupBy("brand").count().orderBy("count", ascending=False).limit(5)
top_brands.show()

# Top 5 products by revenue.
# The dataset has no product_name column (the selects above use category_id/category_code
# instead), so group by product_id and category_code, as the gold-layer aggregate does.
revenue = (
    events.filter(F.col("event_type") == "purchase")
    .groupBy("product_id", "category_code")
    .agg(F.sum("price").alias("revenue"))
    .orderBy(F.desc("revenue"))
    .limit(5)
)
revenue.show()

# Running event count per user. An explicit ROWS frame advances the count by one per row;
# the default RANGE frame would give events sharing an event_time the same count.
window = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
events_running = events.withColumn("cumulative_events", F.count("*").over(window))

# Conversion rate by category: purchases per 100 views. Categories without views get NULL
# instead of dividing by zero (which raises when ANSI mode is enabled).
conversion = events.groupBy("category_code").agg(
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchase"),
    F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("view"),
).withColumn(
    "conversion_rate",
    F.when(F.col("view") > 0, (F.col("purchase") / F.col("view")) * 100),
)
display(conversion)

In [0]:
from pyspark.sql import functions as F

# Add two derived features in a single chained expression:
#   discounted_price -- price with a flat 10% discount applied
#   is_high_price    -- whether price exceeds 100 (NULL when price is NULL)
events = (
    events
    .withColumn("discounted_price", F.col("price") * 0.9)
    .withColumn("is_high_price", F.col("price") > 100)
)

display(events)

In [0]:
# Path-based Delta copy of the current `events` DataFrame.
# NOTE(review): "/delta/events" lives in the DBFS root; later cells switch to a Unity Catalog volume.
events.write.format("delta").mode("overwrite").save("/delta/events")

# Create (or refresh) the managed table. saveAsTable defaults to "errorifexists",
# which makes the cell fail on every re-run once the table exists.
events.write.format("delta").mode("overwrite").saveAsTable("events_table")

# SQL approach: CTAS copy of the managed table, replaced on re-runs.
spark.sql("""
    CREATE OR REPLACE TABLE events_delta
    USING DELTA
    AS SELECT * FROM events_table
""")

# Test schema enforcement: appending a frame with unrelated columns should be rejected.
try:
    wrong_schema = spark.createDataFrame([("a", "b", "c")], ["x", "y", "z"])
    wrong_schema.write.format("delta").mode("append").save("/delta/events")
except Exception as e:
    print(f"Schema enforcement: {e}")
else:
    print("Schema enforcement: append with a mismatched schema was NOT rejected")


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

# The target volume is otherwise only created further down the notebook; creating it here
# lets Restart & Run All succeed. IF NOT EXISTS keeps this idempotent.
spark.sql("CREATE VOLUME IF NOT EXISTS workspace.ecommerce.delta")

events.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/delta/events")

# Create (or refresh) the managed table; the default "errorifexists" mode fails on re-runs.
events.write.format("delta").mode("overwrite").saveAsTable("workspace.ecommerce.events_table")

# SQL approach: CTAS copy of workspace.ecommerce.events_table, replaced on re-runs.
# NOTE(review): events_delta is unqualified, so it lands in the session's current catalog/schema.
spark.sql("""
    CREATE OR REPLACE TABLE events_delta
    USING DELTA
    AS SELECT * FROM workspace.ecommerce.events_table
""")

# Test schema enforcement: appending a frame with unrelated columns should be rejected.
try:
    wrong_schema = spark.createDataFrame([("a", "b", "c")], ["x", "y", "z"])
    wrong_schema.write.format("delta").mode("append").save("/Volumes/workspace/ecommerce/delta/events")
except Exception as e:
    print(f"Schema enforcement: {e}")
else:
    print("Schema enforcement: append with a mismatched schema was NOT rejected")


In [0]:
%sql
-- Create a managed volume in the ecommerce schema. CREATE VOLUME takes an (optionally
-- qualified) volume name; it has no IN CATALOG / IN SCHEMA clauses.
-- IF NOT EXISTS makes the cell safe to re-run.
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.delta
COMMENT 'Delta volume for ecommerce events';

In [0]:
%sql
-- Same volume, fully qualified name; a no-op if it already exists.
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.delta
COMMENT 'Delta volume for ecommerce events';

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.delta
COMMENT 'Delta volume for ecommerce events';

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

# Make sure the target volume exists; IF NOT EXISTS keeps this idempotent.
spark.sql("CREATE VOLUME IF NOT EXISTS workspace.ecommerce.delta")

events.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/delta/events")

# Create (or refresh) the managed table; the default "errorifexists" mode fails on re-runs.
events.write.format("delta").mode("overwrite").saveAsTable("workspace.ecommerce.events_table")

# SQL approach: CTAS copy of workspace.ecommerce.events_table, replaced on re-runs.
# NOTE(review): events_delta is unqualified, so it lands in the session's current catalog/schema.
spark.sql("""
    CREATE OR REPLACE TABLE events_delta
    USING DELTA
    AS SELECT * FROM workspace.ecommerce.events_table
""")

# Test schema enforcement: appending a frame with unrelated columns should be rejected.
try:
    wrong_schema = spark.createDataFrame([("a", "b", "c")], ["x", "y", "z"])
    wrong_schema.write.format("delta").mode("append").save("/Volumes/workspace/ecommerce/delta/events")
except Exception as e:
    print(f"Schema enforcement: {e}")
else:
    print("Schema enforcement: append with a mismatched schema was NOT rejected")

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path = "/Volumes/workspace/ecommerce/delta/new_data.csv"

# MERGE for incremental updates: upsert the CSV rows keyed on (user_session, event_time).
deltaTable = DeltaTable.forPath(spark, delta_path)
updates = spark.read.csv(csv_path, header=True, inferSchema=True)

# Assign only the columns present in both the CSV and the target. Target columns the CSV
# lacks (such as category_id) keep their value on update and are NULL on insert, rather than
# relying on how UPDATE SET * / INSERT * handle a source narrower than the target.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

(
    deltaTable.alias("t")
    .merge(updates.alias("s"), "t.user_session = s.user_session AND t.event_time = s.event_time")
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)

# Time travel by version: the table as first written.
v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

# Time travel by timestamp. A fixed date such as "2024-01-01" is rejected when it predates the
# table's first commit, so read as of 24 hours ago, clamped to the table's commit range.
as_of_ts = (
    deltaTable.history()
    .select(
        F.least(
            F.max("timestamp"),
            F.greatest(F.min("timestamp"), F.expr("current_timestamp() - INTERVAL 1 DAY")),
        ).cast("string")
    )
    .first()[0]
)
yesterday = spark.read.format("delta").option("timestampAsOf", as_of_ts).load(delta_path)

# Optimize. No table named workspace.ecommerce.events is created in this notebook; the Delta
# data lives at delta_path, so address it by path.
spark.sql(f"OPTIMIZE delta.`{delta_path}` ZORDER BY (event_type, user_id)")
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 168 HOURS")

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path = "/Volumes/workspace/ecommerce/delta/new_data.csv"  # Upload your CSV here

deltaTable = DeltaTable.forPath(spark, delta_path)
updates = spark.read.csv(
    csv_path,
    header=True,
    inferSchema=True
)

# Assign only the columns present in both the CSV and the target. Target columns the CSV
# lacks (such as category_id) keep their value on update and are NULL on insert.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

(
    deltaTable.alias("t")
    .merge(updates.alias("s"), "t.user_session = s.user_session AND t.event_time = s.event_time")
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)

v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

# A fixed date such as "2024-01-01" is rejected when it predates the first commit, so read as
# of 24 hours ago, clamped to the table's commit range.
as_of_ts = (
    deltaTable.history()
    .select(
        F.least(
            F.max("timestamp"),
            F.greatest(F.min("timestamp"), F.expr("current_timestamp() - INTERVAL 1 DAY")),
        ).cast("string")
    )
    .first()[0]
)
yesterday = spark.read.format("delta").option("timestampAsOf", as_of_ts).load(delta_path)

# workspace.ecommerce.events is never created here; the data lives at delta_path.
spark.sql(f"OPTIMIZE delta.`{delta_path}` ZORDER BY (event_type, user_id)")
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 168 HOURS")

In [0]:
events.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/delta/events")

# Create (or refresh) the managed table; saveAsTable's default "errorifexists" mode fails on re-runs.
# NOTE(review): the table is literally named "csv_path", which looks like a mix-up with the
# Python variable of that name -- confirm the intended table name.
events.write.format("delta").mode("overwrite").saveAsTable("workspace.ecommerce.csv_path")

# SQL approach: CTAS copy of workspace.ecommerce.csv_path, replaced on re-runs.
# NOTE(review): events_csv is unqualified, so it lands in the session's current catalog/schema.
spark.sql("""
    CREATE OR REPLACE TABLE events_csv
    USING DELTA
    AS SELECT * FROM workspace.ecommerce.csv_path
""")

In [0]:
# Persist `events` as a path-based Delta table inside the Unity Catalog volume.
(
    events.write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/delta/events")
)

# Managed Unity Catalog table with the same rows, overwritten on every run.
(
    events.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.ecommerce.csv_path")
)

# Same idea in SQL: CTAS into events_csv, replacing any previous version.
ctas_statement = """
    CREATE OR REPLACE TABLE workspace.ecommerce.events_csv
    USING DELTA
    AS SELECT * FROM workspace.ecommerce.csv_path
"""
spark.sql(ctas_statement)

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path = "/Volumes/workspace/ecommerce/delta/new_data.csv"  # Upload your CSV here

deltaTable = DeltaTable.forPath(spark, delta_path)
updates = spark.read.csv(
    csv_path,
    header=True,
    inferSchema=True
)

# Assign only the columns present in both the CSV and the target. Target columns the CSV
# lacks (such as category_id) keep their value on update and are NULL on insert.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

(
    deltaTable.alias("t")
    .merge(updates.alias("s"), "t.user_session = s.user_session AND t.event_time = s.event_time")
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)

v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

# A fixed date such as "2024-01-01" is rejected when it predates the first commit, so read as
# of 24 hours ago, clamped to the table's commit range.
as_of_ts = (
    deltaTable.history()
    .select(
        F.least(
            F.max("timestamp"),
            F.greatest(F.min("timestamp"), F.expr("current_timestamp() - INTERVAL 1 DAY")),
        ).cast("string")
    )
    .first()[0]
)
yesterday = spark.read.format("delta").option("timestampAsOf", as_of_ts).load(delta_path)

# workspace.ecommerce.events is never created here; the data lives at delta_path.
spark.sql(f"OPTIMIZE delta.`{delta_path}` ZORDER BY (event_type, user_id)")
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 168 HOURS")

In [0]:
dbutils.fs.ls("/Volumes/workspace/ecommerce/delta/")

In [0]:
dbutils.fs.ls("/mnt/your-mount-point/")


In [0]:
csv_path = "/FileStore/tables/new_data.csv"


In [0]:
from delta.tables import DeltaTable

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path = "/Volumes/workspace/ecommerce/delta/new_data.csv"  # now exists

# Load Delta table
deltaTable = DeltaTable.forPath(spark, delta_path)

# Read incoming CSV
updates = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

# Assign only the columns present in both the CSV and the target. Target columns the CSV
# lacks (such as category_id) keep their value on update and are NULL on insert, rather than
# relying on how UPDATE SET * / INSERT * handle a source narrower than the target.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

# MERGE (Upsert) keyed on (user_session, event_time)
(
    deltaTable.alias("t")
    .merge(
        updates.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)


In [0]:
# List the legacy DBFS FileStore upload folder.
filestore_listing = dbutils.fs.ls("/FileStore/tables/")
filestore_listing


In [0]:
from delta.tables import DeltaTable

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path = "/Volumes/workspace/ecommerce/delta/new_data.csv"

# Load Delta table
deltaTable = DeltaTable.forPath(spark, delta_path)

# Read new data from Volume
updates = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

# Assign only the columns present in both the CSV and the target. Target columns the CSV
# lacks (such as category_id) keep their value on update and are NULL on insert.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

# Perform MERGE (UPSERT) keyed on (user_session, event_time)
(
    deltaTable.alias("t")
    .merge(
        updates.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)


In [0]:
# Sample incremental batch read by the MERGE cells from
# /Volumes/workspace/ecommerce/delta/new_data.csv. Raw CSV text is not valid Python, so
# write the file from code instead of pasting the data into a code cell.
new_data_csv = """event_time,user_session,event_type,user_id,product_id,price
2024-1-5 10:15:00,abc123,view,101,2001,0
2024-1-5 10:16:00,xyz999,purchase,102,2005,499.99
2024-1-5 10:17:00,lmn456,cart,103,2010,0
2024-1-5 10:18:00,abc123,purchase,101,2001,299.99
"""
dbutils.fs.put("/Volumes/workspace/ecommerce/delta/new_data.csv", new_data_csv, overwrite=True)


In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Early version of the MERGE cell, against the DBFS-root Delta table written by the first
# Delta cell. NOTE(review): "/path/to/new_data.csv" is a placeholder -- replace it before running.
dbfs_delta_path = "/delta/events"
new_data_csv_path = "/path/to/new_data.csv"

# MERGE for incremental updates
deltaTable = DeltaTable.forPath(spark, dbfs_delta_path)
updates = spark.read.csv(new_data_csv_path, header=True, inferSchema=True)

# Assign only the columns present in both source and target; target-only columns keep their
# value on update and are NULL on insert.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

(
    deltaTable.alias("t")
    .merge(updates.alias("s"), "t.user_session = s.user_session AND t.event_time = s.event_time")
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)

# Time travel
v0 = spark.read.format("delta").option("versionAsOf", 0).load(dbfs_delta_path)

# A fixed date such as "2024-01-01" is rejected when it predates the first commit, so read as
# of 24 hours ago, clamped to the table's commit range.
as_of_ts = (
    deltaTable.history()
    .select(
        F.least(
            F.max("timestamp"),
            F.greatest(F.min("timestamp"), F.expr("current_timestamp() - INTERVAL 1 DAY")),
        ).cast("string")
    )
    .first()[0]
)
yesterday = spark.read.format("delta").option("timestampAsOf", as_of_ts).load(dbfs_delta_path)

# Optimize the table that was just merged into. events_table is a separate managed copy
# written by saveAsTable, so OPTIMIZE/VACUUM on it would not touch the merged data.
spark.sql(f"OPTIMIZE delta.`{dbfs_delta_path}` ZORDER BY (event_type, user_id)")
spark.sql(f"VACUUM delta.`{dbfs_delta_path}` RETAIN 168 HOURS")


In [0]:
from delta.tables import DeltaTable

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path   = "/Volumes/workspace/ecommerce/delta/new_data.csv"

deltaTable = DeltaTable.forPath(spark, delta_path)

updates = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

# Assign only the columns present in both the CSV and the target. Target columns the CSV
# lacks (such as category_id) keep their value on update and are NULL on insert.
target_columns = set(deltaTable.toDF().columns)
column_map = {c: f"s.{c}" for c in updates.columns if c in target_columns}

(
    deltaTable.alias("t")
    .merge(
        updates.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdate(set=column_map)
    .whenNotMatchedInsert(values=column_map)
    .execute()
)


In [0]:
from pyspark.sql.functions import lit

# Add the category_id column the CSV lacks. A bare lit(None) has the untyped NULL (void)
# type, so give it a concrete type.
# NOTE(review): assumes the Delta target stores category_id as bigint -- confirm with
# deltaTable.toDF().printSchema().
updates_fixed = updates.withColumn("category_id", lit(None).cast("bigint"))


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path   = "/Volumes/workspace/ecommerce/delta/new_data.csv"

deltaTable = DeltaTable.forPath(spark, delta_path)

# Read the CSV first, then add the missing column. Previously category_id was added to a stale
# `updates` and the result was immediately overwritten by the raw CSV read, so the merge ran
# without the column.
updates = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)
# Type the NULL like the target column rather than using the untyped NULL (void) literal.
category_id_type = deltaTable.toDF().schema["category_id"].dataType
updates_fixed = updates.withColumn("category_id", lit(None).cast(category_id_type))

# NOTE(review): whenMatchedUpdateAll assigns each target column from the same-named source
# column, so matched rows take the NULL category_id carried by updates_fixed.
(
    deltaTable.alias("t")
    .merge(
        updates_fixed.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


In [0]:
from pyspark.sql.functions import lit

# Add the category_id column the CSV lacks, with a concrete type instead of the untyped
# NULL (void) type that a bare lit(None) produces.
# NOTE(review): assumes the Delta target stores category_id as bigint -- confirm with
# deltaTable.toDF().printSchema().
updates_fixed = updates.withColumn("category_id", lit(None).cast("bigint"))


In [0]:
from delta.tables import DeltaTable

delta_path = "/Volumes/workspace/ecommerce/delta/events"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Upsert updates_fixed (built in an earlier cell) keyed on (user_session, event_time).
# NOTE(review): whenMatchedUpdateAll assigns each target column from the same-named source
# column, so matched rows take the NULL category_id carried by updates_fixed.
merge_condition = "t.user_session = s.user_session AND t.event_time = s.event_time"
merge_builder = (
    deltaTable.alias("t")
    .merge(updates_fixed.alias("s"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
merge_builder.execute()


In [0]:
# Print the incoming (source) schema, then the Delta target schema.
for frame in (updates_fixed, deltaTable.toDF()):
    frame.printSchema()


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path   = "/Volumes/workspace/ecommerce/delta/new_data.csv"

# Load Delta table
deltaTable = DeltaTable.forPath(spark, delta_path)

# Read CSV
updates = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

# FORCE missing column: the CSV has no category_id. Cast the NULL to the target column's
# type; a bare lit(None) has the untyped NULL (void) type.
category_id_type = deltaTable.toDF().schema["category_id"].dataType
updates_fixed = updates.withColumn("category_id", lit(None).cast(category_id_type))


In [0]:
# Label and print the source schema followed by the Delta target schema.
for label, frame in (("SOURCE SCHEMA", updates_fixed), ("TARGET SCHEMA", deltaTable.toDF())):
    print(label)
    frame.printSchema()


In [0]:
# Upsert updates_fixed keyed on (user_session, event_time).
# NOTE(review): whenMatchedUpdateAll assigns each target column from the same-named source
# column, so matched rows take the NULL category_id carried by updates_fixed.
merge_condition = "t.user_session = s.user_session AND t.event_time = s.event_time"
merge_builder = (
    deltaTable.alias("t")
    .merge(updates_fixed.alias("s"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
merge_builder.execute()


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path   = "/Volumes/workspace/ecommerce/delta/new_data.csv"

# Load Delta table
deltaTable = DeltaTable.forPath(spark, delta_path)

# Read CSV
updates = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)
# FORCE missing column: the CSV has no category_id. Cast the NULL to the target column's
# type; a bare lit(None) has the untyped NULL (void) type.
category_id_type = deltaTable.toDF().schema["category_id"].dataType
updates_fixed = updates.withColumn("category_id", lit(None).cast(category_id_type))

# NOTE(review): whenMatchedUpdateAll assigns each target column from the same-named source
# column, so matched rows take the NULL category_id added above.
(
    deltaTable.alias("t")
    .merge(
        updates_fixed.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


In [0]:
from delta.tables import DeltaTable

delta_path = "/Volumes/workspace/ecommerce/delta/events"
csv_path   = "/Volumes/workspace/ecommerce/delta/new_data.csv"

deltaTable = DeltaTable.forPath(spark, delta_path)

updates = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)

# Payload columns copied from the source on both update and insert.
shared_columns = ["event_type", "user_id", "product_id", "price"]

# On match: refresh only the payload columns -- category_id is deliberately NOT touched.
update_set = {name: f"s.{name}" for name in shared_columns}

# On insert: key columns plus payload, with category_id explicitly NULL.
insert_values = {
    "event_time": "s.event_time",
    "user_session": "s.user_session",
    **update_set,
    "category_id": "NULL",
}

merge_condition = "t.user_session = s.user_session AND t.event_time = s.event_time"
merge_builder = (
    deltaTable.alias("t")
    .merge(updates.alias("s"), merge_condition)
    .whenMatchedUpdate(set=update_set)
    .whenNotMatchedInsert(values=insert_values)
)
merge_builder.execute()


In [0]:
history_df = deltaTable.history()
display(history_df)

In [0]:
spark.sql("OPTIMIZE delta.`/Volumes/workspace/ecommerce/delta/events` ZORDER BY (event_type, user_id)")

In [0]:
spark.sql("VACUUM delta.`/Volumes/workspace/ecommerce/delta/events` RETAIN 168 HOURS")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# BRONZE: raw ingestion -- load the CSV as-is and stamp every row with the load time.
raw = spark.read.csv("/Volumes/workspace/ecommerce/raw/events.csv", header=True, inferSchema=True)

bronze_df = raw.withColumn("ingestion_ts", F.current_timestamp())
(
    bronze_df.write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/delta/bronze/events")
)

In [0]:
%sql
-- Volumes for the medallion layers. IF NOT EXISTS keeps the cell re-runnable once the
-- volumes exist (plain CREATE VOLUME fails on the second run).
-- NOTE(review): the bronze/silver/gold cells in this notebook write under the `delta` volume
-- (/Volumes/workspace/ecommerce/delta/...), not into these volumes -- confirm the intended layout.
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.raw;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.bronze;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.silver;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.gold;


In [0]:
from pyspark.sql import functions as F

# BRONZE ingestion: raw CSV rows, each stamped with the time it was loaded.
bronze_source_path = "/Volumes/workspace/ecommerce/raw/events.csv"
bronze_target_path = "/Volumes/workspace/ecommerce/delta/bronze/events"

raw = spark.read.csv(bronze_source_path, header=True, inferSchema=True)
(
    raw.withColumn("ingestion_ts", F.current_timestamp())
    .write.format("delta")
    .mode("overwrite")
    .save(bronze_target_path)
)


In [0]:
from pyspark.sql import functions as F

BRONZE_PATH = "/Volumes/workspace/ecommerce/delta/bronze/events"
SILVER_PATH = "/Volumes/workspace/ecommerce/delta/silver/events"

raw = spark.read.csv(
    "/Volumes/workspace/ecommerce/raw/events.csv",
    header=True,
    inferSchema=True
)

# BRONZE: raw rows plus an ingestion timestamp.
(
    raw.withColumn("ingestion_ts", F.current_timestamp())
    .write.format("delta")
    .mode("overwrite")
    .save(BRONZE_PATH)
)

# SILVER: read back the bronze table just written. (This previously loaded
# "/delta/bronze/events", a DBFS-root path that nothing in this notebook writes.)
bronze = spark.read.format("delta").load(BRONZE_PATH)
silver = (
    bronze
    .filter(F.col("price") > 0)  # also drops rows whose price is NULL
    .filter(F.col("price") < 10000)
    # NOTE(review): keeps one arbitrary row per (user_session, event_time); distinct events in
    # the same session and second (e.g. view + cart) collapse into one.
    .dropDuplicates(["user_session", "event_time"])
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn(
        "price_tier",
        F.when(F.col("price") < 10, "budget")
         .when(F.col("price") < 50, "mid")
         .otherwise("premium"),
    )
)
silver.write.format("delta").mode("overwrite").save(SILVER_PATH)

In [0]:
from pyspark.sql import functions as F

# GOLD: per-product aggregates over the silver table.
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/delta/silver/events")

# F.when(cond, "user_id") treats "user_id" as a string *literal*, so the old code counted a
# constant and summed the literal "price". Wrap the values in F.col. The data has no
# product_name column, so group by category_code instead.
product_perf = (
    silver
    .groupBy("product_id", "category_code")
    .agg(
        F.countDistinct(F.when(F.col("event_type") == "view", F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(F.col("event_type") == "purchase", F.col("user_id"))).alias("purchases"),
        F.sum(F.when(F.col("event_type") == "purchase", F.col("price"))).alias("revenue"),
    )
    # Purchasers per 100 distinct viewers; products without views get 0 instead of dividing by zero.
    .withColumn(
        "conversion_rate",
        F.when(F.col("views") > 0, F.col("purchases") / F.col("views") * 100).otherwise(0),
    )
)
product_perf.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/delta/gold/products")

In [0]:
from pyspark.sql import functions as F

# Reusable predicates for the two event types the gold metrics care about.
is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

# Distinct viewers, distinct purchasers and purchase revenue per (product, category).
product_perf = (
    silver.groupBy("product_id", "category_code")
    .agg(
        F.countDistinct(F.when(is_view, F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(is_purchase, F.col("user_id"))).alias("purchases"),
        F.sum(F.when(is_purchase, F.col("price"))).alias("revenue"),
    )
)

# Purchasers per 100 viewers, 0 for products that were never viewed.
product_perf = product_perf.withColumn(
    "conversion_rate",
    F.when(F.col("views") > 0, (F.col("purchases") / F.col("views")) * 100).otherwise(0),
)


In [0]:
# Persist the gold product aggregates as a path-based Delta table, replaced on every run.
gold_writer = product_perf.write.format("delta").mode("overwrite")
gold_writer.save("/Volumes/workspace/ecommerce/delta/gold/products")
