# Demo of Transforming Data in PySpark
## Silver Layer of Medallion Architecture

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window


### Load Data

In [0]:


# Source CSV on DBFS (path copied from the Databricks file explorer).
path = 'dbfs:/FileStore/sample_data/dirty_cafe_sales.csv'

# Reader options: treat the first line as the header row and ask Spark to infer column types.
csv_options = {'header': 'true', 'inferschema': "true"}

# Load the raw file into a DataFrame.
cafe_df = spark.read.format('csv').options(**csv_options).load(path)
     



### Data Exploration

In [0]:
# Quick profile of the raw data, roughly what pandas' DataFrame.info() reports:
# total row count, schema (fields, types, nullability) and non-null count per column.
total_rows = cafe_df.count()
print(f"Total Rows: {total_rows}")

cafe_df.printSchema()

non_null_counts = [count(c).alias(c) for c in cafe_df.columns]
cafe_df.agg(*non_null_counts).show()


Total Rows: 10000
root
 |-- Transaction ID: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price Per Unit: string (nullable = true)
 |-- Total Spent: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: string (nullable = true)

+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|Transaction ID|Item|Quantity|Price Per Unit|Total Spent|Payment Method|Location|Transaction Date|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|         10000|9667|    9862|          9821|       9827|          7421|    6735|            9841|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+



In [0]:
# Null count per column (the complement of the non-null counts in the previous summary).
# `sum` here is pyspark.sql.functions.sum from the star import, not the Python builtin.
null_counts = [sum(col(c).isNull().cast("int")).alias(c) for c in cafe_df.columns]
cafe_df.agg(*null_counts).show()

+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|Transaction ID|Item|Quantity|Price Per Unit|Total Spent|Payment Method|Location|Transaction Date|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|             0| 333|     138|           179|        173|          2579|    3265|             159|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+



In [0]:
# Summary statistics (count, mean, stddev, min, max) for every column.
summary_stats = cafe_df.describe()
summary_stats.show()

+-------+--------------+-------+-----------------+------------------+-----------------+--------------+--------+----------------+
|summary|Transaction ID|   Item|         Quantity|    Price Per Unit|      Total Spent|Payment Method|Location|Transaction Date|
+-------+--------------+-------+-----------------+------------------+-----------------+--------------+--------+----------------+
|  count|         10000|   9667|             9862|              9821|             9827|          7421|    6735|            9841|
|   mean|          NULL|   NULL|3.028463396702027| 2.949984155487483|8.924352495262161|          NULL|    NULL|            NULL|
| stddev|          NULL|   NULL|1.419006873221319|1.2784504728035881|6.009919472829945|          NULL|    NULL|            NULL|
|    min|   TXN_1000555|   Cake|                1|               1.0|              1.0|          Cash|   ERROR|      2023-01-01|
|    max|   TXN_9999124|UNKNOWN|          UNKNOWN|           UNKNOWN|          UNKNOWN|       UNK

In [0]:
# Total number of rows in the DataFrame.
total_rows = cafe_df.count()
total_rows

10000

In [0]:
# Row count and schema once more, as a reference point for the distinct-value checks below.
row_total = cafe_df.count()
print(f"Total Rows: {row_total}")
cafe_df.printSchema()


Total Rows: 10000
root
 |-- Transaction ID: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price Per Unit: string (nullable = true)
 |-- Total Spent: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: string (nullable = true)



In [0]:

# Number of unique values per column.
# On larger datasets approx_count_distinct can be used instead.
distinct_counts = [countDistinct(c).alias(c) for c in cafe_df.columns]
cafe_df.agg(*distinct_counts).show()


+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|Transaction ID|Item|Quantity|Price Per Unit|Total Spent|Payment Method|Location|Transaction Date|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|         10000|  10|       7|             8|         19|             5|       4|             367|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+



In [0]:
# Collect the set of distinct values of every column into a single row of arrays.
# show() cuts long arrays off; use display(...) or .show(truncate=False) to see the full lists.
distinct_values = cafe_df.agg(*[collect_set(c).alias(c) for c in cafe_df.columns])
distinct_values.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      Transaction ID|                Item|            Quantity|      Price Per Unit|         Total Spent|      Payment Method|            Location|    Transaction Date|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|[TXN_4625910, TXN...|[ERROR, Coffee, U...|[ERROR, 3, 1, UNK...|[ERROR, 5.0, UNKN...|[9.0, 10.0, 16.0,...|[ERROR, Credit Ca...|[ERROR, UNKNOWN, ...|[2023-02-18, 2023...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+



In [0]:
# Preview the first ten raw rows.
preview_rows = cafe_df.head(10)
display(preview_rows)


Transaction ID,Item,Quantity,Price Per Unit,Total Spent,Payment Method,Location,Transaction Date
TXN_1961373,Coffee,2,2.0,4.0,Credit Card,Takeaway,2023-09-08
TXN_4977031,Cake,4,3.0,12.0,Cash,In-store,2023-05-16
TXN_4271903,Cookie,4,1.0,ERROR,Credit Card,In-store,2023-07-19
TXN_7034554,Salad,2,5.0,10.0,UNKNOWN,UNKNOWN,2023-04-27
TXN_3160411,Coffee,2,2.0,4.0,Digital Wallet,In-store,2023-06-11
TXN_2602893,Smoothie,5,4.0,20.0,Credit Card,,2023-03-31
TXN_4433211,UNKNOWN,3,3.0,9.0,ERROR,Takeaway,2023-10-06
TXN_6699534,Sandwich,4,4.0,16.0,Cash,UNKNOWN,2023-10-28
TXN_4717867,,5,3.0,15.0,,Takeaway,2023-07-28
TXN_2064365,Sandwich,5,4.0,20.0,,In-store,2023-12-31


### Data Transformation

#### Fix Corrupt Data

In [0]:
# Placeholder tokens the source file uses for bad or missing entries.
error_values = ['ERROR', "UNKNOWN"]

# Standardize missing values: in every column, turn a placeholder token into a real null
# and leave every other value untouched.
nullified_columns = [
    when(col(name).isin(error_values), None).otherwise(col(name)).alias(name)
    for name in cafe_df.columns
]
cafe_df = cafe_df.select(nullified_columns)

In [0]:
# Same ten-row preview, now with the placeholder tokens replaced by nulls.
rows_after_nullify = cafe_df.head(10)
display(rows_after_nullify)

Transaction ID,Item,Quantity,Price Per Unit,Total Spent,Payment Method,Location,Transaction Date
TXN_1961373,Coffee,2,2.0,4.0,Credit Card,Takeaway,2023-09-08
TXN_4977031,Cake,4,3.0,12.0,Cash,In-store,2023-05-16
TXN_4271903,Cookie,4,1.0,,Credit Card,In-store,2023-07-19
TXN_7034554,Salad,2,5.0,10.0,,,2023-04-27
TXN_3160411,Coffee,2,2.0,4.0,Digital Wallet,In-store,2023-06-11
TXN_2602893,Smoothie,5,4.0,20.0,Credit Card,,2023-03-31
TXN_4433211,,3,3.0,9.0,,Takeaway,2023-10-06
TXN_6699534,Sandwich,4,4.0,16.0,Cash,,2023-10-28
TXN_4717867,,5,3.0,15.0,,Takeaway,2023-07-28
TXN_2064365,Sandwich,5,4.0,20.0,,In-store,2023-12-31


In [0]:
# Recount the nulls per column now that ERROR / UNKNOWN entries are nulls as well.
# `sum` is pyspark.sql.functions.sum from the star import, not the Python builtin.
null_counts_after = [sum(col(c).isNull().cast("int")).alias(c) for c in cafe_df.columns]
cafe_df.agg(*null_counts_after).show()

+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|Transaction ID|Item|Quantity|Price Per Unit|Total Spent|Payment Method|Location|Transaction Date|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+
|             0| 969|     479|           533|        502|          3178|    3961|             460|
+--------------+----+--------+--------------+-----------+--------------+--------+----------------+



In [0]:
# Distinct Item values left after the placeholders were nulled out.
distinct_items = cafe_df.select('Item').distinct()
distinct_items.show()

+--------+
|    Item|
+--------+
|   Salad|
|     Tea|
|    NULL|
|Sandwich|
|   Juice|
|Smoothie|
|  Coffee|
|    Cake|
|  Cookie|
+--------+



In [0]:
# Distinct unit prices left after the placeholders were nulled out.
distinct_prices = cafe_df.select('Price Per Unit').distinct()
distinct_prices.show()

+--------------+
|Price Per Unit|
+--------------+
|           1.0|
|          NULL|
|           5.0|
|           4.0|
|           1.5|
|           2.0|
|           3.0|
+--------------+



In [0]:
# Every observed (Item, Price Per Unit) combination. The pairs show which price belongs to
# which item, so some of the nulls can be deduced. n=100 keeps show() from cutting the list short.
item_price_pairs = cafe_df.select('Item', 'Price Per Unit').distinct()
item_price_pairs.show(n=100)

+--------+--------------+
|    Item|Price Per Unit|
+--------+--------------+
|    NULL|           1.0|
|   Salad|          NULL|
|     Tea|          NULL|
|Sandwich|           4.0|
|    NULL|          NULL|
|Sandwich|          NULL|
|   Juice|           3.0|
|    NULL|           5.0|
|   Juice|          NULL|
|    NULL|           4.0|
|     Tea|           1.5|
|Smoothie|          NULL|
|  Coffee|          NULL|
|    Cake|           3.0|
|    NULL|           1.5|
|    NULL|           2.0|
|    Cake|          NULL|
|Smoothie|           4.0|
|  Cookie|          NULL|
|    NULL|           3.0|
|  Cookie|           1.0|
|  Coffee|           2.0|
|   Salad|           5.0|
+--------+--------------+



In [0]:


# Unit price of each known item, taken from the Item / Price Per Unit pairs listed above.
item_prices = {
    "Sandwich": 4.0,
    "Tea": 1.5,
    "Juice": 3.0,
    "Cake": 3.0,
    "Cookie": 1.0,
    "Salad": 5.0,
    "Smoothie": 4.0,
    "Coffee": 2.0,
}

# Build a single CASE expression: a row with a known item takes that item's price,
# any row that matches none of the items keeps the price it already has.
price_by_item = None
for item_name, unit_price in item_prices.items():
    is_item = col("Item") == item_name
    if price_by_item is None:
        price_by_item = when(is_item, unit_price)
    else:
        price_by_item = price_by_item.when(is_item, unit_price)

cafe_df = cafe_df.withColumn(
    "Price Per Unit",
    price_by_item.otherwise(col("Price Per Unit")),
)

# Check the result: each named item should now pair with exactly one price.
cafe_df.select('Item','Price Per Unit').distinct().show()


+--------+--------------+
|    Item|Price Per Unit|
+--------+--------------+
|    NULL|           1.0|
|Sandwich|           4.0|
|    NULL|          NULL|
|   Juice|           3.0|
|    NULL|           5.0|
|    NULL|           4.0|
|     Tea|           1.5|
|    Cake|           3.0|
|    NULL|           1.5|
|    NULL|           2.0|
|Smoothie|           4.0|
|    NULL|           3.0|
|  Cookie|           1.0|
|  Coffee|           2.0|
|   Salad|           5.0|
+--------+--------------+



In [0]:
# Column types at this stage (the recorded output shows every column is still a string).
cafe_df.dtypes

[('Transaction ID', 'string'),
 ('Item', 'string'),
 ('Quantity', 'string'),
 ('Price Per Unit', 'string'),
 ('Total Spent', 'string'),
 ('Payment Method', 'string'),
 ('Location', 'string'),
 ('Transaction Date', 'string')]

In [0]:
# when / col are already in scope via the star import at the top of the notebook;
# the explicit import is kept so this cell also runs on its own.
from pyspark.sql.functions import when, col

# Infer the Item from its unit price:
#   * 1.5, 1.0, 2.0 and 5.0 each belong to exactly one item, so the item is set directly.
#   * 3.0 and 4.0 are each shared by two items, so a row whose Item is missing gets a
#     combined label; rows that already name an item are left as they are.
#   * Any other row (e.g. price is null) keeps its current Item.
# The combined label for 4.0 is spelled "Sandwich" to match the item name used in the data
# (it was previously misspelled "Sandwitch").
cafe_df = cafe_df.withColumn(
    "Item",
    when(col("Price Per Unit") == 1.5, "Tea")
    .when(col("Price Per Unit") == 1.0, "Cookie")
    .when(col("Price Per Unit") == 2.0, "Coffee")
    .when(col("Price Per Unit") == 5.0, "Salad")
    .when((col("Price Per Unit") == 3.0) & (col("Item").isNull()), "Juice or Cake")
    .when((col("Price Per Unit") == 4.0) & (col("Item").isNull()), "Smoothie or Sandwich")
    .otherwise(col("Item"))
)

# Check the result: the only rows still without an Item should be those without a price.
cafe_df.select("Item", "Price Per Unit").distinct().show()


+--------------------+--------------+
|                Item|Price Per Unit|
+--------------------+--------------+
|            Sandwich|           4.0|
|                NULL|          NULL|
|               Juice|           3.0|
|                 Tea|           1.5|
|       Juice or Cake|           3.0|
|                Cake|           3.0|
|Smoothie or Sandw...|           4.0|
|            Smoothie|           4.0|
|              Cookie|           1.0|
|              Coffee|           2.0|
|               Salad|           5.0|
+--------------------+--------------+



In [0]:
# Drop the rows whose Item is still null after the price-based inference.
has_item = col("Item").isNotNull()
cafe_df = cafe_df.filter(has_item)

In [0]:
# Item / price combinations that remain once rows without an Item are removed.
remaining_pairs = cafe_df.select("Item", "Price Per Unit").distinct()
remaining_pairs.show()

+--------------------+--------------+
|                Item|Price Per Unit|
+--------------------+--------------+
|            Sandwich|           4.0|
|               Juice|           3.0|
|                 Tea|           1.5|
|       Juice or Cake|           3.0|
|                Cake|           3.0|
|Smoothie or Sandw...|           4.0|
|            Smoothie|           4.0|
|              Cookie|           1.0|
|              Coffee|           2.0|
|               Salad|           5.0|
+--------------------+--------------+



In [0]:
# Distinct payment methods, including null.
payment_methods = cafe_df.select("Payment Method").distinct()
payment_methods.show()

+--------------+
|Payment Method|
+--------------+
|   Credit Card|
|          NULL|
|Digital Wallet|
|          Cash|
+--------------+



In [0]:
# Distinct locations, including null.
locations = cafe_df.select("Location").distinct()
locations.show()

+--------+
|Location|
+--------+
|In-store|
|    NULL|
|Takeaway|
+--------+



In [0]:
# Give missing categorical values an explicit label instead of leaving them null.
missing_label = "Unknown"
categorical_fills = {"Payment Method": missing_label, "Location": missing_label}
cafe_df = cafe_df.fillna(categorical_fills)

In [0]:
# Ten-row preview after filling Payment Method and Location.
rows_after_fill = cafe_df.head(10)
display(rows_after_fill)

Transaction ID,Item,Quantity,Price Per Unit,Total Spent,Payment Method,Location,Transaction Date
TXN_1961373,Coffee,2,2.0,4.0,Credit Card,Takeaway,2023-09-08
TXN_4977031,Cake,4,3.0,12.0,Cash,In-store,2023-05-16
TXN_4271903,Cookie,4,1.0,,Credit Card,In-store,2023-07-19
TXN_7034554,Salad,2,5.0,10.0,Unknown,Unknown,2023-04-27
TXN_3160411,Coffee,2,2.0,4.0,Digital Wallet,In-store,2023-06-11
TXN_2602893,Smoothie,5,4.0,20.0,Credit Card,Unknown,2023-03-31
TXN_4433211,Juice or Cake,3,3.0,9.0,Unknown,Takeaway,2023-10-06
TXN_6699534,Sandwich,4,4.0,16.0,Cash,Unknown,2023-10-28
TXN_4717867,Juice or Cake,5,3.0,15.0,Unknown,Takeaway,2023-07-28
TXN_2064365,Sandwich,5,4.0,20.0,Unknown,In-store,2023-12-31


In [0]:
# Back-fill Total Spent only where it is missing, using Quantity x Price Per Unit.
# Totals that are already present are kept as recorded rather than recalculated.
total_is_missing = col("Total Spent").isNull()
computed_total = col("Quantity") * col("Price Per Unit")

cafe_df = cafe_df.withColumn(
    "Total Spent",
    when(total_is_missing, computed_total).otherwise(col("Total Spent")),
)

rows_after_total = cafe_df.head(10)
display(rows_after_total)


Transaction ID,Item,Quantity,Price Per Unit,Total Spent,Payment Method,Location,Transaction Date
TXN_1961373,Coffee,2,2.0,4.0,Credit Card,Takeaway,2023-09-08
TXN_4977031,Cake,4,3.0,12.0,Cash,In-store,2023-05-16
TXN_4271903,Cookie,4,1.0,4.0,Credit Card,In-store,2023-07-19
TXN_7034554,Salad,2,5.0,10.0,Unknown,Unknown,2023-04-27
TXN_3160411,Coffee,2,2.0,4.0,Digital Wallet,In-store,2023-06-11
TXN_2602893,Smoothie,5,4.0,20.0,Credit Card,Unknown,2023-03-31
TXN_4433211,Juice or Cake,3,3.0,9.0,Unknown,Takeaway,2023-10-06
TXN_6699534,Sandwich,4,4.0,16.0,Cash,Unknown,2023-10-28
TXN_4717867,Juice or Cake,5,3.0,15.0,Unknown,Takeaway,2023-07-28
TXN_2064365,Sandwich,5,4.0,20.0,Unknown,In-store,2023-12-31


### Fix Data Types

In [0]:
# Cast the numeric columns from string to double.
for numeric_column in ("Price Per Unit", "Total Spent", "Quantity"):
    cafe_df = cafe_df.withColumn(numeric_column, col(numeric_column).cast("double"))

# Parse the transaction date from its yyyy-MM-dd text form into a date column.
cafe_df = cafe_df.withColumn(
    "Transaction Date",
    to_date(col("Transaction Date"), "yyyy-MM-dd"),
)

cafe_df.dtypes

[('Transaction ID', 'string'),
 ('Item', 'string'),
 ('Quantity', 'double'),
 ('Price Per Unit', 'double'),
 ('Total Spent', 'double'),
 ('Payment Method', 'string'),
 ('Location', 'string'),
 ('Transaction Date', 'date')]


### Format String Fields

In [0]:
# Strip leading and trailing whitespace from the text columns.
for text_column in ('Item', 'Payment Method', 'Location'):
    cafe_df = cafe_df.withColumn(text_column, trim(col(text_column)))


### Load to Silver Layer

In [0]:


# Register the cleaned DataFrame as the temporary view "cafe_temp_view" so it can be queried with spark.sql.
cafe_df.createOrReplaceTempView("cafe_temp_view")
     

# The Delta write below is wrapped in a bare triple-quoted string, so none of it executes.
# As the last expression of the cell, the string itself is echoed as the cell output.
# NOTE(review): before enabling it, confirm that the target Delta table accepts column names
# containing spaces (e.g. "Transaction ID"), and whether overwriteSchema rather than mergeSchema
# is what is intended together with mode("overwrite").
'''

catalog = "silver_prod"
schema = "silver_db"
table_name = "cafe_silver"


role_table = f"{catalog}.{schema}.{table_name}"


spark.sql("SELECT * FROM cafe_temp_view").write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(role_table)

'''
     


'\n\ncatalog = "silver_prod"\nschema = "silver_db"\ntable_name = "cafe_silver"\n\n\nrole_table = f"{catalog}.{schema}.{table_name}"\n\n\nspark.sql("SELECT * FROM cafe_temp_view").write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(role_table)\n\n'