In [0]:
%run "./Task_1_Raw_Data_Ingestion"

In [0]:
%sql
-- Select the catalog and schema for this session; the fully qualified name
-- used later in this notebook is main.ecommerce.customer.
USE CATALOG main;
USE SCHEMA ecommerce;

### Task 2 – Silver Layer (Data Standardization)

Create clean, trusted datasets for modeling.
What You Need to Do:

- Read Parquet data from Bronze
- Enforce schema and data types
- Handle duplicates and null values
- Create managed Delta tables
- Do NOT create aggregations


In [0]:
from pyspark.sql.functions import (
    col,
    current_date,
    current_timestamp,
    lit,
    to_date,
    to_timestamp,
)
from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [0]:
 # Schema applied to the Bronze customer files: five nullable string columns.
 cust_schema = StructType([
     StructField("customer_id", StringType(), True),
     StructField("customer_unique_id", StringType(), True),
     StructField("customer_zip_code_prefix", StringType(), True),
     StructField("customer_city", StringType(), True),
     StructField("customer_state", StringType(), True)
 ])

 # Read the Bronze customer Parquet files with the schema above.
 # The CSV-only "header" option was dropped: the Parquet reader ignores it,
 # so it only suggested (wrongly) that a header row was being parsed.
 df_silver_cust = (
     spark.read
     .schema(cust_schema)
     .parquet("/Volumes/main/ecommerce/lakehouse_volumes/bronze_dataset/customer/")
 )

In [0]:
# note : no duplicate customer_id values were found; the de-duplication is kept as a safeguard.
# Keep one row per customer_id, then stamp every row with the date of this load.
df_silver_cust = (
    df_silver_cust
    .dropDuplicates(["customer_id"])
    .withColumn("LOAD_DATE", current_date())
)

In [0]:
# Add validity-tracking columns: the row starts today, ends on the
# placeholder date 9999-12-31, and is flagged as active.
df_silver_cust = df_silver_cust.withColumns({
    "effective_start_date": current_date(),
    "effective_end_date": to_date(lit("9999-12-31"), "yyyy-MM-dd"),
    "is_Active": lit(True),
})


In [0]:
df_silver_cust.limit(10).display()

In [0]:
# note : count is same for all columns that means no null values in any of the columns
df_silver_cust.describe().display()

In [0]:

# note : subset=["customer_id"] makes dropna() look for nulls in customer_id only.
#        A row with a null in any other column (e.g. customer_city) is kept.

# Step 1: keep one row per customer_id.
df_silver_cust = df_silver_cust.dropDuplicates(["customer_id"])
# Step 2: discard rows whose customer_id is null.
df_silver_cust = df_silver_cust.dropna(subset=["customer_id"])

In [0]:
from pyspark.sql.functions import col

df_silver_cust.write.format("delta").mode("overwrite").saveAsTable("customer")

In [0]:
df_silver_cust.write.mode("overwrite").parquet("/Volumes/main/ecommerce/lakehouse_volumes/silver_dataset/customer/")

In [0]:
%sql
-- note : customer_id and is_Active are made NOT NULL first, so that a primary key
--        can be enforced on them afterwards.
ALTER TABLE customer ALTER COLUMN customer_id SET NOT NULL;
ALTER TABLE customer ALTER COLUMN is_Active SET NOT NULL;

In [0]:
%sql
-- Row count of the customer table after the load.
select count(1) from main.ecommerce.customer;

In [0]:
# Declared column types for the order-items dataset; every field is nullable.
order_items_schema = (
    StructType()
    .add("order_id", StringType(), True)
    .add("order_item_id", IntegerType(), True)
    .add("product_id", StringType(), True)
    .add("seller_id", StringType(), True)
    .add("shipping_limit_date", TimestampType(), True)
    .add("price", DoubleType(), True)
    .add("freight_value", DoubleType(), True)
)

In [0]:
# This does not work in Serverless cluster - Databricks free edition

# Disable vectorized reader to allow Spark to handle the string-to-timestamp conversion

# spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")


In [0]:
# Reading the Parquet files with order_items_schema fails with: "Expected Spark type integer, actual Parquet type BYTE_ARRAY. SQLSTATE: KD001". The usual workaround is a Spark configuration setting, which cannot be changed in Databricks Free Edition.

# This does not work in Serverless cluster - Databricks free edition

# Disable vectorized reader to allow Spark to handle the string-to-timestamp conversion
# spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")


#df_silver_order_itemss = spark.read.option("header","true").schema(order_items_schema).parquet("/Volumes/main/ecommerce/lakehouse_volumes/bronze_dataset/orders_items/")


Why this happens
CSV write: when a CSV file is read without `inferSchema`, every column is loaded as a string.
Writing that DataFrame to Parquet therefore stores every column, including `order_item_id` and `shipping_limit_date`, as a BYTE_ARRAY (string).

The conflict: when that Parquet file is read back with a schema that declares a non-string type (for example `IntegerType` for `order_item_id` or `TimestampType` for `shipping_limit_date`), the vectorized reader finds a BYTE_ARRAY column where it expects the physical encoding of the declared type, and raises the KD001 error.

In [0]:
#df_silver_order_itemss = spark.read.option("header","true").schema(order_items_schema).option("mode","DROPMalformed").parquet("/Volumes/main/ecommerce/lakehouse_volumes/bronze_dataset/orders_items/")


In [0]:
# df_silver_order_itemss.show()

In [0]:
# note : by function changing the datatype of the columns
# note : add LOAD_DATE

df_silver_order_items = df_raw_orders_items.withColumn("shipping_limit_date", to_timestamp(col("shipping_limit_date"), "yyyy-MM-dd HH:mm:ss"))\
.withColumn("LOAD_DATE",current_date())

In [0]:
# note : by casting changing the datatype of the columns

df_silver_order_items = df_silver_order_items.withColumn("price", col("price").cast("decimal"))\
                                             .withColumn("freight_value", col("freight_value").cast("decimal"))


In [0]:
df_silver_order_items.printSchema()

In [0]:
df_silver_order_items = df_silver_order_items.dropDuplicates()

In [0]:
df_silver_order_items.write.format("delta").mode("overwrite").saveAsTable("order_items")

In [0]:
# Write the order items as Parquet files into the silver volume.
(
    df_silver_order_items.write
    .mode("overwrite")
    .parquet("/Volumes/main/ecommerce/lakehouse_volumes/silver_dataset/order_items/")
)

In [0]:
%sql
-- Row count of the order_items table after the load.
select count(1) from order_items

In [0]:
# Target column types for order payments. The read below does not apply this
# schema; it documents the intended types only.
order_payments_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("payment_sequential", IntegerType(), True),
    StructField("payment_type", StringType(), True),
    StructField("payment_installments", IntegerType(), True),
    StructField("payment_value", DoubleType(), True)
])

# Read the Bronze payments with the schema stored in the Parquet files.
# The CSV-only "header" option was dropped because the Parquet reader ignores it.
df_silver_order_payments = spark.read.parquet("/Volumes/main/ecommerce/lakehouse_volumes/bronze_dataset/order_payments/")



Why cast is usually better than "Enforce"

- Parquet is self-describing: Parquet files store their own schema. If you force a schema that contradicts the physical file (e.g., forcing a 100-character binary string to be an integer), Spark may still throw a `SchemaColumnConvertNotSupportedException`.

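- Safety: With ANSI mode disabled, `cast` returns null for values that cannot be converted instead of failing. With ANSI mode enabled, an invalid value raises an error, so use `try_cast` when nulls are the desired outcome. Forcing a schema, by contrast, can cause task failures during execution.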

In [0]:
# Cast every payment column to its target type and stamp the load date.
df_silver_order_payments = df_silver_order_payments.withColumns({
    "order_id": col("order_id").cast("string"),
    "payment_sequential": col("payment_sequential").cast("integer"),
    "payment_type": col("payment_type").cast("string"),
    "payment_installments": col("payment_installments").cast("integer"),
    "payment_value": col("payment_value").cast("double"),
    "LOAD_DATE": current_date(),
})

In [0]:
df_silver_order_payments.printSchema()

In [0]:
df_silver_order_payments.show(2)

In [0]:
df_silver_order_payments.describe().display()

In [0]:
df_silver_order_payments = df_silver_order_payments.dropDuplicates()

In [0]:
df_silver_order_payments.write.format("delta").mode("overwrite").saveAsTable("order_payments")

In [0]:
df_silver_order_payments.write.mode("overwrite").parquet("/Volumes/main/ecommerce/lakehouse_volumes/silver_dataset/order_payments/")

In [0]:
%sql
-- Row count of the order_payments table after the load.
select count(1) from order_payments;

In [0]:
df_silver_orders = spark.read.option("header",True).parquet("/Volumes/main/ecommerce/lakehouse_volumes/bronze_dataset/orders/")

In [0]:
df_silver_orders = \
df_raw_orders.withColumn("order_id", col("order_id").cast("string"))\
        .withColumn("customer_id", col("customer_id").cast("string"))\
        .withColumn("order_status", col("order_status").cast("string"))\
        .withColumn("order_purchase_timestamp", col("order_purchase_timestamp").cast("timestamp"))\
        .withColumn("order_approved_at", col("order_approved_at").cast("timestamp"))\
        .withColumn("order_delivered_carrier_date", col("order_delivered_carrier_date").cast("timestamp"))\
        .withColumn("order_delivered_customer_date", col("order_delivered_customer_date").cast("timestamp"))\
        .withColumn("order_estimated_delivery_date", col("order_estimated_delivery_date").cast("timestamp"))\
        .withColumn("LAOD_DATE",current_date())
                

In [0]:
df_silver_orders.printSchema()

In [0]:
df_silver_orders.limit(2).display()

In [0]:
df_silver_orders.describe().display()

In [0]:
df_silver_orders = df_silver_orders.dropDuplicates()

In [0]:
df_silver_orders.write.mode("overwrite").format("delta").saveAsTable("orders")

In [0]:
# Write the orders as Parquet files into the silver volume.
(
    df_silver_orders.write
    .mode("overwrite")
    .parquet("/Volumes/main/ecommerce/lakehouse_volumes/silver_dataset/orders/")
)

In [0]:
# Read the Bronze products with the schema stored in the Parquet files.
# The CSV-only "header" option was dropped because the Parquet reader ignores it.
df_silver_products = spark.read.parquet("/Volumes/main/ecommerce/lakehouse_volumes/bronze_dataset/products/")



In [0]:
# Cast every product column to its target type, then add the validity-tracking
# columns (starts today, ends on the placeholder date 9999-12-31, flagged active).
# The "lenght" spelling is the actual column name in the data and must be kept.
df_silver_products = df_silver_products.withColumns({
    "product_id": col("product_id").cast("string"),
    "product_category_name": col("product_category_name").cast("string"),
    "product_name_lenght": col("product_name_lenght").cast("integer"),
    "product_description_lenght": col("product_description_lenght").cast("integer"),
    "product_photos_qty": col("product_photos_qty").cast("integer"),
    "product_weight_g": col("product_weight_g").cast("integer"),
    "product_length_cm": col("product_length_cm").cast("integer"),
    "product_height_cm": col("product_height_cm").cast("integer"),
    "product_width_cm": col("product_width_cm").cast("integer"),
    "effective_start_date": current_date(),
    "effective_end_date": to_date(lit("9999-12-31"), "yyyy-MM-dd"),
    "is_Active": lit(True),
})


In [0]:
df_silver_products.printSchema()

In [0]:
df_silver_products.describe().display()

In [0]:
df_silver_products = df_silver_products.filter( ( col("product_id").isNotNull()) & ( col("product_category_name").isNotNull()))

In [0]:
df_silver_products.describe().display()

In [0]:
df_silver_products = df_silver_products.dropDuplicates()

In [0]:
df_silver_products.write.format("delta").mode("overwrite").saveAsTable("products")

In [0]:
df_silver_products.write.format("delta").mode("overwrite").parquet("/Volumes/main/ecommerce/lakehouse_volumes/silver_dataset/products/")