In [0]:
%python
# generate sample orders and add order_date
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date
from datetime import datetime, timedelta
import random

# generate sample rows
data = []
base = datetime(2025, 11, 1, 12, 0, 0)
countries = ["US","IN","GB","DE","FR"]
statuses = ["CREATED","PAID","CANCELLED"]
currencies = {"US":"USD","IN":"INR","GB":"GBP","DE":"EUR","FR":"EUR"}

for i in range(1,201):  # 200 sample rows
    ts = base + timedelta(hours=random.randint(0,720))
    country = random.choice(countries)
    currency = currencies[country]
    amount = round(random.uniform(1.0,500.0),2)
    data.append((f"ord_{i:05d}", ts, f"cust_{random.randint(1,60)}", country,
                 float(amount), currency, random.choice(statuses)))

schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_timestamp", TimestampType(), False),
    StructField("customer_id", StringType(), False),
    StructField("country", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("currency", StringType(), False),
    StructField("status", StringType(), False),
])

df = spark.createDataFrame(data, schema)
# derived column order_date (date only)
df = df.withColumn("order_date", to_date(col("order_timestamp")))

# show a quick sample
print("Sample schema:")
df.printSchema()
print("\nSample rows:")
display(df.orderBy("order_timestamp").limit(10))


Sample schema:
root
 |-- order_id: string (nullable = false)
 |-- order_timestamp: timestamp (nullable = false)
 |-- customer_id: string (nullable = false)
 |-- country: string (nullable = false)
 |-- amount: double (nullable = false)
 |-- currency: string (nullable = false)
 |-- status: string (nullable = false)
 |-- order_date: date (nullable = false)


Sample rows:


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ord_00023,2025-11-01T13:00:00.000Z,cust_14,DE,379.89,EUR,CANCELLED,2025-11-01
ord_00121,2025-11-01T13:00:00.000Z,cust_40,US,271.73,USD,CREATED,2025-11-01
ord_00191,2025-11-01T13:00:00.000Z,cust_31,US,247.18,USD,CANCELLED,2025-11-01
ord_00156,2025-11-01T14:00:00.000Z,cust_1,IN,187.21,INR,PAID,2025-11-01
ord_00107,2025-11-01T16:00:00.000Z,cust_34,FR,152.02,EUR,CANCELLED,2025-11-01
ord_00130,2025-11-01T19:00:00.000Z,cust_52,GB,115.5,GBP,CANCELLED,2025-11-01
ord_00011,2025-11-01T20:00:00.000Z,cust_3,DE,190.52,EUR,PAID,2025-11-01
ord_00042,2025-11-01T20:00:00.000Z,cust_37,US,460.75,USD,PAID,2025-11-01
ord_00128,2025-11-01T22:00:00.000Z,cust_1,IN,156.17,INR,CREATED,2025-11-01
ord_00103,2025-11-02T12:00:00.000Z,cust_39,FR,180.77,EUR,PAID,2025-11-02


In [0]:
%python
# using Unity Catalog Volume path
delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

(df.write
   .format("delta")
   .mode("overwrite")
   .partitionBy("country", "order_date")
   .option("overwriteSchema", "true")
   .save(delta_path)
)

print("Delta table written successfully to:", delta_path)


Delta table written successfully to: /Volumes/shopez_catalog/shopez_schema/shopez_volume/orders


In [0]:
%python
# Verify partition structure inside the Volume
delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

print("Root delta folder:")
display(dbutils.fs.ls(delta_path))

print("Folders under one country partition (example: IN):")
display(dbutils.fs.ls(f"{delta_path}/country=IN"))


Root delta folder:


path,name,size,modificationTime
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/_delta_log/,_delta_log/,0,1764596148238
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=DE/,country=DE/,0,1764596148238
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=FR/,country=FR/,0,1764596148238
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=GB/,country=GB/,0,1764596148238
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/,country=IN/,0,1764596148238
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=US/,country=US/,0,1764596148238


Folders under one country partition (example: IN):


path,name,size,modificationTime
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-01/,order_date=2025-11-01/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-02/,order_date=2025-11-02/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-03/,order_date=2025-11-03/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-04/,order_date=2025-11-04/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-05/,order_date=2025-11-05/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-06/,order_date=2025-11-06/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-07/,order_date=2025-11-07/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-10/,order_date=2025-11-10/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-11/,order_date=2025-11-11/,0,1764596149182
dbfs:/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders/country=IN/order_date=2025-11-13/,order_date=2025-11-13/,0,1764596149182


In [0]:
%python
# Demonstrate partition pruning

from pyspark.sql.functions import col

delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

df_delta = spark.read.format("delta").load(delta_path)

# choose one country and one date that exist in your partitions
sample_country = "IN"
sample_date = df_delta.filter(col("country") == sample_country) \
                      .select("order_date") \
                      .limit(1).collect()[0][0]

print("Using sample date:", sample_date)

# filter query (this should prune partitions)
filtered = df_delta.filter(
    (col("country") == sample_country) &
    (col("order_date") == sample_date)
)

# check physical plan for partition pruning
filtered.explain(True)

# show some rows
display(filtered.limit(10))


Using sample date: 2025-11-15
== Parsed Logical Plan ==
'Filter 'and('`==`('country, IN), '`==`('order_date, 2025-11-15))
+- Relation [order_id#11452,order_timestamp#11453,customer_id#11454,country#11455,amount#11456,currency#11457,status#11458,order_date#11459] parquet

== Analyzed Logical Plan ==
order_id: string, order_timestamp: timestamp, customer_id: string, country: string, amount: double, currency: string, status: string, order_date: date
Filter ((country#11455 = IN) AND (order_date#11459 = 2025-11-15))
+- Relation [order_id#11452,order_timestamp#11453,customer_id#11454,country#11455,amount#11456,currency#11457,status#11458,order_date#11459] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(order_date#11459) AND (order_date#11459 = 2025-11-15)) AND isnotnull(country#11455)) AND (country#11455 = IN))
+- Relation [order_id#11452,order_timestamp#11453,customer_id#11454,country#11455,amount#11456,currency#11457,status#11458,order_date#11459] parquet

== Physical Plan ==
*(1

order_id,order_timestamp,customer_id,country,amount,currency,status,order_date
ord_00106,2025-11-15T20:00:00.000Z,cust_47,IN,129.73,INR,PAID,2025-11-15


In [0]:
%python
from delta.tables import DeltaTable

delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

dt = DeltaTable.forPath(spark, delta_path)

# Update a few rows
dt.update(
    condition = "status = 'CREATED' AND amount > 300",
    set = { "status": "'PAID'" }
)

print("Update completed.")


Update completed.


In [0]:
%python
# View history (versions)
spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show(truncate=False)


+-------+-------------------+--------------+----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
%python
# Read previous version
df_old = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

print("Old version count:", df_old.count())

# Compare with current
df_current = spark.read.format("delta").load(delta_path)
print("Current version count:", df_current.count())


Old version count: 200
Current version count: 200


In [0]:
%python

# Create new incoming data containing the new fields

from pyspark.sql.types import *
from datetime import datetime

# new rows containing new fields
new_data = [
    ("ord_50001", datetime(2025, 11, 30, 10, 30, 0), "cust_999", "IN", 450.0, "INR", "PAID", "2025-11-30", "CARD", "NEWYEAR10"),
    ("ord_50002", datetime(2025, 11, 30, 11, 45, 0), "cust_998", "US", 120.0, "USD", "CREATED", "2025-11-30", "UPI", None),
]

schema_new = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_timestamp", TimestampType(), False),
    StructField("customer_id", StringType(), False),
    StructField("country", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("currency", StringType(), False),
    StructField("status", StringType(), False),
    StructField("order_date", StringType(), False),  # keep as string for now
    StructField("payment_method", StringType(), True),
    StructField("coupon_code", StringType(), True),
])

df_new = spark.createDataFrame(new_data, schema_new)

# convert order_date to proper date type
from pyspark.sql.functions import col, to_date
df_new = df_new.withColumn("order_date", to_date(col("order_date")))

display(df_new)


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
ord_50001,2025-11-30T10:30:00.000Z,cust_999,IN,450.0,INR,PAID,2025-11-30,CARD,NEWYEAR10
ord_50002,2025-11-30T11:45:00.000Z,cust_998,US,120.0,USD,CREATED,2025-11-30,UPI,


In [0]:
%python
# append new data into delta table with schema evolution enabled
delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

(df_new.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")     # enable schema evolution
    .save(delta_path)
)

print("Schema evolution successful — new columns added.")


Schema evolution successful — new columns added.


In [0]:
%python
# Verify new columns exist in the Delta table
df_check = spark.read.format("delta").load(delta_path)
df_check.printSchema()

display(df_check.orderBy("order_timestamp", ascending=False).limit(5))


root
 |-- order_id: string (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- status: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- coupon_code: string (nullable = true)



order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
ord_00064,2025-12-01T11:00:00.000Z,cust_39,GB,186.94,GBP,PAID,2025-12-01,,
ord_00148,2025-12-01T07:00:00.000Z,cust_5,GB,332.12,GBP,PAID,2025-12-01,,
ord_00057,2025-12-01T07:00:00.000Z,cust_33,FR,415.88,EUR,PAID,2025-12-01,,
ord_00157,2025-12-01T05:00:00.000Z,cust_57,FR,264.35,EUR,CANCELLED,2025-12-01,,
ord_00090,2025-11-30T22:00:00.000Z,cust_52,GB,303.59,GBP,PAID,2025-11-30,,


In [0]:
%python
from delta.tables import DeltaTable

delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"
dt = DeltaTable.forPath(spark, delta_path)

# UPDATE operation
dt.update(
    condition = "amount > 400",
    set = { "status": "'CANCELLED'" }
)

print("Update completed: Orders > 400 marked as CANCELLED.")


Update completed: Orders > 400 marked as CANCELLED.


In [0]:
%python
# DELETE operation
dt.delete(
    condition = "amount < 10"   # delete low-value orders
)

print("Delete completed: Orders with amount < 10 removed.")


Delete completed: Orders with amount < 10 removed.


In [0]:
%python
# Verify the changes
df_after_changes = spark.read.format("delta").load(delta_path)

print("Counts after updates & deletes:", df_after_changes.count())

display(
    df_after_changes
        .orderBy("amount")
        .limit(10)
)


Counts after updates & deletes: 195


order_id,order_timestamp,customer_id,country,amount,currency,status,order_date,payment_method,coupon_code
ord_00155,2025-11-15T04:00:00.000Z,cust_35,DE,10.17,EUR,PAID,2025-11-15,,
ord_00143,2025-11-16T09:00:00.000Z,cust_5,FR,10.51,EUR,CREATED,2025-11-16,,
ord_00018,2025-11-29T13:00:00.000Z,cust_18,IN,11.16,INR,CREATED,2025-11-29,,
ord_00192,2025-11-19T06:00:00.000Z,cust_15,FR,25.52,EUR,PAID,2025-11-19,,
ord_00054,2025-11-27T17:00:00.000Z,cust_19,IN,25.95,INR,CANCELLED,2025-11-27,,
ord_00047,2025-11-09T16:00:00.000Z,cust_57,GB,28.39,GBP,CREATED,2025-11-09,,
ord_00055,2025-11-29T13:00:00.000Z,cust_43,IN,29.42,INR,CANCELLED,2025-11-29,,
ord_00076,2025-11-07T05:00:00.000Z,cust_45,GB,31.45,GBP,CREATED,2025-11-07,,
ord_00008,2025-11-05T00:00:00.000Z,cust_30,GB,37.23,GBP,PAID,2025-11-05,,
ord_00077,2025-11-27T02:00:00.000Z,cust_37,GB,37.98,GBP,CREATED,2025-11-27,,


In [0]:
%python
# run optimize
delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

spark.sql(f"""
    OPTIMIZE delta.`{delta_path}`
""")

print("OPTIMIZE completed.")


OPTIMIZE completed.


In [0]:
%python
# Run OPTIMIZE with ZORDER
spark.sql(f"""
    OPTIMIZE delta.`{delta_path}`
    ZORDER BY (customer_id)
""")

print("OPTIMIZE + ZORDER completed.")



OPTIMIZE + ZORDER completed.


In [0]:
%python
# Confirm results in table history
spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show(truncate=False)


+-------+-------------------+--------------+----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
%python
# Count files BEFORE small-file demo
import pyspark

delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

# Count how many data files exist right now
files_before = dbutils.fs.ls(delta_path)
print("Files before small file demo:")
for f in files_before:
    print(f.name)
print("Total:", len(files_before))


Files before small file demo:
_delta_log/
country=DE/
country=FR/
country=GB/
country=IN/
country=US/
deletion_vector_1b2c154a-818c-46c5-ab2a-327e2bc3e333.bin
deletion_vector_444f0e12-994c-42bf-afa9-bf37495c2d8e.bin
deletion_vector_f1d9d4c0-d1c4-45cb-b2a7-821974168ce3.bin
Total: 9




In [0]:
%python

# Create small files by writing tiny batches many times

from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime, date
import random

# Define the schema explicitly
tiny_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_timestamp", TimestampType(), True),
    StructField("customer_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("currency", StringType(), True),
    StructField("status", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("payment_method", StringType(), True),
    StructField("coupon_code", StringType(), True)
])

delta_path = "/Volumes/shopez_catalog/shopez_schema/shopez_volume/orders"

for i in range(50):
    small_df = spark.createDataFrame([
        (
            f"tiny_{i}",                     # order_id
            datetime.now(),                 # order_timestamp
            f"cust_{1000+i}",               # customer_id
            "IN",                           # country
            float(random.uniform(1, 20)),   # amount
            "INR",                          # currency
            "CREATED",                      # status
            date.today(),                   # order_date (Python date)
            None,                           # payment_method
            None                            # coupon_code
        )
    ], schema=tiny_schema)
    
    small_df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save(delta_path)

print("50 tiny files created successfully!")



50 tiny files created successfully!


In [0]:
%python
# Check file count after tiny writes
files_after_tiny = dbutils.fs.ls(delta_path)
print("Total files:", len(files_after_tiny))


Total files: 9


In [0]:
%python
# count files after optimize
files_after_optimize = dbutils.fs.ls(delta_path)
print("Files after OPTIMIZE:")
print("Total:", len(files_after_optimize))


Files after OPTIMIZE:
Total: 9


In [0]:
%python
# count files after creating many tiny files
files_after_tiny = dbutils.fs.ls(delta_path)
print("Files after tiny writes:", len(files_after_tiny))


Files after tiny writes: 9


In [0]:
%python
# Correct recursive file counting
def count_all_files(path):
    total = 0
    items = dbutils.fs.ls(path)
    for item in items:
        if item.isDir():
            total += count_all_files(item.path)   # recursive count of subfolders
        else:
            total += 1
    return total

total_files = count_all_files(delta_path)
print("Total actual data files (recursive):", total_files)


Total actual data files (recursive): 384


In [0]:
%python
# run optimize again
spark.sql(f"OPTIMIZE delta.`{delta_path}`")


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
%python
# count again recursively
total_after_opt = count_all_files(delta_path)
print("Total files after OPTIMIZE:", total_after_opt)


Total files after OPTIMIZE: 387
