### PySpark

# Customer

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Load the raw customer CSV files from the bronze volume: the first row is the
# header and column types are inferred from the data.
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/databricksshashi/bronze/bronze_volume_manual/customer/")
)

# Print the first 10 rows as plain text.
df.show(10)

+-----------+--------------------+--------------------+------------------+-----------+
|customer_id|                name|               email|          location|signup_date|
+-----------+--------------------+--------------------+------------------+-----------+
|          1|       Madison Smith|williamsteresa@ca...|        New Darryl| 2024-04-10|
|          2|          Kyle Brown|schroedermarco@ho...|      Lake Anthony| 2023-05-26|
|          3|  Alexander Gonzalez|  fgriffin@yahoo.com|       Port Joseph| 2022-09-09|
|          4|       Joseph Potter|paulsweeney@hotma...|        Amandafort| 2024-03-24|
|          5|         Joshua King|riverssheryl@geor...|     Port Paulbury| 2023-06-14|
|          6|        Spencer Pugh|jacob61@singh-joh...|         Duranfort| 2024-03-03|
|          7|      Kathryn Gordon|brookstina@yahoo.com|      Lake Michael| 2022-08-30|
|          8|         Greg Weaver|davidlewis@gmail.com|        New Joseph| 2024-04-06|
|          9|Elizabeth Villanueva| richard8

In [0]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- location: string (nullable = true)
 |-- signup_date: date (nullable = true)



### By default, CSV files carry no schema: every column is read as a string unless `inferSchema` is enabled (as it is in the read below).

In [0]:
# Read the same bronze customer files again into `df_new`, which the
# transformation and upsert cells below build on.
df_new = spark.read.csv(
    "/Volumes/databricksshashi/bronze/bronze_volume_manual/customer/",
    header=True,
    inferSchema=True,
)

### Upper-case a column with `withColumn`

In [0]:
# Replace `name` with its upper-cased value; every other column passes through unchanged.
df_column_upper = df_new.withColumn("name", upper("name"))
display(df_column_upper.limit(5))

customer_id,name,email,location,signup_date
1,MADISON SMITH,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10
2,KYLE BROWN,schroedermarco@hotmail.com,Lake Anthony,2023-05-26
3,ALEXANDER GONZALEZ,fgriffin@yahoo.com,Port Joseph,2022-09-09
4,JOSEPH POTTER,paulsweeney@hotmail.com,Amandafort,2024-03-24
5,JOSHUA KING,riverssheryl@george.com,Port Paulbury,2023-06-14


## `split` function

In [0]:
# Split the email on "@"; `domain` here holds the whole array,
# e.g. [local part, domain].
df_split = df_new.withColumn("domain", split("email", "@"))
display(df_split.limit(5))

customer_id,name,email,location,signup_date,domain
1,Madison Smith,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,"List(williamsteresa, calhoun-fisher.com)"
2,Kyle Brown,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,"List(schroedermarco, hotmail.com)"
3,Alexander Gonzalez,fgriffin@yahoo.com,Port Joseph,2022-09-09,"List(fgriffin, yahoo.com)"
4,Joseph Potter,paulsweeney@hotmail.com,Amandafort,2024-03-24,"List(paulsweeney, hotmail.com)"
5,Joshua King,riverssheryl@george.com,Port Paulbury,2023-06-14,"List(riverssheryl, george.com)"


In [0]:
# Keep only element 1 of the split array (the part after "@"), i.e. the email domain.
df_split = df_new.withColumn("domain", split(col("email"), "@").getItem(1))
display(df_split.limit(5))

customer_id,name,email,location,signup_date,domain
1,Madison Smith,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,calhoun-fisher.com
2,Kyle Brown,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,hotmail.com
3,Alexander Gonzalez,fgriffin@yahoo.com,Port Joseph,2022-09-09,yahoo.com
4,Joseph Potter,paulsweeney@hotmail.com,Amandafort,2024-03-24,hotmail.com
5,Joshua King,riverssheryl@george.com,Port Paulbury,2023-06-14,george.com



### Group by domain

In [0]:
from pyspark.sql.functions import split, col, count

# Derive the email domain, then count customers per domain, largest first.
df_with_domain = df_new.withColumn("domain", split(col("email"), "@").getItem(1))
df_grouped = (
    df_with_domain
    .groupBy("domain")
    .agg(count("customer_id").alias("total_customer"))
)
display(df_grouped.orderBy(col("total_customer").desc()))

domain,total_customer
yahoo.com,38
hotmail.com,32
gmail.com,29
williams.com,3
adams.com,2
gordon-vaughn.com,1
salas.com,1
padilla.com,1
wells.com,1
smith-wright.biz,1


Databricks visualization. Run in Databricks to view.

## Apply transformations: add a processing timestamp


In [0]:
from pyspark.sql.functions import current_timestamp

# Tag every customer row with the query's processing timestamp
# (current_timestamp yields one value for the whole query).
df_new2 = df_new.withColumn(
    "processed_date",
    current_timestamp(),
)
display(df_new2.limit(5))

customer_id,name,email,location,signup_date,processed_date
1,Madison Smith,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10,2026-02-24T18:25:03.446Z
2,Kyle Brown,schroedermarco@hotmail.com,Lake Anthony,2023-05-26,2026-02-24T18:25:03.446Z
3,Alexander Gonzalez,fgriffin@yahoo.com,Port Joseph,2022-09-09,2026-02-24T18:25:03.446Z
4,Joseph Potter,paulsweeney@hotmail.com,Amandafort,2024-03-24,2026-02-24T18:25:03.446Z
5,Joshua King,riverssheryl@george.com,Port Paulbury,2023-06-14,2026-02-24T18:25:03.446Z


## Write data from Bronze to Silver

# append mode: if the destination already has data, the new data is added to it

# overwrite mode: if the destination already has data, the new data replaces it


# error mode (`errorifexists`, Spark's default): if the destination already has data, the write fails with an error


# ignore mode: if the destination already has data, the write is skipped and the existing data is left unchanged

In [0]:
from delta.tables import DeltaTable



## Upsert data from Bronze to Silver

### Key logic: merge into the Silver table if it already exists, otherwise create it

In [0]:

# Upsert the customer batch into Silver: merge on customer_id when the table
# already exists, otherwise create the table from this batch.
# NOTE(review): this writes df_new, so the processed_date column added in
# df_new2 is not persisted — confirm that is intended.
customer_silver_table = "databricksshashi.silver.customer_enr"

if not spark.catalog.tableExists(customer_silver_table):
    # First run: the table does not exist yet, so create it.
    df_new.write.format("delta").mode("append").saveAsTable(customer_silver_table)
else:
    # Update rows whose customer_id already exists, insert the rest.
    dlt_obj = DeltaTable.forName(spark, customer_silver_table)
    merge_builder = dlt_obj.alias("trg").merge(
        df_new.alias("src"), "trg.customer_id = src.customer_id"
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

 

In [0]:
%sql
-- Inspect the customer Silver table after the upsert.
select * from databricksshashi.silver.customer_enr;

customer_id,name,email,location,signup_date
1,Madison Smith,williamsteresa@calhoun-fisher.com,New Darryl,2024-04-10
2,Kyle Brown,schroedermarco@hotmail.com,Lake Anthony,2023-05-26
3,Alexander Gonzalez,fgriffin@yahoo.com,Port Joseph,2022-09-09
4,Joseph Potter,paulsweeney@hotmail.com,Amandafort,2024-03-24
5,Joshua King,riverssheryl@george.com,Port Paulbury,2023-06-14
6,Spencer Pugh,jacob61@singh-johnson.net,Duranfort,2024-03-03
7,Kathryn Gordon,brookstina@yahoo.com,Lake Michael,2022-08-30
8,Greg Weaver,davidlewis@gmail.com,New Joseph,2024-04-06
9,Elizabeth Villanueva,richard84@gmail.com,West Ashleychester,2025-04-08
10,Darlene Morris,gregory16@gmail.com,Darrenmouth,2023-06-06


## PRODUCT

In [0]:
# Load the raw product CSV files from the bronze volume (header row, inferred types).
df_prod = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/databricksshashi/bronze/bronze_volume_manual/product/")
)

In [0]:
# Preview the first five product rows.
display(df_prod.limit(5))

product_id,product_name,category,price
1,Society Rest,Toys,190.4
2,Front Left,Toys,475.6
3,May Likely,Clothing,367.34
4,Gas Medical,Books,301.34
5,No West,Books,82.23


In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
# Add a processing timestamp; reassigning df_prod means later cells see the new column.
df_prod = df_prod.withColumn(
    "processDate",
    current_timestamp(),
)
display(df_prod.limit(5))

product_id,product_name,category,price,processDate
1,Society Rest,Toys,190.4,2026-02-25T05:35:48.084Z
2,Front Left,Toys,475.6,2026-02-25T05:35:48.084Z
3,May Likely,Clothing,367.34,2026-02-25T05:35:48.084Z
4,Gas Medical,Books,301.34,2026-02-25T05:35:48.084Z
5,No West,Books,82.23,2026-02-25T05:35:48.084Z


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import avg

# Mean product price per category.
avg_price_by_category = (
    df_prod
    .groupBy("category")
    .agg(avg("price").alias("avg_price"))
)
avg_price_by_category.display()

category,avg_price
Books,271.8863636363636
Toys,261.8695238095238
Electronics,241.35571428571424
Clothing,199.8395
Furniture,230.32222222222225


In [0]:
from delta.tables import DeltaTable

# Upsert the product batch into Silver: merge on product_id when the table
# already exists, otherwise create the table from this batch.
product_silver_table = "databricksshashi.silver.product_enr"

if not spark.catalog.tableExists(product_silver_table):
    # First run: the table does not exist yet, so create it.
    df_prod.write.format("delta").mode("append").saveAsTable(product_silver_table)
else:
    # Update rows whose product_id already exists, insert the rest.
    dlt_obj = DeltaTable.forName(spark, product_silver_table)
    merge_builder = dlt_obj.alias("trg").merge(
        df_prod.alias("src"), "trg.product_id = src.product_id"
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

 

In [0]:
%sql
-- Inspect the product Silver table after the upsert.
select * from databricksshashi.silver.product_enr;

product_id,product_name,category,price
1,Society Rest,Toys,190.4
2,Front Left,Toys,475.6
3,May Likely,Clothing,367.34
4,Gas Medical,Books,301.34
5,No West,Books,82.23
6,Form Rise,Electronics,82.22
7,Today Want,Clothing,33.75
8,Same All,Books,433.76
9,Brother Because,Clothing,302.55
10,Job Capital,Toys,355.5


### STORE

In [0]:
# Load the raw store CSV files from the bronze volume (header row, inferred types).
df_store = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/databricksshashi/bronze/bronze_volume_manual/store/")
)
display(df_store.limit(5))

store_id,store_name,region
1,Store_1,North
2,Store_2,West
3,Store_3,West
4,Store_4,South
5,Store_5,East


## Remove `_` from store name

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Strip every underscore from store_name (e.g. "Store_1" -> "Store1").
df_store = df_store.withColumn(
    "store_name",
    regexp_replace("store_name", "_", ""),
)
display(df_store.limit(5))

store_id,store_name,region
1,Store1,North
2,Store2,West
3,Store3,West
4,Store4,South
5,Store5,East


In [0]:
from pyspark.sql.functions import current_timestamp

# Tag each store row with the query's processing timestamp.
df_store = df_store.withColumn(
    "processDate",
    current_timestamp(),
)
display(df_store.limit(5))


store_id,store_name,region,processDate
1,Store1,North,2026-02-25T17:38:21.227Z
2,Store2,West,2026-02-25T17:38:21.227Z
3,Store3,West,2026-02-25T17:38:21.227Z
4,Store4,South,2026-02-25T17:38:21.227Z
5,Store5,East,2026-02-25T17:38:21.227Z


In [0]:
# Upsert the store batch into Silver: merge on store_id when the table
# already exists, otherwise create the table from this batch.
store_silver_table = "databricksshashi.silver.store_enr"

if not spark.catalog.tableExists(store_silver_table):
    # First run: the table does not exist yet, so create it.
    df_store.write.format("delta").mode("append").saveAsTable(store_silver_table)
else:
    # Update rows whose store_id already exists, insert the rest.
    dlt_obj = DeltaTable.forName(spark, store_silver_table)
    merge_builder = dlt_obj.alias("trg").merge(
        df_store.alias("src"), "trg.store_id = src.store_id"
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


In [0]:
%sql
-- Inspect the store Silver table after the upsert.
select * from databricksshashi.silver.store_enr;

store_id,store_name,region,processDate
1,Store1,North,2026-02-25T17:42:49.534Z
2,Store2,West,2026-02-25T17:42:49.534Z
3,Store3,West,2026-02-25T17:42:49.534Z
4,Store4,South,2026-02-25T17:42:49.534Z
5,Store5,East,2026-02-25T17:42:49.534Z
6,Store6,South,2026-02-25T17:42:49.534Z
7,Store7,South,2026-02-25T17:42:49.534Z
8,Store8,West,2026-02-25T17:42:49.534Z
9,Store9,South,2026-02-25T17:42:49.534Z
10,Store10,East,2026-02-25T17:42:49.534Z


### SALES

In [0]:
# Load the raw sales CSV files from the bronze volume (header row, inferred types).
df_sales = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/databricksshashi/bronze/bronze_volume_manual/sales/")
)
display(df_sales.limit(5))

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount
1,63,38,12,3,29.24,2024-09-21,113.25
2,96,97,14,4,1.57,2025-03-02,1038.44
3,52,23,14,4,20.47,2024-09-22,475.94
4,96,63,2,4,14.04,2024-08-22,1427.73
5,132,15,5,3,17.38,2024-11-11,235.47


In [0]:
from pyspark.sql.functions import when

# Unit price per sale line = total_amount / quantity.
# Rows with quantity == 0 (or null) get a null pricePerSale. Without the guard,
# ANSI mode (spark.sql.ansi.enabled) would fail the whole job with
# DIVIDE_BY_ZERO. With ANSI off, Spark already returns null for x / 0, so
# results there are unchanged.
df_sales = df_sales.withColumn(
    "pricePerSale",
    when(col("quantity") != 0, col("total_amount") / col("quantity")),
)
# Tag each sales row with the query's processing timestamp.
df_sales = df_sales.withColumn("processDate", current_timestamp())
display(df_sales.limit(5))

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,pricePerSale,processDate
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2026-02-25T18:25:01.620Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2026-02-25T18:25:01.620Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.985,2026-02-25T18:25:01.620Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.9325,2026-02-25T18:25:01.620Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2026-02-25T18:25:01.620Z


In [0]:
%sql
-- WARNING: destructive. Drops the sales Silver table, so the next cell takes
-- its create-table branch instead of merging into existing data.
-- IF EXISTS keeps the cell re-runnable when the table is already gone
-- (fresh workspace or a second run).
drop table if exists databricksshashi.silver.sales_enr;

In [0]:
from delta.tables import DeltaTable

# Upsert the enriched sales batch into the Silver layer.
# The table name is defined once, so the existence check, the merge target and
# the create-table write always refer to the same table. The create branch can
# no longer drift to a different catalog.
sales_silver_table = "databricksshashi.silver.sales_enr"

if spark.catalog.tableExists(sales_silver_table):
    # Table already exists: update rows whose sales_id matches, insert the rest.
    dlt_obj = DeltaTable.forName(spark, sales_silver_table)
    (
        dlt_obj.alias("trg")
        .merge(df_sales.alias("src"), "trg.sales_id = src.sales_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the Silver table from the current batch.
    (
        df_sales.write.format("delta")
        .mode("append")
        .saveAsTable(sales_silver_table)
    )

In [0]:
%sql
-- Inspect the sales Silver table after the upsert.
select * from databricksshashi.silver.sales_enr;

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,pricePerSale,processDate
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2026-02-25T18:44:08.084Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2026-02-25T18:44:08.084Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.985,2026-02-25T18:44:08.084Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.9325,2026-02-25T18:44:08.084Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2026-02-25T18:44:08.084Z
6,151,97,5,3,2.56,2025-02-14,770.99,256.99666666666667,2026-02-25T18:44:08.084Z
7,143,25,6,2,1.29,2025-06-01,455.55,227.775,2026-02-25T18:44:08.084Z
8,171,17,13,1,20.01,2024-10-16,124.46,124.46,2026-02-25T18:44:08.084Z
9,29,97,9,4,24.7,2025-03-28,794.42,198.605,2026-02-25T18:44:08.084Z
10,36,66,15,4,23.46,2024-11-15,837.75,209.4375,2026-02-25T18:44:08.084Z


## Spark SQL

In [0]:
# spark.sql() returns a lazy DataFrame. As the cell's last expression, only its
# schema repr is shown, not the rows.
spark.sql("select * from databricksshashi.silver.sales_enr")

DataFrame[sales_id: int, customer_id: int, product_id: int, store_id: int, quantity: int, discount: double, date: date, total_amount: double, pricePerSale: double, processDate: timestamp]

In [0]:
# display() executes the query and renders the sales Silver rows.
display(spark.sql("select * from databricksshashi.silver.sales_enr"))

sales_id,customer_id,product_id,store_id,quantity,discount,date,total_amount,pricePerSale,processDate
1,63,38,12,3,29.24,2024-09-21,113.25,37.75,2026-02-25T18:44:08.084Z
2,96,97,14,4,1.57,2025-03-02,1038.44,259.61,2026-02-25T18:44:08.084Z
3,52,23,14,4,20.47,2024-09-22,475.94,118.985,2026-02-25T18:44:08.084Z
4,96,63,2,4,14.04,2024-08-22,1427.73,356.9325,2026-02-25T18:44:08.084Z
5,132,15,5,3,17.38,2024-11-11,235.47,78.49,2026-02-25T18:44:08.084Z
6,151,97,5,3,2.56,2025-02-14,770.99,256.99666666666667,2026-02-25T18:44:08.084Z
7,143,25,6,2,1.29,2025-06-01,455.55,227.775,2026-02-25T18:44:08.084Z
8,171,17,13,1,20.01,2024-10-16,124.46,124.46,2026-02-25T18:44:08.084Z
9,29,97,9,4,24.7,2025-03-28,794.42,198.605,2026-02-25T18:44:08.084Z
10,36,66,15,4,23.46,2024-11-15,837.75,209.4375,2026-02-25T18:44:08.084Z


In [0]:
# display() executes the query and renders the product Silver rows.
display(spark.sql("select * from databricksshashi.silver.product_enr"))

product_id,product_name,category,price
1,Society Rest,Toys,190.4
2,Front Left,Toys,475.6
3,May Likely,Clothing,367.34
4,Gas Medical,Books,301.34
5,No West,Books,82.23
6,Form Rise,Electronics,82.22
7,Today Want,Clothing,33.75
8,Same All,Books,433.76
9,Brother Because,Clothing,302.55
10,Job Capital,Toys,355.5


In [0]:

# SQL functions work inside spark.sql(). Without an alias, the result column is
# literally named `upper(category)`. Only the DataFrame repr is shown here.
spark.sql("select upper(category) from databricksshashi.silver.product_enr")
    


DataFrame[upper(category): string]

In [0]:
# Keep the query result as a DataFrame; its single column is named `upper(category)`.
df_sqlspark = spark.sql(
    "select upper(category) from databricksshashi.silver.product_enr"
)
display(df_sqlspark.limit(5))

upper(category)
TOYS
TOYS
CLOTHING
BOOKS
BOOKS


## Temp view: run SQL on top of a DataFrame; the view lives only for the current Spark session (this notebook)

In [0]:
# Register df_sqlspark as a session-scoped temp view named temp_product,
# replacing any existing view with that name. Its only column is named
# `upper(category)`, so SQL against the view must backtick-quote it,
# e.g. select `upper(category)` from temp_product.
df_sqlspark.createOrReplaceTempView("temp_product")