# **PYSPARK**

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

### **CUSTOMERS**

In [0]:
# Load the raw customers CSV files from the bronze volume. The first row is
# used as the header and Spark infers each column's data type.
df = (
    spark.read.format("csv")
    .options(inferSchema=True, header=True)
    .load("/Volumes/databricksansh/bronze/bronze_volume/customers/")
)

In [0]:
# Show the customers DataFrame exactly as it was read from the CSV files.
display(df)

customer_id,name,email,location,signup_date
1,Madison Smith,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10
2,Kyle Brown,schroedermarco@hotmail.com,Lake Anthony,2023-05-26
3,Alexander Gonzalez,fgriffin@yahoo.com,Port Joseph,2022-09-09
4,Joseph Potter,paulsweeney@hotmail.com,Amandafort,2024-03-24
5,Joshua King,riverssheryl@george.com,Port Paulbury,2023-06-14
6,Spencer Pugh,jacob61@singh-johnson.net,Duranfort,2024-03-03
7,Kathryn Gordon,brookstina@yahoo.com,Lake Michael,2022-08-30
8,Greg Weaver,davidlewis@gmail.com,New Joseph,2024-04-06
9,Elizabeth Villanueva,richard84@gmail.com,West Ashleychester,2025-04-08
10,Darlene Morris,gregory16@gmail.com,Darrenmouth,2023-06-06


In [0]:
# Print the column names and the data types Spark inferred for the customers data.
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- location: string (nullable = true)
 |-- signup_date: date (nullable = true)



In [0]:
# Upper-case every customer name, overwriting the existing "name" column.
name_in_caps = upper(col("name"))
df = df.withColumn("name", name_in_caps)
display(df)

customer_id,name,email,location,signup_date
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14
6,SPENCER PUGH,jacob61@singh-johnson.net,Duranfort,2024-03-03
7,KATHRYN GORDON,brookstina@yahoo.com,Lake Michael,2022-08-30
8,GREG WEAVER,davidlewis@gmail.com,New Joseph,2024-04-06
9,ELIZABETH VILLANUEVA,richard84@gmail.com,West Ashleychester,2025-04-08
10,DARLENE MORRIS,gregory16@gmail.com,Darrenmouth,2023-06-06


In [0]:
# Derive the e-mail domain: split the address on "@" and keep the element at
# index 1 (the part after the "@").
email_parts = split(col("email"), "@")
df = df.withColumn("domain", email_parts[1])
display(df)

customer_id,name,email,location,signup_date,domain
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,calhoun-fisher.com
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,hotmail.com
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09,yahoo.com
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24,hotmail.com
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14,george.com
6,SPENCER PUGH,jacob61@singh-johnson.net,Duranfort,2024-03-03,singh-johnson.net
7,KATHRYN GORDON,brookstina@yahoo.com,Lake Michael,2022-08-30,yahoo.com
8,GREG WEAVER,davidlewis@gmail.com,New Joseph,2024-04-06,gmail.com
9,ELIZABETH VILLANUEVA,richard84@gmail.com,West Ashleychester,2025-04-08,gmail.com
10,DARLENE MORRIS,gregory16@gmail.com,Darrenmouth,2023-06-06,gmail.com


In [0]:
# Count customers per e-mail domain and list the most common domains first.
customers_per_domain = df.groupBy("domain").agg(
    count(col("customer_id")).alias("total_customers")
)
display(customers_per_domain.sort(col("total_customers").desc()))

domain,total_customers
yahoo.com,38
hotmail.com,32
gmail.com,29
williams.com,3
adams.com,2
carter.com,1
calhoun-fisher.com,1
joseph.biz,1
lane-scott.com,1
burns.org,1


Databricks visualization. Run in Databricks to view.

In [0]:
# Add a "processDate" column holding the current timestamp.
process_ts = current_timestamp()
df = df.withColumn("processDate", process_ts)
display(df)

customer_id,name,email,location,signup_date,domain,processDate
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,calhoun-fisher.com,2025-06-30T14:59:41.596Z
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,hotmail.com,2025-06-30T14:59:41.596Z
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09,yahoo.com,2025-06-30T14:59:41.596Z
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24,hotmail.com,2025-06-30T14:59:41.596Z
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14,george.com,2025-06-30T14:59:41.596Z
6,SPENCER PUGH,jacob61@singh-johnson.net,Duranfort,2024-03-03,singh-johnson.net,2025-06-30T14:59:41.596Z
7,KATHRYN GORDON,brookstina@yahoo.com,Lake Michael,2022-08-30,yahoo.com,2025-06-30T14:59:41.596Z
8,GREG WEAVER,davidlewis@gmail.com,New Joseph,2024-04-06,gmail.com,2025-06-30T14:59:41.596Z
9,ELIZABETH VILLANUEVA,richard84@gmail.com,West Ashleychester,2025-04-08,gmail.com,2025-06-30T14:59:41.596Z
10,DARLENE MORRIS,gregory16@gmail.com,Darrenmouth,2023-06-06,gmail.com,2025-06-30T14:59:41.596Z


#### UPSERT 

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert the enriched customers into the silver Delta table, keyed on customer_id.
customers_table = "databricksansh.silver.customers_enr"

if not spark.catalog.tableExists(customers_table):
    # The table does not exist yet: create it from this batch.
    df.write.format("delta").mode("append").saveAsTable(customers_table)
else:
    # The table exists: update every column of rows whose customer_id matches,
    # and insert the source rows that have no match.
    customers_target = DeltaTable.forName(spark, customers_table)
    (
        customers_target.alias("trg")
        .merge(df.alias("src"), "trg.customer_id == src.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

### **PRODUCTS**

In [0]:
# Read the raw products CSV files from the bronze volume (header row, inferred types).
products_path = "/Volumes/databricksansh/bronze/bronze_volume/products/"
df_prod = (
    spark.read.format("csv")
    .options(inferSchema=True, header=True)
    .load(products_path)
)
display(df_prod)

product_id,product_name,category,price
1,Society Rest,Toys,190.4
2,Front Left,Toys,475.6
3,May Likely,Clothing,367.34
4,Gas Medical,Books,301.34
5,No West,Books,82.23
6,Form Rise,Electronics,82.22
7,Today Want,Clothing,33.75
8,Same All,Books,433.76
9,Brother Because,Clothing,302.55
10,Job Capital,Toys,355.5


In [0]:
# Add a "processDate" column holding the current timestamp to the products data.
df_prod = df_prod.withColumn(
    "processDate",
    current_timestamp(),
)
display(df_prod)

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2025-06-30T15:18:44.952Z
2,Front Left,Toys,475.6,2025-06-30T15:18:44.952Z
3,May Likely,Clothing,367.34,2025-06-30T15:18:44.952Z
4,Gas Medical,Books,301.34,2025-06-30T15:18:44.952Z
5,No West,Books,82.23,2025-06-30T15:18:44.952Z
6,Form Rise,Electronics,82.22,2025-06-30T15:18:44.952Z
7,Today Want,Clothing,33.75,2025-06-30T15:18:44.952Z
8,Same All,Books,433.76,2025-06-30T15:18:44.952Z
9,Brother Because,Clothing,302.55,2025-06-30T15:18:44.952Z
10,Job Capital,Toys,355.5,2025-06-30T15:18:44.952Z


In [0]:
# Average product price within each category.
avg_price_by_category = df_prod.groupBy("category").agg(
    avg("price").alias("avg_price")
)
display(avg_price_by_category)


category,avg_price
Furniture,230.32222222222225
Clothing,199.8395
Toys,261.8695238095238
Books,271.8863636363636
Electronics,241.35571428571424


Databricks visualization. Run in Databricks to view.

In [0]:
# Upsert the products batch into the silver Delta table, keyed on product_id.
products_table = "databricksansh.silver.products_enr"
products_match = "trg.product_id == src.product_id"

if spark.catalog.tableExists(products_table):
    # Existing table: matching rows are fully updated, unmatched source rows are inserted.
    products_target = DeltaTable.forName(spark, products_table).alias("trg")
    products_merge = products_target.merge(df_prod.alias("src"), products_match)
    products_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    # No table yet: create it by writing this batch.
    products_writer = df_prod.write.format("delta").mode("append")
    products_writer.saveAsTable(products_table)

In [0]:
%sql
-- Inspect the rows currently stored in the silver products table.
SELECT *
FROM databricksansh.silver.products_enr

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2025-06-30T15:23:55.864Z
2,Front Left,Toys,475.6,2025-06-30T15:23:55.864Z
3,May Likely,Clothing,367.34,2025-06-30T15:23:55.864Z
4,Gas Medical,Books,301.34,2025-06-30T15:23:55.864Z
5,No West,Books,82.23,2025-06-30T15:23:55.864Z
6,Form Rise,Electronics,82.22,2025-06-30T15:23:55.864Z
7,Today Want,Clothing,33.75,2025-06-30T15:23:55.864Z
8,Same All,Books,433.76,2025-06-30T15:23:55.864Z
9,Brother Because,Clothing,302.55,2025-06-30T15:23:55.864Z
10,Job Capital,Toys,355.5,2025-06-30T15:23:55.864Z


### **STORES**

In [0]:
# Read the raw stores CSV files from the bronze volume (header row, inferred types).
stores_reader = spark.read.format("csv").options(inferSchema=True, header=True)
df_str = stores_reader.load("/Volumes/databricksansh/bronze/bronze_volume/stores/")
display(df_str)

store_id,store_name,region
1,Store_1,North
2,Store_2,West
3,Store_3,West
4,Store_4,South
5,Store_5,East
6,Store_6,South
7,Store_7,South
8,Store_8,West
9,Store_9,South
10,Store_10,East


In [0]:
# Remove every underscore from store_name (e.g. "Store_1" becomes "Store1").
store_name_clean = regexp_replace(col("store_name"), "_", "")
df_str = df_str.withColumn("store_name", store_name_clean)
display(df_str)

store_id,store_name,region
1,Store1,North
2,Store2,West
3,Store3,West
4,Store4,South
5,Store5,East
6,Store6,South
7,Store7,South
8,Store8,West
9,Store9,South
10,Store10,East


In [0]:
# Add a "processDate" column holding the current timestamp to the stores data.
stores_process_ts = current_timestamp()
df_str = df_str.withColumn("processDate", stores_process_ts)
display(df_str)


store_id,store_name,region,processDate
1,Store1,North,2025-06-30T15:44:38.211Z
2,Store2,West,2025-06-30T15:44:38.211Z
3,Store3,West,2025-06-30T15:44:38.211Z
4,Store4,South,2025-06-30T15:44:38.211Z
5,Store5,East,2025-06-30T15:44:38.211Z
6,Store6,South,2025-06-30T15:44:38.211Z
7,Store7,South,2025-06-30T15:44:38.211Z
8,Store8,West,2025-06-30T15:44:38.211Z
9,Store9,South,2025-06-30T15:44:38.211Z
10,Store10,East,2025-06-30T15:44:38.211Z


In [0]:
# Upsert the stores batch into the silver Delta table, keyed on store_id.
stores_table = "databricksansh.silver.stores_enr"
stores_table_exists = spark.catalog.tableExists(stores_table)

if stores_table_exists:
    # Merge into the existing table: update matched rows, insert unmatched ones.
    (
        DeltaTable.forName(spark, stores_table)
        .alias("trg")
        .merge(df_str.alias("src"), "trg.store_id == src.store_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # The table does not exist yet: create it from this batch.
    df_str.write.format("delta").mode("append").saveAsTable(stores_table)

### **SALES**

In [0]:
# Read the raw sales CSV files from the bronze volume (header row, inferred types).
sales_path = "/Volumes/databricksansh/bronze/bronze_volume/sales/"
df_sales = (
    spark.read.format("csv")
    .options(inferSchema=True, header=True)
    .load(sales_path)
)
display(df_sales)

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount
1,63,38,12,3,29.24,2024-09-21,113.25
2,96,97,14,4,1.57,2025-03-02,1038.44
3,52,23,14,4,20.47,2024-09-22,475.94
4,96,63,2,4,14.04,2024-08-22,1427.73
5,132,15,5,3,17.38,2024-11-11,235.47
6,151,97,5,3,2.56,2025-02-14,770.99
7,143,25,6,2,1.29,2025-06-01,455.55
8,171,17,13,1,20.01,2024-10-16,124.46
9,29,97,9,4,24.7,2025-03-28,794.42
10,36,66,15,4,23.46,2024-11-15,837.75


In [0]:
# Enrich the sales rows with two columns:
#   pricePerSale - total_amount divided by quantity, rounded to 2 decimals
#   processDate  - the current timestamp
# NOTE: `round` is pyspark.sql.functions.round (star-imported at the top of the
# notebook), not Python's built-in round.
df_sales = (
    df_sales
    .withColumn("pricePerSale", round(col("total_amount") / col("quantity"), 2))
    .withColumn("processDate", current_timestamp())
)
display(df_sales)


sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,pricePerSale,processDate
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2025-06-30T15:49:24.801Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2025-06-30T15:49:24.801Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.99,2025-06-30T15:49:24.801Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.93,2025-06-30T15:49:24.801Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2025-06-30T15:49:24.801Z
6,151,97,5,3,2.56,2025-02-14,770.99,257.0,2025-06-30T15:49:24.801Z
7,143,25,6,2,1.29,2025-06-01,455.55,227.78,2025-06-30T15:49:24.801Z
8,171,17,13,1,20.01,2024-10-16,124.46,124.46,2025-06-30T15:49:24.801Z
9,29,97,9,4,24.7,2025-03-28,794.42,198.61,2025-06-30T15:49:24.801Z
10,36,66,15,4,23.46,2024-11-15,837.75,209.44,2025-06-30T15:49:24.801Z


In [0]:
# Upsert the sales batch into the silver Delta table, keyed on sales_id.
sales_table = "databricksansh.silver.sales_enr"

if not spark.catalog.tableExists(sales_table):
    # No table yet: create it by writing this batch.
    sales_writer = df_sales.write.format("delta").mode("append")
    sales_writer.saveAsTable(sales_table)
else:
    # Existing table: matched sales_id rows are fully updated, new ones inserted.
    sales_target = DeltaTable.forName(spark, sales_table)
    sales_merge = sales_target.alias("trg").merge(
        df_sales.alias("src"),
        "trg.sales_id == src.sales_id",
    )
    sales_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

### **SPARK SQL**

In [0]:
# Read the silver products table through Spark SQL.
# NOTE: this rebinds `df`, which earlier in the notebook held the customers data.
df = spark.sql("SELECT * FROM databricksansh.silver.products_enr")

In [0]:
# Show the rows returned by the Spark SQL query on the silver products table.
display(df)

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2025-06-30T15:23:55.864Z
2,Front Left,Toys,475.6,2025-06-30T15:23:55.864Z
3,May Likely,Clothing,367.34,2025-06-30T15:23:55.864Z
4,Gas Medical,Books,301.34,2025-06-30T15:23:55.864Z
5,No West,Books,82.23,2025-06-30T15:23:55.864Z
6,Form Rise,Electronics,82.22,2025-06-30T15:23:55.864Z
7,Today Want,Clothing,33.75,2025-06-30T15:23:55.864Z
8,Same All,Books,433.76,2025-06-30T15:23:55.864Z
9,Brother Because,Clothing,302.55,2025-06-30T15:23:55.864Z
10,Job Capital,Toys,355.5,2025-06-30T15:23:55.864Z


In [0]:
# Expose the DataFrame to SQL as the temporary view "temp_products"
# (an existing view with that name is replaced).
df.createOrReplaceTempView("temp_products")

In [0]:
# Query the temp view and add a "flag" column: 'YES' when the category is
# 'Toys', 'NO' for every other row.
df = spark.sql("""
            SELECT *,
                    CASE 
                    WHEN category = 'Toys' THEN 'YES' ELSE 'NO' END AS flag
                    FROM temp_products
            """)

In [0]:
# Show the products together with the derived "flag" column.
display(df)

product_id,product_name,category,price,processDate,flag
1,Society Rest,Toys,190.4,2025-06-30T15:23:55.864Z,YES
2,Front Left,Toys,475.6,2025-06-30T15:23:55.864Z,YES
3,May Likely,Clothing,367.34,2025-06-30T15:23:55.864Z,NO
4,Gas Medical,Books,301.34,2025-06-30T15:23:55.864Z,NO
5,No West,Books,82.23,2025-06-30T15:23:55.864Z,NO
6,Form Rise,Electronics,82.22,2025-06-30T15:23:55.864Z,NO
7,Today Want,Clothing,33.75,2025-06-30T15:23:55.864Z,NO
8,Same All,Books,433.76,2025-06-30T15:23:55.864Z,NO
9,Brother Because,Clothing,302.55,2025-06-30T15:23:55.864Z,NO
10,Job Capital,Toys,355.5,2025-06-30T15:23:55.864Z,YES


### **PySpark UDF**

In [0]:
def greet(p_input):
    """Build a greeting by prefixing the text form of ``p_input`` with "Hello".

    Args:
        p_input: Value to greet. It is converted with ``str()``; may be ``None``.

    Returns:
        ``"Hello" + str(p_input)``, or ``None`` when ``p_input`` is ``None``.
    """
    # A missing input stays missing instead of becoming the literal "HelloNone".
    if p_input is None:
        return None
    return "Hello" + str(p_input)

In [0]:
# Wrap `greet` as a Spark UDF. The return type is spelled out here; string is
# also what udf() uses when no return type is given.
udf_greet = udf(greet, returnType=StringType())

In [0]:
# Apply the greeting UDF to the "flag" column and store the result in a new
# "greet" column.
flag_greeting = udf_greet(col("flag"))
df = df.withColumn("greet", flag_greeting)
display(df)

product_id,product_name,category,price,processDate,flag,greet
1,Society Rest,Toys,190.4,2025-06-30T15:23:55.864Z,YES,HelloYES
2,Front Left,Toys,475.6,2025-06-30T15:23:55.864Z,YES,HelloYES
3,May Likely,Clothing,367.34,2025-06-30T15:23:55.864Z,NO,HelloNO
4,Gas Medical,Books,301.34,2025-06-30T15:23:55.864Z,NO,HelloNO
5,No West,Books,82.23,2025-06-30T15:23:55.864Z,NO,HelloNO
6,Form Rise,Electronics,82.22,2025-06-30T15:23:55.864Z,NO,HelloNO
7,Today Want,Clothing,33.75,2025-06-30T15:23:55.864Z,NO,HelloNO
8,Same All,Books,433.76,2025-06-30T15:23:55.864Z,NO,HelloNO
9,Brother Because,Clothing,302.55,2025-06-30T15:23:55.864Z,NO,HelloNO
10,Job Capital,Toys,355.5,2025-06-30T15:23:55.864Z,YES,HelloYES
