# **Set paths and read CSV**

In [0]:
from pyspark.sql.functions import current_timestamp

csv_path = "/Volumes/workspace/default/phase_1/sample_sales.csv" 
#delta_path = "/tmp/delta/sample_sales.delta"                # local Delta path for this lab     
#table_name = "sample_sales"                                 # will register this as a table

df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv(csv_path))

display(df.limit(10))
print("Rows:", df.count())
df.printSchema()

date,order_id,customer_id,category,product,quantity,unit_price,revenue,city,state
2025-09-01,1001,1,Electronics,Headphones,2,49.99,99.98,Stockton,CA
2025-09-01,1002,2,Home,Toaster,1,29.99,29.99,Lodi,CA
2025-09-02,1003,3,Apparel,T-Shirt,3,15.0,45.0,Tracy,CA
2025-09-02,1004,4,Electronics,USB-C Cable,5,9.99,49.95,Manteca,CA
2025-09-03,1005,1,Grocery,Olive Oil,1,12.49,12.49,Stockton,CA
2025-09-03,1006,5,Apparel,Jeans,1,39.99,39.99,Lathrop,CA
2025-09-04,1007,6,Home,Kettle,1,24.99,24.99,Lodi,CA
2025-09-04,1008,7,Electronics,Mouse,2,19.99,39.98,Stockton,CA
2025-09-05,1009,8,Apparel,Jacket,1,79.99,79.99,Tracy,CA
2025-09-05,1010,9,Grocery,Coffee Beans,2,14.49,28.98,Manteca,CA


Rows: 12
root
 |-- date: date (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- revenue: double (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



# Enrich & write as Delta and Register as Delta table

In [0]:
df2 = df.withColumn("loaded_at", current_timestamp())
# write to Delta (overwrite for clean reruns)
(df2.write
 .format("delta")
 .mode("overwrite")
 .option("overwriteSchema", True)
 .save(delta_path))                     # error because writing to the public DBFS root (e.g., /tmp/delta/...) is disabled and not recommended in Databricks. You should write to a location in your workspace's cloud storage, such as an S3 bucket path

print("Delta written to:", delta_path)

[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-6991987240920551>, line 7[0m
[1;32m      1[0m df2 [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mloaded_at[39m[38;5;124m"[39m, current_timestamp())
[1;32m      2[0m [38;5;66;03m# write to Delta (overwrite for clean reruns)[39;00m
[1;32m      3[0m (df2[38;5;241m.[39mwrite
[1;32m      4[0m  [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)
[1;32m      5[0m  [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)
[1;32m      6[0m  [38;5;241m.[39moption([38;5;124m"[39m[38;5;124moverwriteSchema[39m[38;5;124m"[39m, [38;5;28;01mTrue[39;00m)
[0;32m----> 7[0m  [38;5;241m.[39msave(delta_path))
[1;32m      9[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mDelta written to:[39m[3

# Now write a managed table in Unity Catalog

In [0]:
from pyspark.sql.functions import current_timestamp

table_fqn = "workspace.default.sample_sales" # <catalog>.<schema>.<table>

df2 = df.withColumn("loaded_at", current_timestamp())

# write as a managed Delta table (unity catalog)
(df2.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(table_fqn))

print("Table written:", table_fqn)

Table written: workspace.default.sample_sales


## Quick Checks

In [0]:
%sql
select count(*) FROM workspace.default.sample_sales;
DESCRIBE HISTORY workspace.default.sample_sales;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-10-08T14:02:53.000Z,70512695036468,kanwardeep1387@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(559784884162390),1008-135646-tdkolt3d-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 12, numOutputBytes -> 3331)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13


# Run a few starter queries

**Daily Revenue**

In [0]:
%sql
select date, round(sum(revenue), 2) as daily_revenue
from sample_sales
group by date
order by date;

date,daily_revenue
2025-09-01,129.97
2025-09-02,94.95
2025-09-03,52.48
2025-09-04,64.97
2025-09-05,108.97
2025-09-06,79.95


**Top 3 products by revenue**

In [0]:
%sql
select product, round(sum(revenue), 2) as total_revenue
from sample_sales
group by product
order by total_revenue desc
limit 3;

product,total_revenue
Headphones,99.98
Jacket,79.99
Keyboard,59.99


**City-level revenue**

In [0]:
%sql
select city, round(sum(revenue), 2) as total_revenue
from sample_sales
group by city
order by total_revenue desc;

city,total_revenue
Stockton,172.41
Tracy,124.99
Lodi,114.97
Manteca,78.93
Lathrop,39.99


**Peek at Delta History**
This is useful later for time travel; seeing it now helps

In [0]:
%sql
Describe history sample_sales;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-10-08T14:02:53.000Z,70512695036468,kanwardeep1387@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(559784884162390),1008-135646-tdkolt3d-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 12, numOutputBytes -> 3331)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
