# Part 1: Data Pipeline & Historical Analysis
**Project:** ForgeFlow Manufacturing Analytics
**Author:** Jaylen Hester

### 🎯 Objective
Engineer an end-to-end data pipeline that simulates a manufacturing environment, integrates the simulated logs with real-world quality control (sensor) and e-commerce sales data, and performs root-cause analysis on historical defects.

### 🏗️ Architecture
1.  **Supply Side (Simulation):** Custom Python package (`forgeflow`) generates 365 days of production logs.
2.  **Telemetry Layer (Ingestion):** Integration of UCI SECOM sensor data to simulate machine states.
3.  **Demand Side (Real World):** Integration of Olist E-Commerce data to track downstream customer sentiment.
4.  **Analytics Engine:** In-memory SQLite warehouse for multi-source SQL joins.

In [None]:

# 1. SETUP
%load_ext autoreload
%autoreload 2

import sys
import pandas as pd
import numpy as np
import sqlite3
from pathlib import Path

# Add local src directory to path for package import
sys.path.append("../src")

try:
    import forgeflow
    from forgeflow import synth, clean, features, io, paths
    print(f"Package loaded: {forgeflow.__file__}")
except ImportError as e:
    # Fail fast: every later cell depends on the package being importable
    print(f"Error loading forgeflow package: {e}")
    raise

# Mapping dictionary to link synthetic products to Olist SKUs
PRODUCT_MAPPING = {
    "microwave": "1e9e8ef04dbcff4541ed26657ea517e5",
    "vacuum": "3aa071139cb16b67ca9e5dea641aaa2f",
    "coffee_maker": "a62b9723af96d72995a548a67bb184e5"
}




The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Package loaded: /workspaces/forgeflow-manufacturing-analytics/src/forgeflow/__init__.py


## 1. Supply Chain Simulation
**Goal:** Generate a synthetic "Ground Truth" for manufacturing operations.
We use the internal `synth` module to create production batches across three plant locations, then standardize the schema for downstream integration.
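
As a rough illustration of what such a generator might do, the sketch below builds one row per day, plant, and product with pandas and NumPy. It is not the `forgeflow.synth` implementation: `make_production_sketch`, the plant and product lists, the start date, and the Poisson and Beta distributions are all assumptions chosen only to produce data of a similar shape.

```python
import numpy as np
import pandas as pd

# Hypothetical stand-in for forgeflow.synth; all names and distributions are assumptions.
PLANTS = ["campinas", "manaus", "recife"]
PRODUCTS = ["microwave", "vacuum", "coffee_maker"]


def make_production_sketch(n_days: int = 365, seed: int = 42,
                           start: str = "2024-11-25") -> pd.DataFrame:
    """Return one row per (date, plant, product) with random units and defect rate."""
    rng = np.random.default_rng(seed)
    dates = pd.date_range(start, periods=n_days, freq="D")
    # Cartesian product of the three keys: n_days * 3 plants * 3 products rows
    index = pd.MultiIndex.from_product(
        [dates, PLANTS, PRODUCTS], names=["date", "plant", "product"]
    )
    df = index.to_frame(index=False)
    df["units"] = rng.poisson(lam=180, size=len(df))
    # Beta(2, 78) has mean 0.025, so defect rates land at a few percent
    df["defect_rate"] = rng.beta(2.0, 78.0, size=len(df))
    return df


demo = make_production_sketch()
print(demo.shape)  # (3285, 5)
```

With 365 days, three plants, and three products, the Cartesian product gives 3,285 rows, which matches the record count reported by the cell below.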

In [34]:
# %% [markdown]
# ## 1. Synthetic Data Generation
# Simulates 365 days of production logs across 3 distinct plants using the internal `synth` module.

# %%
print("Generating production logs...")

# Configure and run simulation
cfg = synth.SynthConfig(n_days=365, seed=42)
df_production = synth.make_production(cfg)

# Standardize column names
df_production = clean.standardize_columns(df_production)

# Generate unique Batch IDs for tracking
df_production["batch_id"] = [f"BATCH-{i:05d}" for i in range(len(df_production))]

# Map internal product names to external Olist IDs
df_production["olist_product_id"] = df_production["product"].map(PRODUCT_MAPPING)

print(f"Generated {len(df_production)} records.")
df_production.head()


Generating production logs...
Generated 3285 records.


| | date | plant | product | units | defect_rate | batch_id | olist_product_id |
|---|---|---|---|---|---|---|---|
| 0 | 2024-11-25 | campinas | microwave | 212 | 0.033752 | BATCH-00000 | 1e9e8ef04dbcff4541ed26657ea517e5 |
| 1 | 2024-11-25 | campinas | vacuum | 157 | 0.013489 | BATCH-00001 | 3aa071139cb16b67ca9e5dea641aaa2f |
| 2 | 2024-11-25 | campinas | coffee_maker | 191 | 0.024916 | BATCH-00002 | a62b9723af96d72995a548a67bb184e5 |
| 3 | 2024-11-25 | manaus | microwave | 198 | 0.033889 | BATCH-00003 | 1e9e8ef04dbcff4541ed26657ea517e5 |
| 4 | 2024-11-25 | manaus | vacuum | 155 | 0.022338 | BATCH-00004 | 3aa071139cb16b67ca9e5dea641aaa2f |


## 2. Data Ingestion & Integration
**Objective:** Ingest real-world datasets and merge them with synthetic production logs.

**Data Sources:**
* **Olist (E-Commerce):** Orders, Items, and Reviews (CSV).
* **SECOM (Sensors):** UCI Semiconductor Manufacturing dataset (CSV).

**Transformations:**
* **Column Selection:** Takes the first four sensor columns from SECOM and labels them Pressure, Temp, Vibration, and Amperage, explicitly discarding the raw timestamp (Column 0).
* **Telemetry Mapping:** Randomly assigns each sensor reading to a production batch (sampling with replacement), so some batches receive several readings and others none. A left join then keeps every batch, which simulates incomplete telemetry coverage.
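
The effect of the telemetry mapping on row counts can be sketched on toy data. The example below is a minimal illustration, not the notebook's code: the table sizes, the seed, and the use of `np.random.default_rng` are assumptions. It shows why a left join after sampling with replacement can return more rows than there are batches, which is consistent with the 3,615-row master table built from 3,285 production records in this notebook.

```python
import numpy as np
import pandas as pd

# Toy data: 10 batches and 6 sensor readings (sizes are illustrative only)
batches = pd.DataFrame({"batch_id": [f"BATCH-{i:05d}" for i in range(10)]})
sensors = pd.DataFrame({"sensor_pressure": np.linspace(3000.0, 3050.0, 6)})

rng = np.random.default_rng(0)
# Sampling with replacement: the same batch may be drawn more than once
sensors["batch_id"] = rng.choice(
    batches["batch_id"].to_numpy(), size=len(sensors), replace=True
)

# indicator=True adds a _merge column flagging batches with no sensor match
merged = batches.merge(sensors, on="batch_id", how="left", indicator=True)

n_covered = sensors["batch_id"].nunique()
n_uncovered = int((merged["_merge"] == "left_only").sum())

# Row count = one NaN row per uncovered batch + one row per sensor reading
print(len(merged), n_uncovered + len(sensors))
```

If one row per batch is needed downstream, aggregating the sensor readings per `batch_id` before merging is one option.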

In [49]:
# %%
# 1. LOAD EXTERNAL DATA
print("Loading external datasets...")

# Initialize to None so the guards below can detect a failed load
# (this also keeps static analysis from flagging possibly unbound names)
orders = None
items = None
reviews = None
secom_clean = None 

try:
    # Load Demand Data (Olist)
    orders = io.read_raw_csv('olist/olist_orders_dataset.csv')
    items = io.read_raw_csv('olist/olist_order_items_dataset.csv')
    reviews = io.read_raw_csv('olist/olist_order_reviews_dataset.csv')
    
    # Load Sensor Data (SECOM)
    # NOTE: Column 0 is a timestamp (Year 2008). We exclude it using iloc[:, 1:5]
    # to capture only the relevant sensor readings (cols 1-4).
    secom = io.read_raw_csv('secom/uci-secom.csv')
    secom_clean = secom.iloc[:, 1:5].copy()
    secom_clean.columns = ['sensor_pressure', 'sensor_temp', 'sensor_vibration', 'sensor_amp']
    
    print("External data loaded successfully.")
except Exception as e:
    print(f"Error loading external data: {e}")

# 2. MERGE SENSORS WITH PRODUCTION
# We verify data loaded correctly before running the merge logic
if secom_clean is not None and df_production is not None:
    print("Linking sensor telemetry to production batches...")

    # Set seed to ensure reproducible mapping for modeling
    np.random.seed(42)

    # Randomly assign sensor rows to existing Batch IDs
    assigned_batches = np.random.choice(df_production['batch_id'], size=len(secom_clean))
    secom_clean['batch_id'] = assigned_batches

    # Master Merge: Production Logs (Left) + Sensor Data (Right)
    # Result: All production batches are kept; those without sensor data get NaNs,
    # and batches drawn more than once above appear on multiple rows.
    df_master = df_production.merge(secom_clean, on="batch_id", how="left")

    # Feature Engineering: Create binary target for Defect Classification
    # Logic: A Defect Rate > 2.5% is classified as a "Failure" (1).
    df_master['is_defective'] = (df_master['defect_rate'] > 0.025).astype(int)

    print(f"Master Integration Complete. Final Shape: {df_master.shape}")
    display(df_master.head())
else:
    print("❌ Critical Error: Data Ingestion failed. Skipping Merge step.")

Loading external datasets...


External data loaded successfully.
Linking sensor telemetry to production batches...
Master Integration Complete. Final Shape: (3615, 12)


| | date | plant | product | units | defect_rate | batch_id | olist_product_id | sensor_pressure | sensor_temp | sensor_vibration | sensor_amp | is_defective |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-11-25 | campinas | microwave | 212 | 0.033752 | BATCH-00000 | 1e9e8ef04dbcff4541ed26657ea517e5 | NaN | NaN | NaN | NaN | 1 |
| 1 | 2024-11-25 | campinas | vacuum | 157 | 0.013489 | BATCH-00001 | 3aa071139cb16b67ca9e5dea641aaa2f | 2992.52 | 2470.14 | 2197.6444 | 1247.0334 | 0 |
| 2 | 2024-11-25 | campinas | coffee_maker | 191 | 0.024916 | BATCH-00002 | a62b9723af96d72995a548a67bb184e5 | NaN | NaN | NaN | NaN | 0 |
| 3 | 2024-11-25 | manaus | microwave | 198 | 0.033889 | BATCH-00003 | 1e9e8ef04dbcff4541ed26657ea517e5 | 3043.77 | 2588.52 | 2219.7667 | 2086.471 | 1 |
| 4 | 2024-11-25 | manaus | vacuum | 155 | 0.022338 | BATCH-00004 | 3aa071139cb16b67ca9e5dea641aaa2f | 3036.93 | 2570.13 | 2230.7555 | 1281.7862 | 0 |


## 3. Business Intelligence Analysis
**Goal:** Execute Root Cause Analysis using SQL.
We instantiate an in-memory Data Warehouse to join **Factory Logs** (Supply), **Sensor Telemetry** (Process), and **Customer Reviews** (Demand).

**Key Business Question:**
> *"How do upstream factory anomalies (e.g., High Pressure) impact downstream customer satisfaction?"*

In [50]:
# %% [markdown]
# ## 3. SQL Analysis
# Aggregates key performance metrics (pressure, vibration, defect rate, review scores) using an in-memory SQLite database.

# %%
# Initialize database connection
conn = sqlite3.connect(':memory:')

# Run the SQL logic only if all required DataFrames loaded successfully.
# The explicit None checks also satisfy the type checker ("reportOptionalMemberAccess").
if (df_production is not None and 
    secom_clean is not None and 
    items is not None and 
    reviews is not None):

    print("Building Data Warehouse...")

    # Load tables into warehouse
    df_production.to_sql('fact_production', conn, index=False, if_exists='replace')
    secom_clean.to_sql('fact_sensors', conn, index=False, if_exists='replace')
    items.to_sql('dim_items', conn, index=False, if_exists='replace')
    reviews.to_sql('dim_reviews', conn, index=False, if_exists='replace')

    # Query: The Full "Magnum Opus" Analysis
    query = """
    SELECT 
        -- 1. Segments
        p.plant,
        p.product,
        
        -- 2. Factory Telemetry (The "Why")
        COUNT(s.batch_id) as batches_with_sensor_logs,
        ROUND(AVG(s.sensor_pressure), 2) as avg_pressure,
        ROUND(AVG(s.sensor_vibration), 2) as avg_vibration,
        
        -- 3. Production Quality (The "What")
        ROUND(AVG(p.defect_rate) * 100, 2) || '%' as defect_pct,
        
        -- 4. Customer Impact (The "So What")
        ROUND(AVG(r.review_score), 2) as avg_stars,
        COUNT(r.review_id) as review_vol

    FROM fact_production p

    -- Join Sensor Data (Left Join to keep batches even if sensors missed them)
    LEFT JOIN fact_sensors s ON p.batch_id = s.batch_id

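    -- Join Olist Data (Bridge to Real World)
    -- NOTE: these are inner joins at product level. Each batch row repeats once per
    -- matching order item and review, so the COUNT columns tally joined rows, and
    -- products without matching items or reviews drop out of the result.
    JOIN dim_items i ON p.olist_product_id = i.product_id
    JOIN dim_reviews r ON i.order_id = r.order_id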

    GROUP BY 1, 2
    ORDER BY avg_pressure DESC
    """

    df_insight = pd.read_sql(query, conn)
    display(df_insight)

else:
    print("⚠️ Skipping SQL Analysis: One or more upstream datasets failed to load.")

Building Data Warehouse...


| | plant | product | batches_with_sensor_logs | avg_pressure | avg_vibration | defect_pct | avg_stars | review_vol |
|---|---|---|---|---|---|---|---|---|
| 0 | recife | vacuum | 166 | 3024.57 | 2199.55 | 2.04% | 5.0 | 406 |
| 1 | campinas | microwave | 173 | 3019.06 | 2196.97 | 2.97% | 5.0 | 395 |
| 2 | manaus | microwave | 175 | 3016.52 | 2200.62 | 3.02% | 5.0 | 398 |
| 3 | campinas | vacuum | 182 | 3013.09 | 2197.29 | 2.0% | 5.0 | 400 |
| 4 | manaus | vacuum | 171 | 3011.84 | 2202.92 | 2.0% | 5.0 | 393 |
| 5 | recife | microwave | 164 | 3008.78 | 2204.04 | 3.0% | 5.0 | 392 |
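
When reading this table, note that the query above joins production batches to Olist items and reviews on a product-level key, so each batch row is repeated for every matching item and review. `coffee_maker` does not appear in the result, which is what an inner join produces when a product has no matching items or reviews. The sketch below reproduces the fan-out on toy tables (the table contents and the simplified `product_id` column are assumptions for illustration) and compares `COUNT` with `COUNT(DISTINCT ...)`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_production (batch_id TEXT, product_id TEXT);
CREATE TABLE dim_items (order_id TEXT, product_id TEXT);
CREATE TABLE dim_reviews (review_id TEXT, order_id TEXT, review_score INTEGER);

-- Three batches of one product, but only one order item and one review
INSERT INTO fact_production VALUES ('B1', 'P1'), ('B2', 'P1'), ('B3', 'P1');
INSERT INTO dim_items VALUES ('O1', 'P1');
INSERT INTO dim_reviews VALUES ('R1', 'O1', 5);
""")

row = conn.execute("""
SELECT COUNT(r.review_id)          AS joined_rows,
       COUNT(DISTINCT r.review_id) AS distinct_reviews,
       COUNT(DISTINCT p.batch_id)  AS distinct_batches
FROM fact_production p
JOIN dim_items i   ON p.product_id = i.product_id
JOIN dim_reviews r ON i.order_id = r.order_id
""").fetchone()

print(row)  # (3, 1, 3)
conn.close()
```

In this toy case a single five-star review is counted three times. A `review_vol` close to the number of batch rows per group, paired with a uniform `avg_stars`, may indicate the same pattern, so checking `COUNT(DISTINCT r.review_id)` before drawing conclusions about customer impact is a reasonable precaution.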


## 4. Pipeline Export
**Goal:** Serialize processed datasets for the next stages of the project.
* **`forgeflow_sql_summary.csv`**: Aggregated KPIs for the Tableau Dashboard (Part 3).
* **`forgeflow_modeling_data.csv`**: Granular feature matrix for Machine Learning (Part 2).

In [None]:

print("Exporting processed files...")

# 1. Save SQL Summary (For Tableau Dashboard)
# This keeps the "Big Picture" view
io.write_processed_csv(df_insight, "forgeflow_sql_summary.csv")

# 2. Save Modeling Data (For Machine Learning)
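# CRITICAL STEP: Drop rows with no sensor reading;
# a sensor-based model cannot be trained on empty features.
# Note: df_master holds one row per batch/reading pair, so the counts
# printed below are rows, not unique batches.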
df_model_ready = df_master.dropna(subset=['sensor_pressure'])

print(f"Original Batches: {len(df_master)}")
print(f"Batches with Sensor Data (Model Ready): {len(df_model_ready)}")

io.write_processed_csv(df_model_ready, "forgeflow_modeling_data.csv")

print("Export complete. Ready for Notebook 02_Modeling.")

Exporting processed files...
Original Batches: 3615
Batches with Sensor Data (Model Ready): 1561
Export complete. Ready for Notebook 02_Modeling.
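
The counts printed above are row counts of `df_master`, which can exceed the number of unique batches because a batch with several sensor readings occupies several rows. A minimal sketch of reporting both figures is shown below; the toy frame `df_master_demo` and its values are assumptions for illustration.

```python
import numpy as np
import pandas as pd

# Toy master table: BATCH-00001 has two sensor readings, BATCH-00002 has none
df_master_demo = pd.DataFrame({
    "batch_id": ["BATCH-00000", "BATCH-00001", "BATCH-00001", "BATCH-00002"],
    "sensor_pressure": [3001.2, 2998.7, 3010.4, np.nan],
})

# Keep only rows that carry a pressure reading
model_ready = df_master_demo.dropna(subset=["sensor_pressure"])

summary = {
    "master_rows": len(df_master_demo),
    "unique_batches": df_master_demo["batch_id"].nunique(),
    "model_rows": len(model_ready),
    "model_batches": model_ready["batch_id"].nunique(),
}
print(summary)
# {'master_rows': 4, 'unique_batches': 3, 'model_rows': 3, 'model_batches': 2}
```

Logging `nunique()` alongside `len()` makes it clear whether the modeling file contains repeated batches, which matters if the data is later split into training and test sets by row.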
