In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Extract

In [0]:
df_products_path = "/Volumes/zubale/challenge1/products/zubale-products.csv"
df_orders_path = "/Volumes/zubale/challenge1/orders/zubale-orders.csv"

In [0]:
# Schema for the products table.
products_schema= StructType([
StructField('id',StringType(),nullable=True ),
StructField('name',StringType(),nullable=True),
StructField('category',StringType(),nullable=True),
StructField('price',FloatType(),nullable=True)
])

In [0]:
# Schema for the orders table.
orders_schema= StructType([
StructField('id',StringType(),nullable=True ),
StructField('product_id',StringType(),nullable=True),
StructField('quantity',IntegerType(),nullable=True),
StructField('created_date',StringType(),nullable=True)
])

In [0]:
df_products = spark.read.schema(products_schema).csv(df_products_path, sep=",", header=True)
df_orders = spark.read.schema(orders_schema).csv(df_orders_path, sep=",", header=True)
#display(df_products)
#display(df_orders)

In [0]:
df_products.display()
df_orders.display()

id,name,category,price
1,Product_1,Pants,92.55
2,Product_2,Shirts,43.11
3,Product_3,Jackets,59.02
4,Product_4,Shoes,49.65
5,Product_5,Pants,44.59
6,Product_6,Shoes,16.27
7,Product_7,Shirts,99.96
8,Product_8,Shirts,46.77
9,Product_9,Pants,32.68
10,Product_10,Pants,88.24


id,product_id,quantity,created_date
1,11,1,'2024-12-01'
2,17,2,'2024-12-01'
3,19,1,'2024-12-01'
4,12,1,'2024-12-01'
5,11,2,'2024-12-01'
6,17,1,'2024-12-01'
7,5,5,'2024-12-01'
8,5,4,'2024-12-01'
9,5,2,'2024-12-02'
10,1,4,'2024-12-02'


In [0]:
bronze_path = 'zubale.bronze'

In [0]:
# Save to a table.
df_orders.write.mode('overwrite').format("delta").saveAsTable(f"{bronze_path}.zubaleOrders")
df_products.write.mode('overwrite').format("delta").saveAsTable(f"{bronze_path}.zubaleProducts")

# Transform

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [0]:
# Read the tables.
df_products = spark.table("zubale.bronze.zubaleProducts")
df_orders = spark.table("zubale.bronze.zubaleOrders")

In [0]:
#Left join for the products and orders tables.
# And leave only the columns for: order_id, created_date, product_name, quantity.
df_joined = df_orders.join(df_products, df_products.id == df_orders.product_id, "left").select(
    df_orders.id.alias("order_id"),
    df_orders.created_date.alias("order_created_date"),
    df_products.name.alias("product_name"),
    df_orders.quantity.alias("quantity"),
    (df_orders.quantity * df_products.price).alias("total_price")
)
df_joined.display()

order_id,order_created_date,product_name,quantity,total_price
1,'2024-12-01',Product_11,1,69.05999755859375
2,'2024-12-01',Product_17,2,197.0200042724609
3,'2024-12-01',Product_19,1,51.93999862670898
4,'2024-12-01',Product_12,1,50.9900016784668
5,'2024-12-01',Product_11,2,138.1199951171875
6,'2024-12-01',Product_17,1,98.51000213623048
7,'2024-12-01',Product_5,5,222.95000076293945
8,'2024-12-01',Product_5,4,178.36000061035156
9,'2024-12-02',Product_5,2,89.18000030517578
10,'2024-12-02',Product_1,4,370.2000122070313


In [0]:
from pyspark.sql.functions import regexp_replace

df_joined_cleaned = df_joined.withColumn("order_created_date", regexp_replace("order_created_date", "'", ""))
display(df_joined_cleaned)

order_id,order_created_date,product_name,quantity,total_price
1,2024-12-01,Product_11,1,69.05999755859375
2,2024-12-01,Product_17,2,197.0200042724609
3,2024-12-01,Product_19,1,51.93999862670898
4,2024-12-01,Product_12,1,50.9900016784668
5,2024-12-01,Product_11,2,138.1199951171875
6,2024-12-01,Product_17,1,98.51000213623048
7,2024-12-01,Product_5,5,222.95000076293945
8,2024-12-01,Product_5,4,178.36000061035156
9,2024-12-02,Product_5,2,89.18000030517578
10,2024-12-02,Product_1,4,370.2000122070313


In [0]:
from pyspark.sql.functions import to_date

df_joined_casted = df_joined_cleaned.withColumn("order_created_date", to_date("order_created_date", "yyyy-MM-dd"))
display(df_joined_casted)

order_id,order_created_date,product_name,quantity,total_price
1,2024-12-01,Product_11,1,69.05999755859375
2,2024-12-01,Product_17,2,197.0200042724609
3,2024-12-01,Product_19,1,51.93999862670898
4,2024-12-01,Product_12,1,50.9900016784668
5,2024-12-01,Product_11,2,138.1199951171875
6,2024-12-01,Product_17,1,98.51000213623048
7,2024-12-01,Product_5,5,222.95000076293945
8,2024-12-01,Product_5,4,178.36000061035156
9,2024-12-02,Product_5,2,89.18000030517578
10,2024-12-02,Product_1,4,370.2000122070313


In [0]:
volume_path = "/Volumes/zubale/challenge1/output/order_full_information.csv"
df_joined_casted.write.mode('overwrite').csv(volume_path, header=True)

In [0]:
silver_path = "zubale.silver"

In [0]:
df_joined_casted.write.mode('overwrite').saveAsTable(f"{silver_path}.zubaleOrdersDetail")