In [None]:
%pip install pyspark



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("Test").getOrCreate()

In [None]:
bronze_orders_df = spark.read.format("csv").option("header","True").load("/content/drive/MyDrive/retail_data_analysis/data/bronze/orders_bronze.csv")

In [None]:
bronze_orders_df.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|         2013-07-25|            11599|         CLOSED|
|       2|         07/26/2013|              256|       COMPLETE|
|       3|2013-07-27 00:00:00|            12111|PENDING_PAYMENT|
|       4|2013-07-28 00:00:00|             NULL|         CLOSED|
|       5|         2013/07/29|             8827|       COMPLETE|
|       6|               NULL|            11318|PENDING_PAYMENT|
|       7|         2013-07-31|             9475|         CLOSED|
|       8|         2013-08-01|            15023|       COMPLETE|
+--------+-------------------+-----------------+---------------+



In [None]:
bronze_customers_df= spark.read.format("csv").option("header","True").load("/content/drive/MyDrive/retail_data_analysis/data/bronze/customers_bronze.csv")

In [None]:
bronze_customers_df.show()

+-----------+--------------+--------------+--------------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|      customer_email|customer_city|customer_state|
+-----------+--------------+--------------+--------------------+-------------+--------------+
|      11599|          John|         Smith|john.smith@exampl...|       Caguas|            PR|
|        256|         Alice|       Johnson|alice.johnson@exa...|        Miami|            FL|
|      12111|           Bob|         Brown|                NULL|      Seattle|            WA|
|       8827|         Maria|        Garcia|maria.garcia@exam...|         NULL|            TX|
|      11318|        Carlos|      Martinez|carlos.martinez@e...|      Phoenix|            AZ|
|       9475|        Olivia|         Lopez|olivia.lopez@exam...|       Boston|          NULL|
|      15023|          Noah|         Davis|noah.davis@exampl...|       Denver|            CO|
|       NULL|          Emma|      Williams|emma.williams@exa

In [None]:
bronze_order_items_df = spark.read.format("csv").option("header","True").load("/content/drive/MyDrive/retail_data_analysis/data/bronze/order_items_bronze.csv")

In [None]:
bronze_order_items_df.show()

+-------------+--------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+--------+---------------------+-------------------+-------------------+------------------------+
|            1|       1|                  101|                  2|             400.00|                  200.00|
|            2|       2|                  102|               NULL|             150.00|                  150.00|
|            3|       3|                  105|                  3|             900.00|                     abc|
|            4|       4|                  107|                  1|             100.00|                  100.00|
|            5|       5|                  109|                  5|            1250.00|                  250.00|
|            6|       6|                  111|                  3|             900.00|                  

In [None]:
from pyspark.sql.functions import col, to_date, coalesce, regexp_replace, when, lit
from pyspark.sql.types import DoubleType

In [None]:
bronze_orders_df.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|         2013-07-25|            11599|         CLOSED|
|       2|         07/26/2013|              256|       COMPLETE|
|       3|2013-07-27 00:00:00|            12111|PENDING_PAYMENT|
|       4|2013-07-28 00:00:00|             NULL|         CLOSED|
|       5|         2013/07/29|             8827|       COMPLETE|
|       6|               NULL|            11318|PENDING_PAYMENT|
|       7|         2013-07-31|             9475|         CLOSED|
|       8|         2013-08-01|            15023|       COMPLETE|
+--------+-------------------+-----------------+---------------+



In [None]:
from pyspark.sql.functions import to_date

orders_clean_df = bronze_orders_df.withColumn(
    "order_date",
    coalesce(
        to_date(col("order_date"), "yyyy-MM-dd HH:mm:ss"),
        to_date(col("order_date"), "yyyy-MM-dd"),
        to_date(col("order_date"), "MM/dd/yyyy"),
        to_date(col("order_date"), "yyyy/MM/dd")
    )

) .filter(col("order_date").isNotNull())




In [None]:
orders_clean_df.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-26|              256|       COMPLETE|
|       3|2013-07-27|            12111|PENDING_PAYMENT|
|       4|2013-07-28|             NULL|         CLOSED|
|       5|2013-07-29|             8827|       COMPLETE|
|       7|2013-07-31|             9475|         CLOSED|
|       8|2013-08-01|            15023|       COMPLETE|
+--------+----------+-----------------+---------------+



In [None]:
from pyspark.sql.functions import col, trim, regexp_replace
from pyspark.sql.types import IntegerType

orders_clean_df = (
    orders_clean_df
    # Step 1: Clean non-digit junk and trim spaces
    .withColumn(
        "order_customer_id",
        regexp_replace(trim(col("order_customer_id")), "[^0-9]", "")
    )
    # Step 2: Cast to integer
    .withColumn("order_customer_id", col("order_customer_id").cast(IntegerType()))
    # Step 3: Remove rows where customer_id is null or 0
    .filter(col("order_customer_id").isNotNull() & (col("order_customer_id") != 0))
)


In [None]:
orders_clean_df.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-26|              256|       COMPLETE|
|       3|2013-07-27|            12111|PENDING_PAYMENT|
|       5|2013-07-29|             8827|       COMPLETE|
|       7|2013-07-31|             9475|         CLOSED|
|       8|2013-08-01|            15023|       COMPLETE|
+--------+----------+-----------------+---------------+



In [None]:
customers_clean = (
    bronze_customers_df
    .withColumn("customer_city", when(col("customer_city").isNull(), lit("Unknown")).otherwise(col("customer_city")))
    .withColumn("customer_state", when(col("customer_state").isNull(), lit("Unknown")).otherwise(col("customer_state")))
    .filter(col("customer_id").isNotNull())
)

In [None]:
from pyspark.sql.functions import col, when, lit, trim

customers_clean_df = (
    bronze_customers_df
    # Step 1: Trim all string columns
    .select([trim(col(c)).alias(c) for c in bronze_customers_df.columns])

    # Step 2: Handle missing city and state
    .withColumn(
        "customer_city",
        when(col("customer_city").isNull() | (col("customer_city") == ""), lit("Unknown"))
        .otherwise(col("customer_city"))
    )
    .withColumn(
        "customer_state",
        when(col("customer_state").isNull() | (col("customer_state") == ""), lit("Unknown"))
        .otherwise(col("customer_state"))
    )

    # Step 3: Fill missing emails
    .withColumn(
        "customer_email",
        when(col("customer_email").isNull() | (col("customer_email") == ""), lit("unknown@domain.com"))
        .otherwise(col("customer_email"))
    )

    # Step 4: Drop rows with null or invalid customer_id
    .filter(col("customer_id").isNotNull())
)


In [None]:
from pyspark.sql.functions import col, when, lit, trim

customers_clean_df = (
    bronze_customers_df
    .select([trim(col(c)).alias(c) for c in bronze_customers_df.columns])
    .withColumn(
        "customer_city",
        when(col("customer_city").isNull() | (col("customer_city") == ""), lit("Unknown"))
        .otherwise(col("customer_city"))
    )
    .withColumn(
        "customer_state",
        when(col("customer_state").isNull() | (col("customer_state") == ""), lit("Unknown"))
        .otherwise(col("customer_state"))
    )
    .withColumn(
        "customer_email",
        when(col("customer_email").isNull() | (col("customer_email") == ""), lit("unknown@domain.com"))
        .otherwise(col("customer_email"))
    )
)



In [None]:
customers_clean_df.show()

+-----------+--------------+--------------+--------------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|      customer_email|customer_city|customer_state|
+-----------+--------------+--------------+--------------------+-------------+--------------+
|      11599|          John|         Smith|john.smith@exampl...|       Caguas|            PR|
|        256|         Alice|       Johnson|alice.johnson@exa...|        Miami|            FL|
|      12111|           Bob|         Brown|  unknown@domain.com|      Seattle|            WA|
|       8827|         Maria|        Garcia|maria.garcia@exam...|      Unknown|            TX|
|      11318|        Carlos|      Martinez|carlos.martinez@e...|      Phoenix|            AZ|
|       9475|        Olivia|         Lopez|olivia.lopez@exam...|       Boston|       Unknown|
|      15023|          Noah|         Davis|noah.davis@exampl...|       Denver|            CO|
|       NULL|          Emma|      Williams|emma.williams@exa

In [None]:
from pyspark.sql.functions import col, when, regexp_replace, trim
from pyspark.sql.types import DoubleType, IntegerType

order_items_clean_df = (
    bronze_order_items_df
    # Step 1: Trim spaces
    .select([trim(col(c)).alias(c) for c in bronze_order_items_df.columns])

    # Step 2: Clean quantity → keep only digits, fill missing as 1
    .withColumn(
        "order_item_quantity",
        regexp_replace(col("order_item_quantity"), "[^0-9]", "").cast(IntegerType())
    )
    .withColumn(
        "order_item_quantity",
        when(col("order_item_quantity").isNull() | (col("order_item_quantity") == 0), 1)
        .otherwise(col("order_item_quantity"))
    )

    # Step 3: Clean price → remove bad chars like 'abc', cast to float
    .withColumn(
        "order_item_product_price",
        regexp_replace(col("order_item_product_price"), "[^0-9.]", "").cast(DoubleType())
    )
    .withColumn(
        "order_item_product_price",
        when(col("order_item_product_price").isNull() | (col("order_item_product_price") == 0), 0.0)
        .otherwise(col("order_item_product_price"))
    )

    # Step 4: Recalculate subtotal when null or wrong
    .withColumn(
        "order_item_subtotal",
        when(
            col("order_item_subtotal").isNull() |
            (col("order_item_subtotal") <= 0),
            col("order_item_quantity") * col("order_item_product_price")
        ).otherwise(col("order_item_subtotal"))
    )
)


In [None]:
order_items_clean_df.show()

+-------------+--------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+--------+---------------------+-------------------+-------------------+------------------------+
|            1|       1|                  101|                  2|             400.00|                   200.0|
|            2|       2|                  102|                  1|             150.00|                   150.0|
|            3|       3|                  105|                  3|             900.00|                     0.0|
|            4|       4|                  107|                  1|             100.00|                   100.0|
|            5|       5|                  109|                  5|            1250.00|                   250.0|
|            6|       6|                  111|                  3|             900.00|                  

In [None]:
orders_clean_df.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-26|              256|       COMPLETE|
|       3|2013-07-27|            12111|PENDING_PAYMENT|
|       5|2013-07-29|             8827|       COMPLETE|
|       7|2013-07-31|             9475|         CLOSED|
|       8|2013-08-01|            15023|       COMPLETE|
+--------+----------+-----------------+---------------+



In [None]:
customers_clean_df.show()

+-----------+--------------+--------------+--------------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|      customer_email|customer_city|customer_state|
+-----------+--------------+--------------+--------------------+-------------+--------------+
|      11599|          John|         Smith|john.smith@exampl...|       Caguas|            PR|
|        256|         Alice|       Johnson|alice.johnson@exa...|        Miami|            FL|
|      12111|           Bob|         Brown|  unknown@domain.com|      Seattle|            WA|
|       8827|         Maria|        Garcia|maria.garcia@exam...|      Unknown|            TX|
|      11318|        Carlos|      Martinez|carlos.martinez@e...|      Phoenix|            AZ|
|       9475|        Olivia|         Lopez|olivia.lopez@exam...|       Boston|       Unknown|
|      15023|          Noah|         Davis|noah.davis@exampl...|       Denver|            CO|
|       NULL|          Emma|      Williams|emma.williams@exa

In [94]:
order_items_clean_df.write.format("json").mode("overwrite").save("/content/drive/MyDrive/retail_data_analysis/data/silver/order_items_clean.json")

In [98]:
customers_clean_df.write.format("json").mode("overwrite").save("/content/drive/MyDrive/retail_data_analysis/data/silver/customers_clean.json")

In [99]:
orders_clean_df.write.format("json").mode("overwrite").save("/content/drive/MyDrive/retail_data_analysis/data/silver/orders_clean.json")