In [1]:
import argparse
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F
import time

from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, DoubleType, TimestampType, LongType
from pyspark.sql.functions import col, lower, trim, when,row_number, count
from pyspark.sql import Window
from pyspark.sql.window import Window

import utils.data_processing_bronze_table as bronze_processing
import utils.data_processing_silver_table as silver_processing

## set up pyspark session

In [2]:
print('\n\n---starting job---\n\n')

# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("olist_bronze_processing") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")



---starting job---




Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/17 20:22:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Build Bronze Table

Important note: There is some discrepancy in where the datamart folder is created when the main.py script is run vs this Jupyter notebook is run.

* This Jupyter notebook will create the datamart folder inside `scripts` folder and output the bronze tables there.
* When you run the main.py script, the datamart folder will be created inside `app` folder (i.e. root) and output the bronze tables there.

Need to have team meeting to resolve this

I chose to run the main.py script, therefore subsequent code on Silver Tables built references the path from `app` folder to access the bronze tables.

In [3]:
# Create bronze root directory
bronze_root = "datamart/bronze"
os.makedirs(bronze_root, exist_ok=True)
print(f"Bronze root directory: {bronze_root}")

Bronze root directory: datamart/bronze


In [4]:
# Process all Olist datasets
print("\nProcessing Olist datasets...\n")
bronze_processing.process_olist_customers_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_olist_geolocation_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_olist_order_items_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_olist_order_payments_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_olist_order_reviews_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_olist_products_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_olist_sellers_bronze(bronze_root, spark)
print('-------------------------------------------------')
bronze_processing.process_product_cat_translation_bronze(bronze_root, spark)
print('-------------------------------------------------')


Processing Olist datasets...

loaded data/olist_customers_dataset.csv  →  99,441 rows


                                                                                

saved bronze: datamart/bronze/customers/bronze_olist_customers.parquet
-------------------------------------------------
loaded data/olist_geolocation_dataset.csv  →  1,000,325 rows


                                                                                

saved bronze: datamart/bronze/geolocation/bronze_olist_geolocation.parquet
-------------------------------------------------
loaded data/olist_order_items_dataset.csv  →  112,650 rows
saved bronze: datamart/bronze/order_items/bronze_olist_order_items.parquet
-------------------------------------------------
loaded data/olist_order_payments_dataset.csv  →  103,886 rows
saved bronze: datamart/bronze/order_payments/bronze_olist_order_payments.parquet
-------------------------------------------------
loaded data/olist_order_reviews_dataset.csv  →  104,162 rows
saved bronze: datamart/bronze/order_reviews/bronze_olist_order_reviews.parquet
-------------------------------------------------
loaded data/olist_products_dataset.csv  →  32,951 rows
saved bronze: datamart/bronze/products/bronze_olist_products.parquet
-------------------------------------------------
loaded data/olist_sellers_dataset.csv  →  3,095 rows
saved bronze: datamart/bronze/sellers/bronze_olist_sellers.parquet
--------------

In [5]:
# Process orders with monthly partitioning
bronze_processing.process_olist_orders_bronze(bronze_root, spark)

                                                                                

Month 2018-04: 6939 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_04.csv


                                                                                

Month 2018-02: 6728 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_02.csv


                                                                                

Month 2018-01: 7269 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_01.csv


                                                                                

Month 2017-12: 5673 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_12.csv


                                                                                

Month 2017-05: 3700 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_05.csv


                                                                                

Month 2018-07: 6292 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_07.csv


                                                                                

Month 2017-11: 7544 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_11.csv


                                                                                

Month 2017-01: 800 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_01.csv


                                                                                

Month 2016-09: 4 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2016_09.csv


                                                                                

Month 2016-10: 324 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2016_10.csv


                                                                                

Month 2017-03: 2682 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_03.csv


                                                                                

Month 2017-04: 2404 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_04.csv


                                                                                

Month 2017-10: 4631 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_10.csv


                                                                                

Month 2017-09: 4285 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_09.csv


                                                                                

Month 2017-02: 1780 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_02.csv
Month 2018-09: 16 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_09.csv


                                                                                

Month 2017-06: 3245 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_06.csv


                                                                                

Month 2018-06: 6167 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_06.csv


                                                                                

Month 2017-07: 4026 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_07.csv


                                                                                

Month 2018-08: 6512 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_08.csv


                                                                                

Month 2017-08: 4331 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2017_08.csv


                                                                                

Month 2018-03: 7211 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_03.csv


                                                                                

Month 2018-05: 6873 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_05.csv


                                                                                

Month 2018-10: 4 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2018_10.csv


                                                                                

Month 2016-12: 1 rows
Saved to: datamart/bronze/orders/bronze_olist_orders_2016_12.csv


DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, snapshot_date: string]

In [6]:
# Inspect some output
# I put the actual path due to the discrepancy in paths above. Will amend later
# df_bronze = spark.read.parquet("../datamart/bronze/customers/bronze_olist_customers.parquet")
# df_bronze.show(5)

# Can read


# Maanoj for testing
df_bronze = spark.read.parquet("datamart/bronze/customers/bronze_olist_customers.parquet")
df_bronze.show(5)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|503840d4f2a1a7609...|ffc4233210eac4ec1...|                   14811|          araraquara|            SP|
|52e73a5d0a1d4c56b...|b43530186123fb6d9...|                   62625|               missi|            CE|
|16cb62869f9719571...|c3cc321141423ab8a...|                   55560|           barreiros|            PE|
|4979ba0e6037e4b28...|80768413a59684f1e...|                   29307|cachoeiro de itap...|            ES|
|11ec4bc0610184925...|bd836cf4fce7f808b...|                   22420|      rio de janeiro|            RJ|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows


## Build Silver Table

Important note: There is some discrepancy in where the datamart folder is created when the main.py script is run vs this Jupyter notebook is run.

* This Jupyter notebook will create the datamart folder inside `scripts` folder and output the silver tables there.
* When you run the main.py script, the datamart folder will be created inside `app` folder (i.e. root) and output the silver tables there.

Need to have team meeting to resolve this

In [7]:
# Create silver root directory
silver_root = "datamart/silver"
os.makedirs(silver_root, exist_ok=True)
print(f"Silver root directory: {silver_root}")

Silver root directory: datamart/silver


In [8]:
# Create all required output directories

# Create silver directory to save customer data
silver_cust_directory = "datamart/silver/customers/"
if not os.path.exists(silver_cust_directory):
    os.makedirs(silver_cust_directory)

# Create silver directory to save seller data
silver_sell_directory = "datamart/silver/sellers/"
if not os.path.exists(silver_sell_directory):
    os.makedirs(silver_sell_directory)

# Create silver directory to save geolocation data
silver_geo_directory = "datamart/silver/geolocation/"
if not os.path.exists(silver_geo_directory):
    os.makedirs(silver_geo_directory)


# Create silver directory to save products data
silver_prod_directory = "datamart/silver/products/"
if not os.path.exists(silver_prod_directory):
    os.makedirs(silver_prod_directory)

# # Create silver directory to save product_categories data
# silver_prod_cat_directory = "datamart/silver/product_categories/"
# if not os.path.exists(silver_prod_cat_directory):
#     os.makedirs(silver_prod_cat_directory)

# Create silver directory to save orders data
silver_orders_directory = "datamart/silver/orders/"
if not os.path.exists(silver_orders_directory):
    os.makedirs(silver_orders_directory)

# Create silver directory to save order_items data
silver_order_items_directory = "datamart/silver/order_items/"
if not os.path.exists(silver_order_items_directory):
    os.makedirs(silver_order_items_directory)

In [5]:
# Process all bronze tables into silver
print("\nProcessing bronze tables...")
silver_processing.process_silver_olist_customers("../datamart/bronze/customers/",silver_cust_directory, spark)
silver_processing.process_silver_olist_sellers("../datamart/bronze/sellers/",silver_sell_directory, spark)
silver_processing.process_silver_olist_geolocation("../datamart/bronze/geolocation/",silver_geo_directory, spark)

# add more below
silver_processing.process_silver_olist_products("../datamart/bronze/products/",silver_prod_directory, spark)
# silver_processing.process_silver_olist_product_categories("../datamart/bronze/??/",silver_prod_cat_directory, spark)
silver_processing.process_silver_olist_orders("../datamart/bronze/orders/",silver_orders_directory, spark)
silver_processing.process_silver_olist_order_items("../datamart/bronze/order_items/",silver_order_items_directory, spark)


Processing bronze tables...
loaded from: ../datamart/bronze/customers/bronze_olist_customers.parquet row count: 99441
Number of duplicated 'customer_id': 0


                                                                                

saved to: datamart/silver/customers/silver_olist_customers.parquet
loaded from: ../datamart/bronze/sellers/bronze_olist_sellers.parquet row count: 3095
Number of duplicated 'seller_id': 0
saved to: datamart/silver/sellers/silver_olist_sellers.parquet
loaded from: ../datamart/bronze/geolocation/bronze_olist_geolocation.parquet row count: 1000325


                                                                                

saved to: datamart/silver/geolocation/silver_olist_geolocation.parquet


DataFrame[geolocation_zip_code_prefix: string, geolocation_lat: double, geolocation_lng: double]

In [None]:
# Process orders with monthly partitioning
# add more below

In [6]:
# Inspect some output
df_silver = spark.read.parquet("datamart/silver/geolocation/silver_olist_geolocation.parquet")
df_silver.show(5)

# Can read

+---------------------------+-------------------+-------------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|
+---------------------------+-------------------+-------------------+
|                      49290|-11.274805005391439|-37.790795516967776|
|                      49630|-10.605308055877686|-37.113027572631836|
|                      55445|   -8.5616774559021|  -35.8295783996582|
|                      57051| -9.655002400681779| -35.73440123893119|
|                      57085| -9.558634171119103| -35.73914117079515|
+---------------------------+-------------------+-------------------+
only showing top 5 rows



### Build Customer Table

In [8]:
# Create silver directory to save customer data
silver_cust_directory = "datamart/silver/customers/"
if not os.path.exists(silver_cust_directory):
    os.makedirs(silver_cust_directory)

In [9]:
def process_silver_olist_customers(bronze_directory, silver_directory, spark):
    
    # connect to bronze table
    partition_name = "bronze_olist_customers.parquet"
    filepath = bronze_directory + partition_name
    df = spark.read.parquet(filepath)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: enforce schema / data type
    # Dictionary specifying columns and their desired datatypes
    column_type_map = {
        "customer_id": StringType(),
        "customer_unique_id": StringType(),
        "customer_zip_code_prefix": StringType(),
        "customer_city": StringType(),
        "customer_state": StringType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # Check customer_id duplicates (total rows - distinct ids)
    total_rows = df.count()
    distinct_rows = df.select("customer_id").distinct().count()
    duplicates_customer_id = total_rows - distinct_rows
    print(f"Number of duplicated 'customer_id': {duplicates_customer_id}")

    # Add missing leading zero
    df = df.withColumn(
        "customer_zip_code_prefix",
        F.lpad(col("customer_zip_code_prefix"), 5, "0")
    )
    
    # save silver table - IRL connect to database to write
    partition_name = "silver_olist_customers.parquet"
    filepath = silver_directory + partition_name
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)
    
    return df

In [10]:
# Run function manually to test
# I inputted the bronze_directory manually (amend after our path discrepancies are resolved)
df = process_silver_olist_customers("../datamart/bronze/customers/",silver_cust_directory, spark)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/app/datamart/bronze/customers/bronze_olist_customers.parquet. SQLSTATE: 42K03

In [None]:
# Check schema enforced
df.printSchema()

In [None]:
# Check missing leading zero padded
df.groupBy(F.length("customer_zip_code_prefix").alias("length")).count().show()

### Build Seller Table

In [None]:
# Create silver directory to save seller data
silver_sell_directory = "datamart/silver/sellers/"
if not os.path.exists(silver_sell_directory):
    os.makedirs(silver_sell_directory)

In [None]:
def process_silver_olist_sellers(bronze_directory, silver_directory, spark):
    
    # connect to bronze table
    partition_name = "bronze_olist_sellers.parquet"
    filepath = bronze_directory + partition_name
    df = spark.read.parquet(filepath)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: enforce schema / data type
    # Dictionary specifying columns and their desired datatypes
    column_type_map = {
        "seller_id": StringType(),
        "seller_zip_code_prefix": StringType(),
        "seller_city": StringType(),
        "seller_state": StringType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # Check seller_id duplicates (total rows - distinct ids)
    total_rows = df.count()
    distinct_rows = df.select("seller_id").distinct().count()
    duplicates_seller_id = total_rows - distinct_rows
    print(f"Number of duplicated 'seller_id': {duplicates_seller_id}")

    # Add missing leading zero
    df = df.withColumn(
        "seller_zip_code_prefix",
        F.lpad(col("seller_zip_code_prefix"), 5, "0")
    )
    
    # save silver table - IRL connect to database to write
    partition_name = "silver_olist_sellers.parquet"
    filepath = silver_directory + partition_name
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)
    
    return df

In [None]:
# Run function manually to test
# I inputted the bronze_directory manually (amend after our path discrepancies are resolved)
df = process_silver_olist_sellers("../datamart/bronze/sellers/",silver_sell_directory, spark)

In [None]:
# Check schema enforced
df.printSchema()

In [None]:
# Check missing leading zero padded
df.groupBy(F.length("seller_zip_code_prefix").alias("length")).count().show()

### Build Geolocation Table

In [None]:
# Create silver directory to save geolocation data
silver_geo_directory = "datamart/silver/geolocation/"
if not os.path.exists(silver_geo_directory):
    os.makedirs(silver_geo_directory)

In [None]:
def process_silver_olist_geolocation(bronze_directory, silver_directory, spark):
    
    # connect to bronze table
    partition_name = "bronze_olist_geolocation.parquet"
    filepath = bronze_directory + partition_name
    df = spark.read.parquet(filepath)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: enforce schema / data type
    # Dictionary specifying columns and their desired datatypes
    column_type_map = {
        "geolocation_zip_code_prefix": StringType(),
        "geolocation_lat": FloatType(),
        "geolocation_lng": FloatType(),
        "geolocation_city": StringType(),
        "geolocation_state": StringType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # Add missing leading zero
    df = df.withColumn(
        "geolocation_zip_code_prefix",
        F.lpad(col("geolocation_zip_code_prefix"), 5, "0")
    )

    # Deduplicate zipcodes by just taking the centroid (mean of lat,lng)
    df_dedupe = df.groupBy("geolocation_zip_code_prefix").agg(
        F.avg("geolocation_lat").alias("geolocation_lat"),
        F.avg("geolocation_lng").alias("geolocation_lng")
    )
    
    # save silver table - IRL connect to database to write
    partition_name = "silver_olist_geolocation.parquet"
    filepath = silver_directory + partition_name
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)
    
    return df_dedupe

In [11]:
# Run function manually to test
# I inputted the bronze_directory manually (amend after our path discrepancies are resolved)
df = process_silver_olist_geolocation("../datamart/bronze/geolocation/",silver_geo_directory, spark)

NameError: name 'process_silver_olist_geolocation' is not defined

In [12]:
# Check schema enforced
df.printSchema()

root
 |-- geolocation_zip_code_prefix: string (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)



In [13]:
# Check missing leading zero padded
df.groupBy(F.length("geolocation_zip_code_prefix").alias("length")).count().show()

+------+-----+
|length|count|
+------+-----+
|     5|19177|
+------+-----+



In [14]:
# Check every geolocation_zip_code_prefix only has 1 count. Group by prefix and count occurrences
df.groupBy("geolocation_zip_code_prefix") \
    .agg(F.count("*").alias("count")) \
    .filter("count > 1") \
    .show()

+---------------------------+-----+
|geolocation_zip_code_prefix|count|
+---------------------------+-----+
+---------------------------+-----+



### Build Products Table

In [12]:
# Create silver directory to save products data
silver_prod_directory = "datamart/silver/products/"
if not os.path.exists(silver_prod_directory):
    os.makedirs(silver_prod_directory)

In [13]:
def process_silver_olist_products(bronze_directory, silver_directory, spark):
    
    # connect to bronze table
    partition_name = "bronze_olist_products.parquet"
    filepath = bronze_directory + partition_name
    df = spark.read.parquet(filepath)
    print('loaded from:', filepath, 'row count:', df.count())

    # Rename columns due to spelling mistakes 
    df = df.withColumnRenamed("product_name_lenght", "product_name_length") \
           .withColumnRenamed("product_description_lenght", "product_description_length")

    
    # clean data: enforce schema / data type
    # Dictionary specifying columns and their desired datatypes
    column_type_map = {
        "product_id": StringType(),
        "product_category_name": StringType(),
        "product_name_length": DoubleType(),
        "product_description_length": DoubleType(),
        "product_photos_qty": DoubleType(),
        "product_weight_g": DoubleType(),
        "product_length_cm": DoubleType(),
        "product_height_cm": DoubleType(),
        "product_width_cm": DoubleType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # Inputting missing values as NaN
    df = df.fillna({"product_category_name": "NaN"})
    df = df.fillna({"product_name_length": float('nan')}) 
    df = df.fillna({"product_description_length": float('nan')}) 
    df = df.fillna({"product_photos_qty": float('nan')}) 
    
    # Check product_id duplicates (total rows - distinct ids)
    total_rows = df.count()
    distinct_rows = df.select("product_id").distinct().count()
    duplicates_product_id = total_rows - distinct_rows
    print(f"Number of duplicated 'product_id': {duplicates_product_id}")


    
    # save silver table - IRL connect to database to write
    partition_name = "silver_olist_products.parquet"
    filepath = silver_directory + partition_name
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)
    
    return df

In [14]:
# Run function manually to test
# I inputted the bronze_directory manually (amend after our path discrepancies are resolved)
df = process_silver_olist_products("datamart/bronze/products/",silver_prod_directory, spark)

loaded from: datamart/bronze/products/bronze_olist_products.parquet row count: 32951
Number of duplicated 'product_id': 0


                                                                                

saved to: datamart/silver/products/silver_olist_products.parquet


In [15]:
# Check schema enforced
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = false)
 |-- product_name_length: double (nullable = false)
 |-- product_description_length: double (nullable = false)
 |-- product_photos_qty: double (nullable = false)
 |-- product_weight_g: double (nullable = true)
 |-- product_length_cm: double (nullable = true)
 |-- product_height_cm: double (nullable = true)
 |-- product_width_cm: double (nullable = true)



In [16]:
# Inspect some output
df = spark.read.parquet("datamart/silver/products/silver_olist_products.parquet")
df.show(5)

# Can read

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|               40.0|                     287.0|               1.0|           225.0|             16.0|             10.0|            14.0|
|3aa071139cb16b67c...|                artes|               44.0|                     276.0|               1.0|          1000.0|             30.0|             18.0|            20.0|
|96bd76ec8810374ed...|        esporte_lazer|               46.0|                     250.0|    

### Build Product Categories Table??

In [50]:
# not sure 

### Build Orders Table

In [17]:
# Create silver directory to save orders data
silver_orders_directory = "datamart/silver/orders/"
if not os.path.exists(silver_orders_directory):
    os.makedirs(silver_orders_directory)

In [18]:
def process_silver_olist_orders(bronze_directory, silver_directory, spark, partition_name):
    filepath = os.path.join(bronze_directory, partition_name)
    df = spark.read.option("header", True).option("inferSchema", True).csv(filepath)
    print('loaded from:', filepath, 'row count:', df.count())


# def process_silver_olist_orders(bronze_directory, silver_directory, spark):
    
    # connect to bronze table
    
    # partition_name = "bronze_olist_orders.parquet"
    # filepath = bronze_directory + partition_name
    # df = spark.read.parquet(filepath)
    # print('loaded from:', filepath, 'row count:', df.count())



    # clean data: enforce schema / data type
    # Dictionary specifying columns and their desired datatypes
    column_type_map = {
        "order_id": StringType(),
        "customer_id": StringType(),
        "order_status": StringType(),
        "order_purchase_timestamp": TimestampType(),
        "order_approved_at": TimestampType(),
        "order_delivered_carrier_date": TimestampType(),
        "order_delivered_customer_date": TimestampType(),
        "order_estimated_delivery_date": TimestampType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # Removing Invalid order ids
    # Load the Bronze table 
    df_order_items = spark.read.parquet("datamart/bronze/order_items/bronze_olist_order_items.parquet") 
    
    # Get distinct order IDs that exist in order items
    valid_order_ids_df = df_order_items.select("order_id").distinct()
    
    
    # Keep only orders that exist in df_order_items
    df_orders_clean = df.join(valid_order_ids_df, on="order_id", how="inner")
    
    # Count how many were dropped
    dropped_orders = df.count() - df_orders_clean.count()
    print(f"Dropped {dropped_orders} orders with no items.")

    df = df_orders_clean


    # Checking for invalid customer IDs
    # Load df_customers from Bronze
    df_customers = spark.read.parquet("datamart/bronze/customers/bronze_olist_customers.parquet")  

    # Get distinct valid customer IDs
    valid_customer_ids_df = df_customers.select("customer_id").distinct()
    
    # Perform a left anti join to find orders with invalid customer_id
    invalid_orders = df.join(valid_customer_ids_df, on="customer_id", how="left_anti")
    
    # Count how many invalid customer IDs there are
    invalid_customer_count = invalid_orders.count()

    # Conditionally drop invalid orders
    if invalid_customer_count > 0:
        initial_count = df.count()
        print("Dropping orders with invalid customer_id...")
        df = df.join(valid_customer_ids_df, on="customer_id", how="inner")
        final_count = df.count()
        dropped_count = initial_count - final_count
        print(f"Dropped {dropped_count} rows")
        
    else:
        print("All customer ids are valid — no need to drop!!")



    # Enforcing enum for order statuses
    # Define valid statuses 
    valid_statuses = {
        "created",
        "approved",
        "processing",
        "invoiced",
        "shipped",
        "delivered",
        "canceled",
        "unavailable"
    }
    
    # Clean and standardize the `order_status` column
    df = df.withColumn("order_status", trim(lower(col("order_status"))))
    
    # dentify invalid statuses (those NOT in the valid_statuses set)
    invalid_statuses_df = df.filter(~col("order_status").isin(list(valid_statuses)))
    
    # Print the unique invalid statuses
    invalid_statuses_list = invalid_statuses_df.select("order_status").distinct().rdd.flatMap(lambda x: x).collect()

    if invalid_statuses_list:
        print(f"Invalid statuses found: {invalid_statuses_list}")
    else:
        print("No invalid status found!!")


    
    # # save silver table - IRL connect to database to write
    # partition_name = "silver_olist_orders_2016_09.parquet"  
    # filepath = silver_directory + partition_name
    # df.write.mode("overwrite").parquet(filepath)
    # print('saved to:', filepath)


    # save 
    parquet_name = partition_name.replace("bronze", "silver").replace(".csv", ".parquet")
    output_path = os.path.join(silver_directory, parquet_name)
    df.write.mode("overwrite").parquet(output_path)
    print("-----> saved to:", output_path)

    return df

In [19]:
# Run function manually to test

# Set base directory
bronze_orders_directory = "datamart/bronze/orders/"
silver_orders_directory = "datamart/silver/orders/"

# List all CSV files in the bronze orders folder
csv_files = [f for f in os.listdir(bronze_orders_directory) if f.endswith(".csv")]

# Sort the files according to date
csv_files.sort()

# Loop through each file 
for partition_name in csv_files:
    print(f"\n======== Processing {partition_name} ......... \n")
    df = process_silver_olist_orders(bronze_orders_directory, silver_orders_directory, spark, partition_name)
    
    # Check schema enforced
    df.printSchema()




loaded from: datamart/bronze/orders/bronze_olist_orders_2016_09.csv row count: 4
Dropped 1 orders with no items.
All customer ids are valid — no need to drop!!


                                                                                

No invalid status found!!
-----> saved to: datamart/silver/orders/silver_olist_orders_2016_09.parquet
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- snapshot_date: string (nullable = true)



loaded from: datamart/bronze/orders/bronze_olist_orders_2016_10.csv row count: 324
Dropped 16 orders with no items.
All customer ids are valid — no need to drop!!
No invalid status found!!
-----> saved to: datamart/silver/orders/silver_olist_orders_2016_10.parquet
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase

### Build Order_Items Table

In [20]:
# Create silver directory to save order_items data
silver_order_items_directory = "datamart/silver/order_items/"
if not os.path.exists(silver_order_items_directory):
    os.makedirs(silver_order_items_directory)

In [21]:
def process_silver_olist_order_items(bronze_directory, silver_directory, spark):
    
    # connect to bronze table
    partition_name = "bronze_olist_order_items.parquet"
    filepath = bronze_directory + partition_name
    df = spark.read.parquet(filepath)
    print('loaded from:', filepath, 'row count:', df.count())

    
    # clean data: enforce schema / data type
    # Dictionary specifying columns and their desired datatypes
    column_type_map = {
        "order_id": StringType(),
        "order_item_id": LongType(),
        "product_id": StringType(),
        "seller_id": StringType(),
        "shipping_limit_date": TimestampType(),
        "price": DoubleType(),
        "freight_value": DoubleType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    
    # Checking for invalid seller IDs
    # Load df_sellers from Bronze
    df_sellers = spark.read.parquet("datamart/bronze/sellers/bronze_olist_sellers.parquet")  

    # Get distinct valid seller IDs
    valid_seller_ids_df = df_sellers.select("seller_id").distinct()
    
    # Perform a left anti join to find sellers with invalid seller_id
    invalid_orders = df.join(valid_seller_ids_df, on="seller_id", how="left_anti")
    
    # Count how many invalid seller IDs there are
    invalid_seller_count = invalid_orders.count()

    # Conditionally drop invalid orders
    if invalid_seller_count > 0:
        initial_count = df.count()
        print("Dropping orders with invalid seller_id...")
        df = df.join(valid_seller_ids_df, on="seller_id", how="inner")
        final_count = df.count()
        dropped_count = initial_count - final_count
        print(f"Dropped {dropped_count} rows")
        
    else:
        print("All seller ids are valid — no need to drop!!")

    
    # save silver table - IRL connect to database to write
    partition_name = "silver_olist_order_items.parquet"
    filepath = silver_directory + partition_name
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)
    
    return df

In [22]:
# Run function manually to test
# I inputted the bronze_directory manually (amend after our path discrepancies are resolved)
df = process_silver_olist_order_items("datamart/bronze/order_items/",silver_order_items_directory, spark)

loaded from: datamart/bronze/order_items/bronze_olist_order_items.parquet row count: 112650
All seller ids are valid — no need to drop!!
saved to: datamart/silver/order_items/silver_olist_order_items.parquet


In [23]:
# Check schema enforced
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [24]:
# Inspect some output
df = spark.read.parquet("datamart/silver/order_items/silver_olist_order_items.parquet")
df.show(5)

# Can read

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|89bed55dd88d035e3...|            1|ae6b739ab6e9d7991...|53e4c6e0f4312d4d2...|2018-01-11 17:12:22|  42.0|         17.6|
|89c037e2b749a2ed5...|            1|e906fa76a27488f80...|1835b56ce799e6a4d...|2017-11-30 04:15:33| 53.99|        12.72|
|89c037e2b749a2ed5...|            2|e906fa76a27488f80...|1835b56ce799e6a4d...|2017-11-30 04:15:33| 53.99|        12.72|
|89c04d22504649482...|            1|3d73c88390adac7dd...|c8b0e2b0a7095e5d8...|2018-03-14 00:20:26|199.99|        25.05|
|89c0bf5292a493fb2...|            1|2d40d83fc97b8d4d4...|cca3071e3e9bb7d12...|2017-08-24 03:26:15|  29.9|        11.85|
+--------------------+-------------+----

### Building Derived Table - Order logistics

In [27]:

def build_order_features(spark, order_file_path):
    # Read inputs
    df_order_items = spark.read.parquet("datamart/silver/order_items/silver_olist_order_items.parquet")
    df_products = spark.read.parquet("datamart/silver/products/silver_olist_products.parquet")
    df_categories = spark.read.parquet("datamart/bronze/category_translation/bronze_product_category_translation.parquet")
    df_orders = spark.read.parquet(order_file_path)

    order_metrics = df_order_items.groupBy("order_id").agg(
        F.max("order_item_id").alias("total_qty"),
        F.sum("price").alias("total_price"),
        F.sum("freight_value").alias("total_freight_value")
    )

    df_items_with_products = df_order_items.select("order_id", "product_id") \
        .join(
            df_products.select(
                "product_id", "product_weight_g",
                "product_length_cm", "product_height_cm", "product_width_cm"
            ),
            on="product_id", how="left"
        )

    df_items_with_products = df_items_with_products.withColumn(
        "product_volume_cm3",
        col("product_length_cm") * col("product_height_cm") * col("product_width_cm")
    )

    product_metrics = df_items_with_products.groupBy("order_id").agg(
        F.sum("product_weight_g").alias("total_weight_g"),
        F.sum("product_volume_cm3").alias("total_volume_cm3")
    )

    final_df = df_orders.select("order_id", "order_purchase_timestamp") \
        .join(order_metrics, on="order_id", how="inner") \
        .join(product_metrics, on="order_id", how="left") \
        .withColumn(
            "total_density",
            when(col("total_volume_cm3") != 0,
                 col("total_weight_g") / col("total_volume_cm3")
            ).otherwise(None)
        )

    df_items_with_cats = df_order_items.select("order_id", "product_id") \
        .join(df_products.select("product_id", "product_category_name"), on="product_id", how="left") \
        .join(df_categories.select("product_category_name", "main_category", "sub_category"), on="product_category_name", how="left")

    main_cat_counts = df_items_with_cats.groupBy("order_id", "main_category") \
        .agg(count("*").alias("main_cat_count"))
    main_cat_window = Window.partitionBy("order_id").orderBy(col("main_cat_count").desc())
    most_common_main = main_cat_counts.withColumn(
        "rank", row_number().over(main_cat_window)
    ).filter(col("rank") == 1).drop("rank", "main_cat_count")

    sub_cat_counts = df_items_with_cats.groupBy("order_id", "sub_category") \
        .agg(count("*").alias("sub_cat_count"))
    sub_cat_window = Window.partitionBy("order_id").orderBy(col("sub_cat_count").desc())
    most_common_sub = sub_cat_counts.withColumn(
        "rank", row_number().over(sub_cat_window)
    ).filter(col("rank") == 1).drop("rank", "sub_cat_count")

    order_categories = most_common_main.join(most_common_sub, on="order_id", how="outer")
    final_df_with_cats = final_df.join(order_categories, on="order_id", how="left")

    return final_df_with_cats


In [28]:
# Keep track of failures
failed_files = []
processed_files = []

order_files = sorted(glob.glob("datamart/silver/orders/silver_olist_orders_*.parquet"))

# Create output directory if it doesn't exist
os.makedirs("datamart/silver/order_logistics", exist_ok=True)

# Loop over files 
for idx, file_path in enumerate(order_files, 1):
    
    basename = os.path.basename(file_path)  
    year_month = basename.replace("silver_olist_orders_", "").replace(".parquet", "")

    output_path = f"datamart/silver/order_logistics/silver_olist_order_logistics_{year_month}.parquet"
    
    print(f"\n[{idx}/{len(order_files)}]  Processing {year_month} ({basename})...")

    # Skip if already exists
    if os.path.exists(output_path):
        print(f" Skipping {year_month} (already exists)")
        continue
    
    try:
        start_time = time.time()

        # Run feature engineering
        final_df = build_order_features(spark, file_path)

        # Save to parquet
        final_df.write.mode("overwrite").parquet(output_path)

        # Verify row count
        count = final_df.count()
        duration = round(time.time() - start_time, 2)

        print(f"---> Saved: {output_path} → {count} rows in {duration}s")

        processed_files.append((year_month, count, duration))

    except Exception as e:
        print(f" Failed on {year_month}: {e}")
        failed_files.append((year_month, str(e)))

# Summary
print("\n===== Processing Summary =====")
print(f" Successfully processed: {len(processed_files)} files")
for ym, count, duration in processed_files:
    print(f"  - {ym}: {count} rows in {duration}s")

if failed_files:
    print(f"\n Failed files: {len(failed_files)}")
    for ym, err in failed_files:
        print(f"  - {ym}: {err}")
else:
    print("\n All files processed successfully")



[1/25]  Processing 2016_09 (silver_olist_orders_2016_09.parquet)...
---> Saved: datamart/silver/order_logistics/silver_olist_order_logistics_2016_09.parquet → 3 rows in 4.65s

[2/25]  Processing 2016_10 (silver_olist_orders_2016_10.parquet)...
---> Saved: datamart/silver/order_logistics/silver_olist_order_logistics_2016_10.parquet → 308 rows in 3.55s

[3/25]  Processing 2016_12 (silver_olist_orders_2016_12.parquet)...
---> Saved: datamart/silver/order_logistics/silver_olist_order_logistics_2016_12.parquet → 1 rows in 2.86s

[4/25]  Processing 2017_01 (silver_olist_orders_2017_01.parquet)...
---> Saved: datamart/silver/order_logistics/silver_olist_order_logistics_2017_01.parquet → 789 rows in 3.22s

[5/25]  Processing 2017_02 (silver_olist_orders_2017_02.parquet)...
---> Saved: datamart/silver/order_logistics/silver_olist_order_logistics_2017_02.parquet → 1733 rows in 3.11s

[6/25]  Processing 2017_03 (silver_olist_orders_2017_03.parquet)...
---> Saved: datamart/silver/order_logistics/

In [30]:
# Inspect some output
df_orders_logistics = spark.read.parquet("datamart/silver/order_logistics/silver_olist_order_logistics_2018_01.parquet")
df_orders_logistics.show(10)

+--------------------+------------------------+---------+-----------+-------------------+--------------+----------------+--------------------+---------------+------------+
|            order_id|order_purchase_timestamp|total_qty|total_price|total_freight_value|total_weight_g|total_volume_cm3|       total_density|  main_category|sub_category|
+--------------------+------------------------+---------+-----------+-------------------+--------------+----------------+--------------------+---------------+------------+
|8ab67a33f5086c597...|     2018-01-20 23:34:50|        1|       19.9|              17.63|         150.0|          2700.0| 0.05555555555555555|      telephony|          NA|
|90c7f72cacf60b2a6...|     2018-01-04 18:18:46|        1|      129.0|              13.92|        6550.0|         31920.0| 0.20520050125313283|     stationery|          NA|
|a322a4aa3b0a9b47a...|     2018-01-07 11:53:56|        1|       23.8|              21.15|         283.0|         11040.0|0.02563405797101449

## Build Gold Table (Features)

## Inspect Feature Store

## Build Gold Table (Label)

## Inspect Label Store

## Stop Spark Session

In [1]:
# End spark session
spark.stop()

print('\n\n---completed job---\n\n')

NameError: name 'spark' is not defined