In [0]:
from pyspark.sql.functions import from_json, col, lit, coalesce
from pyspark.sql.types import StructType, StructField, StringType

In [0]:
df =spark.read.option("header", True)\
               .option("escape", '"')\
               .option("quote", '"')\
               .option("multiLine", True)\
               .csv("dbfs:/FileStore/data/sales.csv")
df.show(truncate=False)

+----------------+----------+-----------------+----------------------------+--------+--------+---------+---------+------------------------------------------+
|SalesOrderNumber|OrderDate |CustomerName     |EmailAddress                |Item    |Quantity|UnitPrice|TaxAmount|ProductMetadata                           |
+----------------+----------+-----------------+----------------------------+--------+--------+---------+---------+------------------------------------------+
|SO1000          |2024-06-14|Jeffrey Perez    |gonzalezryan@example.org    |Gadget X|5       |15.33    |1.23     |{"color": "blue", "warranty": "3 years"}  |
|SO1001          |2024-09-25|Rebekah Miller   |cardenasbryan@example.com   |Widget A|10      |14.42    |1.15     |{"color": "red", "warranty": "1 year"}    |
|SO1002          |2024-08-17|Anna Hess        |jasonhansen@example.com     |Widget C|2       |56.82    |NULL     |{"color": "yellow", "warranty": "3 years"}|
|SO1003          |2024-10-08|Joseph Hernandez |timot

In [0]:
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dbfs:/FileStore/data/sales.csv")

print("Loaded sales.csv file:")
sales_df.show(5, truncate=False)

Loaded sales.csv file:
+----------------+----------+----------------+----------------------------+--------+--------+---------+---------+-----------------------+
|SalesOrderNumber|OrderDate |CustomerName    |EmailAddress                |Item    |Quantity|UnitPrice|TaxAmount|ProductMetadata        |
+----------------+----------+----------------+----------------------------+--------+--------+---------+---------+-----------------------+
|SO1000          |2024-06-14|Jeffrey Perez   |gonzalezryan@example.org    |Gadget X|5       |15.33    |1.23     |"{""color"": ""blue""  |
|SO1001          |2024-09-25|Rebekah Miller  |cardenasbryan@example.com   |Widget A|10      |14.42    |1.15     |"{""color"": ""red""   |
|SO1002          |2024-08-17|Anna Hess       |jasonhansen@example.com     |Widget C|2       |56.82    |NULL     |"{""color"": ""yellow""|
|SO1003          |2024-10-08|Joseph Hernandez|timothyfernandez@example.net|Widget B|3       |35.79    |2.86     |"{""color"": ""red""   |
|SO1004    

In [0]:
print(sales_df.select("ProductMetadata").first()["ProductMetadata"])

"{""color"": ""blue""


In [0]:
sales_df.select("ProductMetadata").show(truncate=False)

+-----------------------+
|ProductMetadata        |
+-----------------------+
|"{""color"": ""blue""  |
|"{""color"": ""red""   |
|"{""color"": ""yellow""|
|"{""color"": ""red""   |
|"{""color"": ""green"" |
|"{""color"": ""red""   |
|"{""color"": ""blue""  |
|"{""color"": ""red""   |
|"{""color"": ""blue""  |
|"{""color"": ""yellow""|
|"{""color"": ""yellow""|
|"{""color"": ""blue""  |
|"{""color"": ""yellow""|
|"{""color"": ""red""   |
|"{""color"": ""yellow""|
|"{""color"": ""green"" |
|"{""color"": ""red""   |
|"{""color"": ""red""   |
|"{""color"": ""red""   |
|"{""color"": ""green"" |
+-----------------------+
only showing top 20 rows


In [0]:
# 2. Write as Delta table (overwrite if exists)
df.write.format("delta").mode("overwrite").saveAsTable("sales_delta_demo_json_fix")

print("sales_delta_demo Delta table created.")

sales_delta_demo Delta table created.


In [0]:
%sql
select * from sales_delta_demo_json_fix

SalesOrderNumber,OrderDate,CustomerName,EmailAddress,Item,Quantity,UnitPrice,TaxAmount,ProductMetadata
SO1000,2024-06-14,Jeffrey Perez,gonzalezryan@example.org,Gadget X,5.0,15.33,1.23,"{""color"": ""blue"", ""warranty"": ""3 years""}"
SO1001,2024-09-25,Rebekah Miller,cardenasbryan@example.com,Widget A,10.0,14.42,1.15,"{""color"": ""red"", ""warranty"": ""1 year""}"
SO1002,2024-08-17,Anna Hess,jasonhansen@example.com,Widget C,2.0,56.82,,"{""color"": ""yellow"", ""warranty"": ""3 years""}"
SO1003,2024-10-08,Joseph Hernandez,timothyfernandez@example.net,Widget B,3.0,35.79,2.86,"{""color"": ""red"", ""warranty"": ""2 years""}"
SO1004,2025-01-21,Emily Reed,robertdavis@example.com,Widget C,0.0,66.75,5.34,"{""color"": ""green"", ""warranty"": ""3 years""}"
SO1005,2025-03-03,Victoria Gonzalez,pamela73@example.com,Gadget Y,20.0,20.18,1.61,"{""color"": ""red"", ""warranty"": ""2 years""}"
SO1006,2024-06-04,Michael Hines,jjenkins@example.org,Gadget Y,3.0,81.16,6.49,"{""color"": ""blue"", ""warranty"": ""3 years""}"
SO1007,2024-10-12,Megan Rogers,ipatterson@example.com,Widget A,2.0,28.99,2.32,"{""color"": ""red"", ""warranty"": ""3 years""}"
SO1008,2025-02-21,Todd Johnson,collinsalyssa@example.com,Gadget Y,2.0,55.19,4.42,"{""color"": ""blue"", ""warranty"": ""3 years""}"
SO1009,2025-01-18,Barbara Thomas,joneal@example.net,Gadget X,0.0,93.65,7.49,"{""color"": ""yellow"", ""warranty"": ""3 years""}"


In [0]:
# 3. Create SQL UDF to uppercase CustomerName
spark.sql("""
CREATE OR REPLACE FUNCTION to_upper_case(str STRING) RETURNS STRING
RETURN UPPER(str)
""")
print("SQL UDF 'to_upper_case' created.")




SQL UDF 'to_upper_case' created.


In [0]:
# 4. Create SQL UDF to Reverse CustomerName
spark.sql("""
CREATE OR REPLACE FUNCTION to_reverse(str STRING) RETURNS STRING
RETURN Reverse(str)
""")
print("SQL UDF 'to_reverse' created.")


SQL UDF 'to_reverse' created.


In [0]:
spark.sql("SELECT CustomerName, to_upper_case(CustomerName) AS CustomerName_Upper, to_reverse(CustomerName) AS CustomerName_Reverse FROM sales_delta_demo LIMIT 10").show(truncate=False)

+-----------------+------------------+--------------------+
|CustomerName     |CustomerName_Upper|CustomerName_Reverse|
+-----------------+------------------+--------------------+
|Jeffrey Perez    |JEFFREY PEREZ     |zereP yerffeJ       |
|Rebekah Miller   |REBEKAH MILLER    |relliM hakebeR      |
|Anna Hess        |ANNA HESS         |sseH annA           |
|Joseph Hernandez |JOSEPH HERNANDEZ  |zednanreH hpesoJ    |
|Emily Reed       |EMILY REED        |deeR ylimE          |
|Victoria Gonzalez|VICTORIA GONZALEZ |zelaznoG airotciV   |
|Michael Hines    |MICHAEL HINES     |seniH leahciM       |
|Megan Rogers     |MEGAN ROGERS      |sregoR nageM        |
|Todd Johnson     |TODD JOHNSON      |nosnhoJ ddoT        |
|Barbara Thomas   |BARBARA THOMAS    |samohT arabraB      |
+-----------------+------------------+--------------------+



In [0]:
%sql
select * from sales_delta_demo

SalesOrderNumber,OrderDate,CustomerName,EmailAddress,Item,Quantity,UnitPrice,TaxAmount,ProductMetadata
SO1000,2024-06-14,Jeffrey Perez,gonzalezryan@example.org,Gadget X,5.0,15.33,1.23,"""{""""color"""": """"blue"""""
SO1001,2024-09-25,Rebekah Miller,cardenasbryan@example.com,Widget A,10.0,14.42,1.15,"""{""""color"""": """"red"""""
SO1002,2024-08-17,Anna Hess,jasonhansen@example.com,Widget C,2.0,56.82,,"""{""""color"""": """"yellow"""""
SO1003,2024-10-08,Joseph Hernandez,timothyfernandez@example.net,Widget B,3.0,35.79,2.86,"""{""""color"""": """"red"""""
SO1004,2025-01-21,Emily Reed,robertdavis@example.com,Widget C,0.0,66.75,5.34,"""{""""color"""": """"green"""""
SO1005,2025-03-03,Victoria Gonzalez,pamela73@example.com,Gadget Y,20.0,20.18,1.61,"""{""""color"""": """"red"""""
SO1006,2024-06-04,Michael Hines,jjenkins@example.org,Gadget Y,3.0,81.16,6.49,"""{""""color"""": """"blue"""""
SO1007,2024-10-12,Megan Rogers,ipatterson@example.com,Widget A,2.0,28.99,2.32,"""{""""color"""": """"red"""""
SO1008,2025-02-21,Todd Johnson,collinsalyssa@example.com,Gadget Y,2.0,55.19,4.42,"""{""""color"""": """"blue"""""
SO1009,2025-01-18,Barbara Thomas,joneal@example.net,Gadget X,0.0,93.65,7.49,"""{""""color"""": """"yellow"""""


In [0]:
"{""color"": ""yellow""

In [0]:
# 5. Create SQL UDF to find the Demand
spark.sql("""
CREATE OR REPLACE FUNCTION demand(qty INT) RETURNS STRING
RETURN CASE
    WHEN qty<2 THEN 'Low'
    WHEN qty BETWEEN 2 and 6 THEN 'Medium'
    WHEN qty>=10 THEN 'High'
END;
""")
print("SQL UDF 'demand' created.")

SQL UDF 'demand' created.


In [0]:
%sql
select Quantity, demand(Quantity) as Demand, demand(TaxAmount) as TaxDemand from sales_delta_demo

Quantity,Demand,TaxDemand
5.0,Medium,Low
10.0,High,Low
2.0,Medium,
3.0,Medium,Medium
0.0,Low,Medium
20.0,High,Low
3.0,Medium,Medium
2.0,Medium,Medium
2.0,Medium,Medium
0.0,Low,


In [0]:
# 4. Parse JSON in ProductMetadata column
json_schema = StructType([
    StructField("color", StringType(), True),
    StructField("warranty", StringType(), True)
])


In [0]:
# Clean ProductMetadata strings (remove leading/trailing quotes and fix escapes)
from pyspark.sql.functions import regexp_replace

sales_clean = df.withColumn(
    "ProductMetadata_clean",
    regexp_replace(col("ProductMetadata"), '^"+|"+$', '')
).withColumn(
    "ProductMetadata_clean",
    regexp_replace(col("ProductMetadata_clean"), '""', '"')
)

df_parsed = sales_clean.withColumn("ProductDetails", from_json(col("ProductMetadata_clean"), json_schema))

df_parsed.select("SalesOrderNumber", "CustomerName", "ProductDetails.color", "ProductDetails.warranty").show(10, truncate=False)


+----------------+-----------------+------+--------+
|SalesOrderNumber|CustomerName     |color |warranty|
+----------------+-----------------+------+--------+
|SO1000          |Jeffrey Perez    |blue  |3 years |
|SO1001          |Rebekah Miller   |red   |1 year  |
|SO1002          |Anna Hess        |yellow|3 years |
|SO1003          |Joseph Hernandez |red   |2 years |
|SO1004          |Emily Reed       |green |3 years |
|SO1005          |Victoria Gonzalez|red   |2 years |
|SO1006          |Michael Hines    |blue  |3 years |
|SO1007          |Megan Rogers     |red   |3 years |
|SO1008          |Todd Johnson     |blue  |3 years |
|SO1009          |Barbara Thomas   |yellow|3 years |
+----------------+-----------------+------+--------+
only showing top 10 rows


In [0]:
# 5. Schema Evolution: Add a new column 'Discount' with default value 0.1
df_with_discount = df_parsed.withColumn("Discount", lit(0.1))

df_with_discount.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("sales_delta_demo_json_fix")

print("Added 'Discount' column using schema evolution.")

spark.table("sales_delta_demo").printSchema()


Added 'Discount' column using schema evolution.
root
 |-- SalesOrderNumber: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- TaxAmount: double (nullable = true)
 |-- ProductMetadata: string (nullable = true)



In [0]:
%sql 
select * from sales_delta_demo_json_fix

SalesOrderNumber,OrderDate,CustomerName,EmailAddress,Item,Quantity,UnitPrice,TaxAmount,ProductMetadata,ProductMetadata_clean,ProductDetails,Discount
SO1000,2024-06-14,Jeffrey Perez,gonzalezryan@example.org,Gadget X,5.0,15.33,1.23,"{""color"": ""blue"", ""warranty"": ""3 years""}","{""color"": ""blue"", ""warranty"": ""3 years""}","List(blue, 3 years)",0.1
SO1001,2024-09-25,Rebekah Miller,cardenasbryan@example.com,Widget A,10.0,14.42,1.15,"{""color"": ""red"", ""warranty"": ""1 year""}","{""color"": ""red"", ""warranty"": ""1 year""}","List(red, 1 year)",0.1
SO1002,2024-08-17,Anna Hess,jasonhansen@example.com,Widget C,2.0,56.82,,"{""color"": ""yellow"", ""warranty"": ""3 years""}","{""color"": ""yellow"", ""warranty"": ""3 years""}","List(yellow, 3 years)",0.1
SO1003,2024-10-08,Joseph Hernandez,timothyfernandez@example.net,Widget B,3.0,35.79,2.86,"{""color"": ""red"", ""warranty"": ""2 years""}","{""color"": ""red"", ""warranty"": ""2 years""}","List(red, 2 years)",0.1
SO1004,2025-01-21,Emily Reed,robertdavis@example.com,Widget C,0.0,66.75,5.34,"{""color"": ""green"", ""warranty"": ""3 years""}","{""color"": ""green"", ""warranty"": ""3 years""}","List(green, 3 years)",0.1
SO1005,2025-03-03,Victoria Gonzalez,pamela73@example.com,Gadget Y,20.0,20.18,1.61,"{""color"": ""red"", ""warranty"": ""2 years""}","{""color"": ""red"", ""warranty"": ""2 years""}","List(red, 2 years)",0.1
SO1006,2024-06-04,Michael Hines,jjenkins@example.org,Gadget Y,3.0,81.16,6.49,"{""color"": ""blue"", ""warranty"": ""3 years""}","{""color"": ""blue"", ""warranty"": ""3 years""}","List(blue, 3 years)",0.1
SO1007,2024-10-12,Megan Rogers,ipatterson@example.com,Widget A,2.0,28.99,2.32,"{""color"": ""red"", ""warranty"": ""3 years""}","{""color"": ""red"", ""warranty"": ""3 years""}","List(red, 3 years)",0.1
SO1008,2025-02-21,Todd Johnson,collinsalyssa@example.com,Gadget Y,2.0,55.19,4.42,"{""color"": ""blue"", ""warranty"": ""3 years""}","{""color"": ""blue"", ""warranty"": ""3 years""}","List(blue, 3 years)",0.1
SO1009,2025-01-18,Barbara Thomas,joneal@example.net,Gadget X,0.0,93.65,7.49,"{""color"": ""yellow"", ""warranty"": ""3 years""}","{""color"": ""yellow"", ""warranty"": ""3 years""}","List(yellow, 3 years)",0.1


In [0]:
# 6. Time Travel: Query previous version of the table (version 0)
print("Time travel query to version 0:")
spark.sql("SELECT * FROM sales_delta_demo VERSION AS OF 0 LIMIT 10").show(truncate=False)


In [0]:
# 7. Optimize table and Z-order by CustomerName
spark.sql("OPTIMIZE sales_delta_demo ZORDER BY (CustomerName)")

print("OPTIMIZE with ZORDER completed.")