## Variables

In [97]:
# stagging path
source_path = "source_files"

# path to sources
tables = {
    "products": {
        "path": "products (1).csv",
        "PK": "ProductID",  # Chave primária
        "FK": []  # Lista de chaves estrangeiras associadas (vazia se não houver)
    },
    "sales_order_detail": {
        "path": "sales_order_detail.csv",
        "PK": "SalesOrderDetailID",  # Chave primária
        "FK": ["SalesOrderID", "ProductID"]  # Supondo que o order_id seja uma chave estrangeira
    },
    "sales_order_header": {
        "path": "sales_order_header (1).csv",
        "PK": "SalesOrderID",  # Chave primária
        "FK": ["CustomerID", "SalesPersonID"]  # Lista de chaves estrangeiras associadas (vazia se não houver)
    }
}

## Spark Session Creation

In [125]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, udf, to_date
from pyspark.sql.types import DateType, IntegerType
from datetime import timedelta

# create spark Session
spark = SparkSession.builder \
    .appName("Pipeline") \
    .getOrCreate()

# show spark version
print(spark.version)

# Configurar o logger para ERROR - Spark Run A Lot of Error Running Locally 
spark.sparkContext.setLogLevel("ERROR")

3.5.5


## Raw Function [Data Loading]

In [104]:
def raw(tables={}):
    """ 
    1. Data Loading
        Processes and saves tables from CSV files into Parquet format.

    This function loops through a dictionary of tables, loads each CSV file, 
    checks for duplicates in the primary key (PK), and saves the table in 
    Parquet format.

    Parameters:
    ----------
    tables : dict
        A dictionary where each key is the name of a table and the value is 
        another dictionary containing:
        
        - 'path': the path to the CSV file.
        - 'PK': the name of the primary key column.
    """

    # for each table in the parameter list
    for table_name, table_params in tables.items():
        
        # create table path
        table_path = f"{source_path}/{table_params['path']}"

        # load csv using header, auto dectect schema from spark
        stagging_table = spark.read.csv(table_path, header=True, inferSchema=True)
        
        # grant unique PK
        if stagging_table.groupBy(table_params['PK']).count().filter(col("count") > 1).count() > 0:

            # drop duplicates using PK - use first line
            stagging_table = stagging_table.dropDuplicates(subset=[table_params['PK']])
            # raise ValueError(f"column '{table_params['PK']}' has duplicates.")

        # save raw table
        stagging_table.write \
            .mode("overwrite") \
            .parquet(f"data/raw_{table_name}", compression="none")

In [105]:
raw(tables)

## Trusted Function [Data Review and Storage]

In [106]:
def trusted(tables={}):
    """ 
    2. Data Review and Storage
        Processes and saves trusted tables from raw Parquet files.

    This function iterates over a dictionary of tables, loads each raw 
    Parquet file, modifies the primary key (PK) and foreign key (FK) 
    columns by appending a suffix, and then saves the modified table 
    in a new Parquet file.

    Parameters:
    ----------
    tables : dict
        A dictionary where each key is the name of a table and the value is 
        another dictionary containing:
        
        - 'PK': the name of the primary key column.
        - 'FK': a list of foreign key column names.

    Notes:
    ------
    The raw tables are expected to be stored under the 'data/raw_' prefix,
    and the processed trusted tables will be saved with the 'data/store_' prefix.
    """

    # for each table in the parameter list
    for table_name, table_params in tables.items():

        # load table
        raw_path = f"data/raw_{table_name}"
        raw_table = spark.read.parquet(raw_path)

        # flag PK column
        raw_table = raw_table.withColumnRenamed(table_params['PK'], table_params['PK']+'_PK')

        # flag FK columns
        for fk_col in table_params['FK']:
            raw_table = raw_table.withColumnRenamed(fk_col, fk_col+'_FK')

        # save store_ table 
        raw_table.write \
            .mode("overwrite") \
            .parquet(f"data/store_{table_name}", compression="none")

In [107]:
trusted(tables)

## Refined Function [Transformations]

In [108]:
def refined_products():
    """ 
    3. Product Master Transformations:
        Processes and refines product data by loading from a Parquet file, 
    replacing NULL values, and enhancing the product category.

    The function performs the following operations:
    
    - Loads a product table from Parquet format.
    - Replaces NULL values in the "Color" field with "N/A".
    - Enhances the "ProductCategoryName" field
    
    Notes:
    ------
    - The source data is expected to be located at `data/store_products`.
    - The refined product data will be saved to `data/publish_product`.
    """

    products_table = 'products'

    # load table
    refined_path = f"data/store_{products_table}"
    products_df = spark.read.parquet(refined_path)

    # i. 
    # Replace NULL values in the Color field with N/A.
    products_df = products_df.fillna({'Color': 'N/A'})

    # ii. 
    # Enhance the ProductCategoryName field when it is NULL using the
    # following logic:
    products_df = products_df.withColumn("ProductCategoryName",
        when(
            col("ProductCategoryName").isNull() 
            & col("ProductSubCategoryName").isin("Gloves", "Shorts", "Socks", "Tights", "Vests"), 
        "Clothing")
        .when(
            col("ProductCategoryName").isNull() 
            & col("ProductSubCategoryName").isin("Locks", "Lights", "Headsets", "Helmets", "Pedals", "Pumps"), 
        "Accessories")
        .when(
            col("ProductCategoryName").isNull() 
            & (col("ProductSubCategoryName").contains("Frames") | col("ProductSubCategoryName").isin("Wheels", "Saddles")), 
        "Frames")
        .otherwise(col("ProductCategoryName")) # Still has NULLS !!!
    )

    # save refined table
    products_df.write \
            .mode("overwrite") \
            .parquet("data/publish_product", compression="none")

In [109]:
refined_products()

In [153]:
def refined_salesOrders():
    """
        Processes and refines sales orders by loading data from parquet files,
    calculating additional fields, and saving the results into a new parquet file.

    The function performs the following operations:

    1. Loads sales order header and detail data from parquet files.
    2. Joins the header and detail DataFrames on the SalesOrderID columns.
    3. Calculates the lead time in business days between OrderDate and ShipDate, 
       excluding Saturdays and Sundays.
    4. Calculates the TotalLineExtendedPrice based on the formula:
       OrderQty * (UnitPrice - UnitPriceDiscount).
    5. Creates a new DataFrame with selected columns, including new calculated columns.
    6. Renames the "Freight" column to "TotalOrderFreight".
    7. Writes the refined DataFrame to a parquet file named "publish_orders".

    Notes:
    ------
    - The parquet data source is expected to be located in the "data" directory
      with filenames following the pattern "store_sales_order_header" and 
      "store_sales_order_detail".
    - The output will be saved as a parquet file named "publish_orders" in the "data" directory.
    - This function utilizes a custom user-defined function (UDF) to calculate
      the lead time in business days.
    """

    # source table names
    header_table = 'sales_order_header'
    detail_table = 'sales_order_detail'


    # load tables 
    orderHeader_df = spark.read.parquet(f'data/store_{header_table}')
    orderDetail_df = spark.read.parquet(f'data/store_{detail_table}')

    # join header and details as inner getting only headers that have details attached
    orders_df = orderHeader_df.join(
        orderDetail_df, 
        orderDetail_df.SalesOrderID_FK==orderHeader_df.SalesOrderID_PK, 
        how='inner')

    # i. 
    # Calculate LeadTimeInBusinessDays as the difference between
    # OrderDate and ShipDate, excluding Saturdays and Sundays

    # Function to calculate business days
    def days_diff_only_workdays(start_dt, end_dt):

        # start counter and start date space
        c_day = 0
        current_dt = start_dt

        # add one day until get the end date
        while current_dt <= end_dt:

            # check if day is between monday and friday 
            if current_dt.weekday() < 5:  
                c_day += 1
            current_dt += timedelta(days=1)

        # return the number of counted days that are weekdays
        return c_day
    
    # Register User Function
    days_diff_only_workdays_udf = udf(days_diff_only_workdays, IntegerType())
    
    # create new column with using custom function
    orders_df = orders_df.withColumn(
        "LeadTimeInBusinessDays", 
        days_diff_only_workdays_udf(to_date(col('OrderDate')), to_date(col('ShipDate'))))

    # ii. 
    # Calculate TotalLineExtendedPrice using the formula: 
    # OrderQty * (UnitPrice - UnitPriceDiscount)

    # create new TotalLineExtendedPrice column based on the formula
    orders_df = orders_df.withColumn(
        'TotalLineExtendedPrice', 
        col('OrderQty')*(col('UnitPrice') - col('UnitPriceDiscount')))

    # iii. 
    # Write the results into a table named publish_orders
    
    # create table shape selecting columns
    new_refined_columns = ['LeadTimeInBusinessDays', 'TotalLineExtendedPrice']
    
    # get all columns from detail, some columns from header and refined columns
    orders_df = orders_df.select(
        orderDetail_df.columns +
        [c for c in orderHeader_df.columns if c not in ['SalesOrderId']] +
        new_refined_columns)

    # rename Freight column
    orders_df = orders_df.withColumnRenamed("Freight", "TotalOrderFreight")

    # save refined table
    orders_df.write \
            .mode("overwrite") \
            .parquet("data/publish_orders", compression="none")

In [154]:
refined_salesOrders()

                                                                                

## Analysis Questions

In [169]:
# create sql tmp table for analysis

spark.read.parquet('data/publish_orders').createOrReplaceTempView("orders")
spark.read.parquet('data/publish_product').createOrReplaceTempView("product")

### i. Which color generated the highest revenue each year?

In [170]:
spark.sql("""
    select year(OrderDate),
        Color,
        sum(TotalLineExtendedPrice) sum_TotalLineExtendedPrice_by_year
        from orders A
    join product B
    on A.ProductID_FK = B.ProductID_PK
    group by 1, 2
    order by 3 desc, 1
""").show(10)

+---------------+------+----------------------------------+
|year(OrderDate)| Color|sum_TotalLineExtendedPrice_by_year|
+---------------+------+----------------------------------+
|           2023| Black|              1.5047694369201014E7|
|           2022| Black|              1.4005242975200394E7|
|           2022|   Red|               1.156549116229977E7|
|           2023|Yellow|              1.0638314918100433E7|
|           2023|Silver|                  7613990.73660019|
|           2024|Yellow|                 6368158.478900213|
|           2021|   Red|                 6019614.015699884|
|           2023|  Blue|                 5966277.821900176|
|           2022|Silver|                 5726533.221599957|
|           2024| Black|                 5579326.790800353|
+---------------+------+----------------------------------+
only showing top 10 rows



### ii. What is the average LeadTimeInBusinessDays by ProductCategoryName?

In [172]:
spark.sql("""
    select ProductCategoryName,
        mean(LeadTimeInBusinessDays) avg_LeadTimeInBusinessDays
        from orders A
    join product B
    on A.ProductID_FK = B.ProductID_PK
    where ProductCategoryName is not null
    group by 1
    order by 2 Desc
""").show(10)

+-------------------+--------------------------+
|ProductCategoryName|avg_LeadTimeInBusinessDays|
+-------------------+--------------------------+
|           Clothing|         5.711666367068129|
|        Accessories|         5.702787804316105|
|             Frames|         5.685900314324203|
|              Bikes|         5.667897567632656|
|         Components|         5.655127960275019|
+-------------------+--------------------------+

