# Data Transformation Process

This notebook transforms data from the Bronze layer into the Silver layer. It represents the second major step in our ETL pipeline, after data extraction and quality checks.

## Process Overview
1. Set up environment and initialize Spark session
2. Define paths and configure logging
3. Read data from Bronze layer
4. Clean and transform the data
5. Add derived features and metrics
6. Save transformed data to Silver layer

## 1. Environment Setup

Initialize Spark session and import required libraries.

In [43]:
# Point PySpark at the local Java runtime (machine-specific path; adjust for your environment)
import os
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jre1.8.0_451'

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, DateType
from datetime import datetime
import logging

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Data Transformation") \
    .getOrCreate()

## 2. Path Configuration and Logging Setup

Define paths for bronze and silver layers, and set up logging.

In [44]:
# Define paths - use absolute paths to avoid confusion
# Notebooks do not define __file__, so treat the working directory as the notebook's location
current_dir = os.getcwd()
base_dir = os.path.dirname(current_dir)  # Go up one level to reach the root directory

# Base directories for the bronze and silver layers
bronze_base_path = os.path.join(base_dir, "output", "bronzeLayer")
silver_base_path = os.path.join(base_dir, "output", "silverLayer")

# Use current date for silver layer
date_str = datetime.now().strftime("%Y-%m-%d")

# Log path organized by date
log_dir = os.path.join(base_dir, "logs", "data_transformation", date_str)
log_path = os.path.join(log_dir, "data_transformation.log")

# Ensure the logs directory exists
os.makedirs(log_dir, exist_ok=True)

# Configure logging to write to the log file
logging.basicConfig(filename=log_path, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

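def log_message(message, level="info"):
    """Logs a message at the given level and echoes it to stdout."""
    levels = {"info": logging.INFO, "warning": logging.WARNING, "error": logging.ERROR}
    # Unrecognized levels fall back to INFO so the message is never silently dropped from the log
    logging.log(levels.get(level, logging.INFO), message)
    print(message)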

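# Find the latest date folder in bronze layer
import glob
# Keep directories only, so stray files in the bronze root are never picked as the "latest" folder
bronze_date_folders = [p for p in glob.glob(os.path.join(bronze_base_path, "*")) if os.path.isdir(p)]
if not bronze_date_folders:
    raise FileNotFoundError(f"No date folders found in {bronze_base_path}")

# Folder names are ISO dates (YYYY-MM-DD), so the lexicographic maximum is also the most recent date
latest_bronze_folder = max(bronze_date_folders)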
bronze_path = latest_bronze_folder
silver_path = os.path.join(silver_base_path, date_str)

print(f"Bronze path: {bronze_path}")
print(f"Silver path: {silver_path}")

Bronze path: c:\Users\oussema\OneDrive\Bureau\FSEGN\DW-Odoo\output\bronzeLayer\2025-05-08
Silver path: c:\Users\oussema\OneDrive\Bureau\FSEGN\DW-Odoo\output\silverLayer\2025-05-08
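
The latest-folder lookup above depends on the date folders being named in ISO `YYYY-MM-DD` format, where string order matches chronological order. As a minimal sketch in plain Python (no Spark needed), the selection could also validate folder names before taking the maximum. The helper name `latest_date_folder` and the temporary directory layout are assumptions made for illustration, not part of the pipeline.

```python
import os
import tempfile
from datetime import datetime


def latest_date_folder(base_path):
    """Return the path of the most recent YYYY-MM-DD subfolder of base_path."""
    dated = []
    for name in os.listdir(base_path):
        full_path = os.path.join(base_path, name)
        if not os.path.isdir(full_path):
            continue  # skip stray files
        try:
            folder_date = datetime.strptime(name, "%Y-%m-%d")
        except ValueError:
            continue  # skip folders that are not named as dates
        dated.append((folder_date, full_path))
    if not dated:
        raise FileNotFoundError(f"No date folders found in {base_path}")
    # Compare on the parsed date, then return the matching path
    return max(dated)[1]


# Hypothetical layout used only to exercise the helper
with tempfile.TemporaryDirectory() as root:
    for name in ("2025-05-07", "2025-05-08", "archive"):
        os.makedirs(os.path.join(root, name))
    latest_name = os.path.basename(latest_date_folder(root))
    print(latest_name)
```

Parsing the folder name guards against a non-date folder such as `archive` sorting after every date and being read as the latest Bronze snapshot.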


## 3. Data Loading

Load data from the Bronze layer and examine its structure.

In [45]:
# Read data from bronze layer
df = spark.read.parquet(bronze_path)

# Display initial data stats
print(f"Initial row count: {df.count()}")
print(f"Initial columns: {df.columns}")

# Display sample data
display(df.limit(5).toPandas())

Initial row count: 1734
Initial columns: ['Component Status', 'Product', 'Reference', 'Source', 'Responsible', 'Start', 'End', 'Deadline', 'State', 'Quantity Producing', 'Quantity To Produce', 'Total Quantity', 'Product/Cost', 'Product/Sales Price']


Unnamed: 0,Component Status,Product,Reference,Source,Responsible,Start,End,Deadline,State,Quantity Producing,Quantity To Produce,Total Quantity,Product/Cost,Product/Sales Price
0,,[FURN_8855] Drawer,WH/MO/01679,,OdooBot,2024-01-03 01:00:00,2024-02-19 01:00:00,2024-01-25 01:00:00,Cancelled,12.0,17.0,17.0,100.0,110.5
1,,[FURN_8522] Table Top,WH/MO/07299,SO-8299,Default User Template,2024-01-03 01:00:00,2024-02-27 01:00:00,2024-01-22 01:00:00,Cancelled,4.0,13.0,13.0,240.0,380.0
2,Available,[FURN_9666] Table,WH/MO/02264,,Public user,2024-01-06 01:00:00,2024-02-06 01:00:00,2024-03-29 01:00:00,,19.0,19.0,19.0,290.0,520.0
3,Available,[FURN_7023] Wood Panel,WH/MO/03734,,Mitchell Admin,2024-01-06 01:00:00,2024-02-17 01:00:00,2024-03-20 01:00:00,,10.0,10.0,10.0,80.0,100.0
4,Available,[FURN_7023] Wood Panel,WH/MO/03614,SO-4614,Mitchell Admin,2024-01-07 01:00:00,2024-02-03 01:00:00,2024-03-11 01:00:00,,15.0,15.0,15.0,80.0,100.0


## 4. Data Cleaning

### 4.1 Remove Unused Columns

Remove columns that are not needed for analysis.

In [46]:
# Remove unused columns
df = df.drop("Component Status", "Source")

### 4.2 Data Validation

Check for missing values and validate data quality.

In [47]:
# Count rows where Responsible is null and print the result
print(f"Number of rows with null Responsible: {df.filter(F.col('Responsible').isNull()).count()}")
# List the distinct Responsible values
df.select("Responsible").distinct().show()

Number of rows with null Responsible: 0
+--------------------+
|         Responsible|
+--------------------+
|Default User Temp...|
|         Public user|
|             OdooBot|
|      Mitchell Admin|
|Portal User Template|
+--------------------+



In [48]:
# Map the user names in the Responsible column to department labels
df = df.withColumn("Responsible", 
                  F.when(F.col("Responsible") == "OdooBot", "Department 1")
                   .when(F.col("Responsible") == "Mitchell Admin", "Department 2")
                   .when(F.col("Responsible") == "Default User Template", "Department 3")
                   .when(F.col("Responsible") == "Public user", "Department 4")
                   .when(F.col("Responsible") == "Portal User Template", "Department 5")
                   .otherwise(F.col("Responsible"))  # Keep original value for other cases
                  )
# Display sample data
display(df.limit(5).toPandas())

Unnamed: 0,Product,Reference,Responsible,Start,End,Deadline,State,Quantity Producing,Quantity To Produce,Total Quantity,Product/Cost,Product/Sales Price
0,[FURN_8855] Drawer,WH/MO/01679,Department 1,2024-01-03 01:00:00,2024-02-19 01:00:00,2024-01-25 01:00:00,Cancelled,12.0,17.0,17.0,100.0,110.5
1,[FURN_8522] Table Top,WH/MO/07299,Department 3,2024-01-03 01:00:00,2024-02-27 01:00:00,2024-01-22 01:00:00,Cancelled,4.0,13.0,13.0,240.0,380.0
2,[FURN_9666] Table,WH/MO/02264,Department 4,2024-01-06 01:00:00,2024-02-06 01:00:00,2024-03-29 01:00:00,,19.0,19.0,19.0,290.0,520.0
3,[FURN_7023] Wood Panel,WH/MO/03734,Department 2,2024-01-06 01:00:00,2024-02-17 01:00:00,2024-03-20 01:00:00,,10.0,10.0,10.0,80.0,100.0
4,[FURN_7023] Wood Panel,WH/MO/03614,Department 2,2024-01-07 01:00:00,2024-02-03 01:00:00,2024-03-11 01:00:00,,15.0,15.0,15.0,80.0,100.0
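
The chain of `when` conditions above is effectively a lookup table written out by hand. A minimal plain-Python sketch of the same mapping as a dictionary follows. The names `DEPARTMENT_BY_USER` and `to_department` are hypothetical, and the fallback mirrors the `otherwise` branch by returning unknown values unchanged.

```python
# Same five pairs as the when/otherwise chain in the cell above
DEPARTMENT_BY_USER = {
    "OdooBot": "Department 1",
    "Mitchell Admin": "Department 2",
    "Default User Template": "Department 3",
    "Public user": "Department 4",
    "Portal User Template": "Department 5",
}


def to_department(responsible):
    """Return the department label, or the original value if it is not mapped."""
    return DEPARTMENT_BY_USER.get(responsible, responsible)


sample = ["OdooBot", "Default User Template", "Public user", "Someone Else"]
mapped = [to_department(name) for name in sample]
print(mapped)
```

Keeping the pairs in one dictionary makes adding or renaming a department a one-line change and keeps the mapping easy to review.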


In [49]:
# Handle missing values
df_cleaned = df.na.fill({
    "State": "Unknown",
    "Quantity Producing": 0,
    "Quantity To Produce": 0,
    "Total Quantity": 0,
    "Product/Cost": 0,
    "Product/Sales Price": 0
})

# Verify the missing values have been filled
for column in df_cleaned.columns:
    null_count = df_cleaned.filter(F.col(column).isNull()).count()
    if null_count > 0:
        print(f"{column}: {null_count} null values remaining")
    
# Display the cleaned data
print(f"Cleaned row count: {df_cleaned.count()}")
display(df_cleaned.limit(5).toPandas())

Cleaned row count: 1734


Unnamed: 0,Product,Reference,Responsible,Start,End,Deadline,State,Quantity Producing,Quantity To Produce,Total Quantity,Product/Cost,Product/Sales Price
0,[FURN_8855] Drawer,WH/MO/01679,Department 1,2024-01-03 01:00:00,2024-02-19 01:00:00,2024-01-25 01:00:00,Cancelled,12.0,17.0,17.0,100.0,110.5
1,[FURN_8522] Table Top,WH/MO/07299,Department 3,2024-01-03 01:00:00,2024-02-27 01:00:00,2024-01-22 01:00:00,Cancelled,4.0,13.0,13.0,240.0,380.0
2,[FURN_9666] Table,WH/MO/02264,Department 4,2024-01-06 01:00:00,2024-02-06 01:00:00,2024-03-29 01:00:00,Unknown,19.0,19.0,19.0,290.0,520.0
3,[FURN_7023] Wood Panel,WH/MO/03734,Department 2,2024-01-06 01:00:00,2024-02-17 01:00:00,2024-03-20 01:00:00,Unknown,10.0,10.0,10.0,80.0,100.0
4,[FURN_7023] Wood Panel,WH/MO/03614,Department 2,2024-01-07 01:00:00,2024-02-03 01:00:00,2024-03-11 01:00:00,Unknown,15.0,15.0,15.0,80.0,100.0


### 4.3 Standardize Column Names

Rename columns with spaces and special characters to use underscores for easier handling.

In [50]:
# Rename columns with spaces to use underscores for easier handling
try:
    # Create a list of column renames
    column_renames = [
        ("Total Quantity", "Total_Quantity"),
        ("Quantity Producing", "Quantity_Producing"),
        ("Quantity To Produce", "Quantity_To_Produce"),
        ("Product/Cost", "Product_Cost"),
        ("Product/Sales Price", "Product_Sales_Price")
    ]
    
    # Apply the renames
    df_standardized = df_cleaned
    for old_name, new_name in column_renames:
        if old_name in df_standardized.columns:
            df_standardized = df_standardized.withColumnRenamed(old_name, new_name)
    
    # Verify the column names after renaming
    print("Column names after standardization:")
    for col_name in df_standardized.columns:
        print(f"- {col_name}")
    
    # Display sample data after renaming
    display(df_standardized.select("Reference", "Total_Quantity", "Product_Cost", "Product_Sales_Price").limit(5).toPandas())
    
    # Use the standardized dataframe for further processing
    df_cleaned = df_standardized
except Exception as e:
    log_message(f"Error standardizing column names: {str(e)}", level="error")
    import traceback
    log_message(traceback.format_exc(), level="error")
    # Continue with the original column names if there's an error (df_cleaned is left unchanged)

Column names after standardization:
- Product
- Reference
- Responsible
- Start
- End
- Deadline
- State
- Quantity_Producing
- Quantity_To_Produce
- Total_Quantity
- Product_Cost
- Product_Sales_Price


Unnamed: 0,Reference,Total_Quantity,Product_Cost,Product_Sales_Price
0,WH/MO/01679,17.0,100.0,110.5
1,WH/MO/07299,13.0,240.0,380.0
2,WH/MO/02264,19.0,290.0,520.0
3,WH/MO/03734,10.0,80.0,100.0
4,WH/MO/03614,15.0,80.0,100.0
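
The rename list above is written by hand for five columns. A small sketch, assuming the naming rule is simply "replace every run of non-alphanumeric characters with one underscore", shows how the same target names could be derived automatically. The function name `standardize_name` is hypothetical; its output could feed the same `withColumnRenamed` loop.

```python
import re


def standardize_name(name):
    """Replace runs of non-alphanumeric characters with a single underscore."""
    return re.sub(r"[^0-9A-Za-z]+", "_", name).strip("_")


original_names = [
    "Total Quantity",
    "Quantity Producing",
    "Quantity To Produce",
    "Product/Cost",
    "Product/Sales Price",
]

# Build (old_name, new_name) pairs in the same shape as column_renames
derived_renames = [(old, standardize_name(old)) for old in original_names]
for old, new in derived_renames:
    print(f"{old} -> {new}")
```

Under this rule, names that are already clean, such as `Reference`, map to themselves, so they can pass through the loop without special handling.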


## 5. Feature Engineering

Extract additional features from the data and calculate derived metrics.

In [51]:
# Extract product code and name from Product column
# Format is typically [CODE] Name
df_transformed = df_cleaned.withColumn(
    "Product_Code", 
    F.regexp_extract(F.col("Product"), "\\[(.*?)\\]", 1)
)

df_transformed = df_transformed.withColumn(
    "Product_Name", 
    F.regexp_replace(F.col("Product"), "\\[.*?\\]\\s*", "")
)

# Calculate production efficiency (Quantity_Producing / Quantity_To_Produce)
# Use standardized column names if available, otherwise use original names
quantity_producing_col = "Quantity_Producing" if "Quantity_Producing" in df_transformed.columns else "Quantity Producing"
quantity_to_produce_col = "Quantity_To_Produce" if "Quantity_To_Produce" in df_transformed.columns else "Quantity To Produce"

df_transformed = df_transformed.withColumn(
    "Production_Efficiency",
    F.when(F.col(quantity_to_produce_col) > 0, 
           F.round(F.col(quantity_producing_col) / F.col(quantity_to_produce_col) * 100, 2))
    .otherwise(0)
)

# Calculate profit margin relative to cost (i.e. markup): ((Sales Price - Cost) / Cost) * 100
# Use standardized column names if available, otherwise use original names
product_cost_col = "Product_Cost" if "Product_Cost" in df_transformed.columns else "Product/Cost"
product_sales_price_col = "Product_Sales_Price" if "Product_Sales_Price" in df_transformed.columns else "Product/Sales Price"

df_transformed = df_transformed.withColumn(
    "Profit_Margin_Percent",
    F.when(F.col(product_cost_col) > 0, 
           F.round(((F.col(product_sales_price_col) - F.col(product_cost_col)) / F.col(product_cost_col)) * 100, 2))
    .otherwise(0)
)

# Display the transformed data
display(df_transformed.select("Product", "Product_Code", "Product_Name", "Production_Efficiency", "Profit_Margin_Percent").limit(5).toPandas())

Unnamed: 0,Product,Product_Code,Product_Name,Production_Efficiency,Profit_Margin_Percent
0,[FURN_8855] Drawer,FURN_8855,Drawer,70.59,10.5
1,[FURN_8522] Table Top,FURN_8522,Table Top,30.77,58.33
2,[FURN_9666] Table,FURN_9666,Table,100.0,79.31
3,[FURN_7023] Wood Panel,FURN_7023,Wood Panel,100.0,25.0
4,[FURN_7023] Wood Panel,FURN_7023,Wood Panel,100.0,25.0
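
To make the derived columns easier to check by hand, the following plain-Python sketch reproduces the same regular expressions and ratios for the first sample row. The helper names `split_product` and `percent_ratio` are hypothetical. Python's `round` is used as a stand-in for Spark's `F.round`; the two can differ on exact half-way values, which does not affect this row.

```python
import re


def split_product(product):
    """Split '[CODE] Name' into (code, name); code is '' when no bracket is present."""
    match = re.search(r"\[(.*?)\]", product)
    code = match.group(1) if match else ""
    name = re.sub(r"\[.*?\]\s*", "", product)
    return code, name


def percent_ratio(numerator, denominator):
    """Return numerator / denominator as a percentage rounded to 2 decimals, or 0 if denominator <= 0."""
    if denominator > 0:
        return round(numerator / denominator * 100, 2)
    return 0


# First sample row: [FURN_8855] Drawer, producing 12 of 17, cost 100.0, sales price 110.5
code, name = split_product("[FURN_8855] Drawer")
efficiency = percent_ratio(12.0, 17.0)         # Quantity_Producing / Quantity_To_Produce
margin = percent_ratio(110.5 - 100.0, 100.0)   # (Sales Price - Cost) / Cost
print(code, name, efficiency, margin)
```

The values match the first row of the displayed output (`FURN_8855`, `Drawer`, `70.59`, `10.5`), and the zero-denominator guard corresponds to the `otherwise(0)` branches.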


## 6. Time Dimensions

Extract time-related features from date fields for time-based analysis.

In [52]:
# Add time dimensions for better analysis

# Add year, month, day, weekday for Start date (dayofweek returns 1 = Sunday through 7 = Saturday)
df_transformed = df_transformed.withColumn("Start_Year", F.year(F.col("Start")))
df_transformed = df_transformed.withColumn("Start_Month", F.month(F.col("Start")))
df_transformed = df_transformed.withColumn("Start_Day", F.dayofmonth(F.col("Start")))
df_transformed = df_transformed.withColumn("Start_Weekday", F.dayofweek(F.col("Start")))

# Add quarter and month name for better reporting
df_transformed = df_transformed.withColumn("Start_Quarter", F.quarter(F.col("Start")))
df_transformed = df_transformed.withColumn("Start_Month_Name", 
                                          F.date_format(F.col("Start"), "MMMM"))

# Calculate production duration in days
df_transformed = df_transformed.withColumn(
    "Production_Duration_Days",
    F.when(F.col("End").isNotNull() & F.col("Start").isNotNull(),
           F.datediff(F.col("End"), F.col("Start")))
    .otherwise(0)
)

# Display the time dimensions
display(df_transformed.select(
    "Reference", "Start", "End", "Start_Year", "Start_Quarter", "Start_Month_Name", 
    "Start_Weekday", "Production_Duration_Days"
).limit(5).toPandas())

Unnamed: 0,Reference,Start,End,Start_Year,Start_Quarter,Start_Month_Name,Start_Weekday,Production_Duration_Days
0,WH/MO/01679,2024-01-03 01:00:00,2024-02-19 01:00:00,2024,1,January,4,47
1,WH/MO/07299,2024-01-03 01:00:00,2024-02-27 01:00:00,2024,1,January,4,55
2,WH/MO/02264,2024-01-06 01:00:00,2024-02-06 01:00:00,2024,1,January,7,31
3,WH/MO/03734,2024-01-06 01:00:00,2024-02-17 01:00:00,2024,1,January,7,42
4,WH/MO/03614,2024-01-07 01:00:00,2024-02-03 01:00:00,2024,1,January,1,27
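
The `Start_Weekday` values above follow Spark's `dayofweek` convention, where 1 is Sunday and 7 is Saturday, which differs from Python's `isoweekday` (1 is Monday). A short sketch with the standard library, using the hypothetical helper `spark_style_dayofweek`, cross-checks the first sample row and its duration in days.

```python
from datetime import date


def spark_style_dayofweek(d):
    """Map a date to 1 = Sunday ... 7 = Saturday (isoweekday is 1 = Monday ... 7 = Sunday)."""
    return d.isoweekday() % 7 + 1


# First sample row: Start 2024-01-03, End 2024-02-19
start = date(2024, 1, 3)
end = date(2024, 2, 19)

weekday = spark_style_dayofweek(start)
duration_days = (end - start).days  # whole days between the two dates, time of day ignored
print(weekday, duration_days)
```

Reports that label weekdays need to apply the same 1 = Sunday convention, otherwise every label is shifted by one day.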


## 7. Product Categorization

Categorize products based on name, price range, and profit margin.

In [53]:
# Categorize products based on product name (conditions are checked in order; the first match wins)
df_transformed = df_transformed.withColumn(
    "Product_Category",
    F.when(F.lower(F.col("Product_Name")).contains("table"), "Furniture - Tables")
     .when(F.lower(F.col("Product_Name")).contains("drawer"), "Furniture - Storage")
     .when(F.lower(F.col("Product_Name")).contains("panel"), "Furniture - Components")
     .when(F.lower(F.col("Product_Name")).contains("chair"), "Furniture - Seating")
     .when(F.lower(F.col("Product_Name")).contains("desk"), "Furniture - Office")
     .when(F.lower(F.col("Product_Name")).contains("shelf"), "Furniture - Storage")
     .when(F.lower(F.col("Product_Name")).contains("cabinet"), "Furniture - Storage")
     .otherwise("Other")
)

# Determine which column name to use (standardized or original)
product_sales_price_col = "Product_Sales_Price" if "Product_Sales_Price" in df_transformed.columns else "Product/Sales Price"

# Categorize products based on price range
df_transformed = df_transformed.withColumn(
    "Price_Category",
    F.when(F.col(product_sales_price_col) < 100, "Budget")
     .when((F.col(product_sales_price_col) >= 100) & (F.col(product_sales_price_col) < 250), "Standard")
     .when((F.col(product_sales_price_col) >= 250) & (F.col(product_sales_price_col) < 500), "Premium")
     .otherwise("Luxury")
)

# Categorize products based on profit margin
df_transformed = df_transformed.withColumn(
    "Margin_Category",
    F.when(F.col("Profit_Margin_Percent") < 20, "Low Margin")
     .when((F.col("Profit_Margin_Percent") >= 20) & (F.col("Profit_Margin_Percent") < 50), "Medium Margin")
     .when((F.col("Profit_Margin_Percent") >= 50) & (F.col("Profit_Margin_Percent") < 100), "High Margin")
     .otherwise("Very High Margin")
)

# Display the categorized data
display(df_transformed.select(
    "Product_Name", "Product_Category", product_sales_price_col, "Price_Category", 
    "Profit_Margin_Percent", "Margin_Category"
).limit(10).toPandas())

Unnamed: 0,Product_Name,Product_Category,Product_Sales_Price,Price_Category,Profit_Margin_Percent,Margin_Category
0,Drawer,Furniture - Storage,110.5,Standard,10.5,Low Margin
1,Table Top,Furniture - Tables,380.0,Premium,58.33,High Margin
2,Table,Furniture - Tables,520.0,Luxury,79.31,High Margin
3,Wood Panel,Furniture - Components,100.0,Standard,25.0,Medium Margin
4,Wood Panel,Furniture - Components,100.0,Standard,25.0,Medium Margin
5,Drawer,Furniture - Storage,110.5,Standard,10.5,Low Margin
6,Wood Panel,Furniture - Components,100.0,Standard,25.0,Medium Margin
7,Desk Combination,Furniture - Office,450.0,Premium,50.0,High Margin
8,Table Top,Furniture - Tables,380.0,Premium,58.33,High Margin
9,Wood Panel,Furniture - Components,100.0,Standard,25.0,Medium Margin
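
The three categorizations above share two patterns: an ordered keyword search and a set of half-open numeric ranges. The sketch below restates both in plain Python so the boundary behavior is easy to test. The names `categorize`, `bucket`, and the constant lists are hypothetical, and `bisect` is a swapped-in technique for the chained comparisons, not what the notebook itself uses.

```python
from bisect import bisect_right

# Checked in order; the first keyword found in the name decides the category
CATEGORY_KEYWORDS = [
    ("table", "Furniture - Tables"),
    ("drawer", "Furniture - Storage"),
    ("panel", "Furniture - Components"),
    ("chair", "Furniture - Seating"),
    ("desk", "Furniture - Office"),
    ("shelf", "Furniture - Storage"),
    ("cabinet", "Furniture - Storage"),
]

PRICE_BOUNDS = [100, 250, 500]
PRICE_LABELS = ["Budget", "Standard", "Premium", "Luxury"]
MARGIN_BOUNDS = [20, 50, 100]
MARGIN_LABELS = ["Low Margin", "Medium Margin", "High Margin", "Very High Margin"]


def categorize(product_name):
    """Return the category of the first matching keyword, or 'Other'."""
    lowered = product_name.lower()
    for keyword, category in CATEGORY_KEYWORDS:
        if keyword in lowered:
            return category
    return "Other"


def bucket(value, bounds, labels):
    """Return the label whose half-open range [lower, upper) contains value."""
    return labels[bisect_right(bounds, value)]


print(categorize("Desk Combination"), bucket(450.0, PRICE_BOUNDS, PRICE_LABELS))
print(bucket(100.0, PRICE_BOUNDS, PRICE_LABELS), bucket(25.0, MARGIN_BOUNDS, MARGIN_LABELS))
```

A value exactly on a boundary falls into the upper range, so a sales price of 100.0 is `Standard` rather than `Budget`, matching the Wood Panel rows in the output.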


## 8. Data Analysis

Perform exploratory analysis on the transformed data to extract insights.

In [54]:
# Perform some analysis on the transformed data

# Count products by category
print("Products by Category:")
df_transformed.groupBy("Product_Category").count().orderBy(F.desc("count")).show()

# Average profit margin by product category
print("\nAverage Profit Margin by Product Category:")
df_transformed.groupBy("Product_Category") \
    .agg(F.round(F.avg("Profit_Margin_Percent"), 2).alias("Avg_Profit_Margin_Percent")) \
    .orderBy(F.desc("Avg_Profit_Margin_Percent")) \
    .show()

# Production efficiency by responsible department
print("\nProduction Efficiency by Department:")
df_transformed.groupBy("Responsible") \
    .agg(F.round(F.avg("Production_Efficiency"), 2).alias("Avg_Production_Efficiency")) \
    .orderBy(F.desc("Avg_Production_Efficiency")) \
    .show()

# Average production duration by product category
print("\nAverage Production Duration by Product Category:")
df_transformed.groupBy("Product_Category") \
    .agg(F.round(F.avg("Production_Duration_Days"), 1).alias("Avg_Production_Days")) \
    .orderBy(F.desc("Avg_Production_Days")) \
    .show()

# Monthly production count
print("\nMonthly Production Count:")
df_transformed.groupBy("Start_Year", "Start_Month", "Start_Month_Name") \
    .count() \
    .orderBy("Start_Year", "Start_Month") \
    .show()

Products by Category:
+--------------------+-----+
|    Product_Category|count|
+--------------------+-----+
|  Furniture - Tables|  672|
|Furniture - Compo...|  386|
|  Furniture - Office|  368|
| Furniture - Storage|  308|
+--------------------+-----+


Average Profit Margin by Product Category:
+--------------------+-------------------------+
|    Product_Category|Avg_Profit_Margin_Percent|
+--------------------+-------------------------+
|  Furniture - Tables|                    69.29|
|  Furniture - Office|                     50.0|
|Furniture - Compo...|                     25.0|
| Furniture - Storage|                     10.5|
+--------------------+-------------------------+


Production Efficiency by Department:
+------------+-------------------------+
| Responsible|Avg_Production_Efficiency|
+------------+-------------------------+
|Department 2|                    95.45|
|Department 1|                    95.37|
|Department 5|                    95.28|
|Department 3|          

## 9. Save Transformed Data

Save the transformed data to the Silver layer in both Parquet and CSV formats.

In [55]:
# Ensure the silver layer directory exists
os.makedirs(silver_path, exist_ok=True)

# Save the transformed data to the silver layer
try:
    # Save as parquet format
    df_transformed.write.mode("overwrite").parquet(os.path.join(silver_path, "mrp_production.parquet"))
    log_message(f"Transformed data saved to {os.path.join(silver_path, 'mrp_production.parquet')}")
    
    # Also save as CSV for easier viewing (toPandas() collects every row to the driver, so this suits small datasets only)
    df_transformed.toPandas().to_csv(os.path.join(silver_path, "mrp_production.csv"), index=False)
    log_message(f"Transformed data also saved as CSV to {os.path.join(silver_path, 'mrp_production.csv')}")
    
    # Print summary of the transformation
    # Note: df is the frame after the unused columns were dropped in section 4.1,
    # so "Input columns" does not count the two dropped columns
    print("\nTransformation Summary:")
    print(f"Input rows: {df.count()}")
    print(f"Output rows: {df_transformed.count()}")
    print(f"Input columns: {len(df.columns)}")
    print(f"Output columns: {len(df_transformed.columns)}")
    print(f"New columns added: {len(df_transformed.columns) - len(df.columns)}")
    print("\nTransformation completed successfully!")
    
except Exception as e:
    log_message(f"Error saving transformed data: {str(e)}", level="error")
    import traceback
    log_message(traceback.format_exc(), level="error")

Transformed data saved to c:\Users\oussema\OneDrive\Bureau\FSEGN\DW-Odoo\output\silverLayer\2025-05-08\mrp_production.parquet
Transformed data also saved as CSV to c:\Users\oussema\OneDrive\Bureau\FSEGN\DW-Odoo\output\silverLayer\2025-05-08\mrp_production.csv

Transformation Summary:
Input rows: 1734
Output rows: 1734
Input columns: 12
Output columns: 26
New columns added: 14

Transformation completed successfully!


## 10. Cleanup

Stop the Spark session and release resources.

In [56]:
# Safely stop Spark session
try:
    # Check if spark session is active before stopping
    spark.sparkContext.getConf().getAll()
    spark.stop()
    print("Spark session stopped.")
except Exception:
    print("No active Spark session to stop.")

Spark session stopped.
