# DataCo Supply Chain Analysis and Model Training
This notebook performs exploratory data analysis (EDA), data cleaning, feature engineering, visualisation, and trains a classification model to predict `late_delivery_risk`.


In [None]:
from pyspark.sql import SparkSession, functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Start (or reuse) a local Spark session that runs on all available cores.
# A small shuffle-partition count keeps groupBy/join stages light on one machine.
spark = (
    SparkSession.builder
    .appName('SupplySession')
    .master('local[*]')
    .config('spark.sql.shuffle.partitions', '8')
    .getOrCreate()
)

# Read the raw CSV: the first row supplies column names and Spark infers each
# column's type from the data.
# NOTE(review): no 'encoding' option is set, so Spark assumes UTF-8; confirm the
# file's encoding if non-ASCII text shows up garbled.
csv_path = './DataCoSupplyChainDataset.csv'
df = spark.read.csv(
    csv_path,
    inferSchema=True,
    header=True,
)
df.show(5, truncate=False)


## Initial EDA
* Schema inspection
* Row / column counts
* Missing value overview


In [None]:
print('Rows:', df.count())
print('Columns:', len(df.columns))
df.printSchema()


def missing_counts(df):
    """Return a one-row DataFrame holding the number of missing values per column.

    A value counts as missing when it is null. For string columns, an empty or
    whitespace-only value also counts. Every output column has the same name as
    the input column it describes, so ``missing_counts(df).first()[c]`` works.
    """
    from pyspark.sql.types import StringType  # local import keeps the cell self-contained

    exprs = []
    for field in df.schema.fields:
        c = field.name
        is_missing = F.col(c).isNull()
        # Only strings can be "blank"; trimming other types would force an implicit cast.
        if isinstance(field.dataType, StringType):
            is_missing = is_missing | (F.trim(F.col(c)) == '')
        # The alias must wrap the aggregate. An alias placed inside count() does not
        # name the output column, which breaks later lookups by column name.
        exprs.append(F.count(F.when(is_missing, F.lit(1))).alias(c))
    return df.select(exprs)


missing_df = missing_counts(df)
missing_df.show(truncate=False)


## Visualisation – Histograms and Heatmap (before cleaning)


In [None]:
# Sample a fraction for quick pandas visualisation. A fixed seed makes the
# sample, and therefore the plots, reproducible from run to run.
sample_frac = 0.02 if df.count() > 100000 else 1.0
pdf = df.sample(withReplacement=False, fraction=sample_frac, seed=42).toPandas()

# Histogram of real shipping days. The column may appear under its snake_case
# name or under the original spaced CSV header, so accept either.
ship_days_col = next(
    (c for c in ('days_for_shipping_real', 'Days for shipping (real)') if c in pdf.columns),
    None,
)
if ship_days_col is not None:
    plt.figure(figsize=(8, 4))
    sns.histplot(pdf[ship_days_col].dropna(), bins=30, kde=True)
    plt.title('Distribution of Days for Shipping (raw)')
    plt.xlabel('Days')
    plt.show()

# Correlation heatmap for numeric columns (needs at least two to be meaningful).
numeric_cols = pdf.select_dtypes(include='number').columns
if len(numeric_cols) >= 2:
    corr = pdf[numeric_cols].corr()
    plt.figure(figsize=(12, 10))
    sns.heatmap(corr, cmap='coolwarm', center=0, annot=False)
    plt.title('Correlation Heatmap (raw)')
    plt.show()
else:
    print('Fewer than two numeric columns in the sample – skipping correlation heatmap')


## Data Cleaning – Drop mostly-missing columns and impute the rest


In [None]:
# __import__('pyspark.sql.types') returns the top-level `pyspark` package, not the
# submodule, so import the types module explicitly.
from pyspark.sql import types as T

# Columns with >70% missing (as counted in missing_df) – drop them
threshold = 0.7
total_rows = df.count()
# Distinct name so the missing_counts() helper is not shadowed by a Row.
missing_row = missing_df.first()
if total_rows == 0:
    cols_to_drop = []  # nothing to measure; avoid division by zero
else:
    cols_to_drop = [c for c in df.columns if (missing_row[c] / total_rows) > threshold]
print('Dropping columns due to high missingness:', cols_to_drop)
df = df.drop(*cols_to_drop)

# Impute numeric columns with their (approximate) median, string columns with 'unknown'.
# NOTE(review): missing_df also counted blank strings as missing; na.fill only
# replaces nulls, so blank strings are left as-is here.
numeric_types = (T.IntegerType, T.LongType, T.FloatType, T.DoubleType)
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, numeric_types)]
string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, T.StringType)]

print(f'Numeric columns to impute: {numeric_cols}')
medians = {}
if numeric_cols:
    try:
        # One pass over the data for all columns instead of one pass per column.
        # Assumes Spark 3.x semantics: nulls are ignored per column, and an
        # all-null column yields an empty list.
        quantiles = df.approxQuantile(numeric_cols, [0.5], 0.01)
        medians = {c: q[0] for c, q in zip(numeric_cols, quantiles) if q}
    except Exception as e:
        # Best effort, as before: fall back to per-column medians and report failures.
        print(f'Batch median computation failed ({e}); retrying per column')
        for col in numeric_cols:
            try:
                q = df.approxQuantile(col, [0.5], 0.01)
                if q:
                    medians[col] = q[0]
            except Exception as col_err:
                print(f'Error imputing column {col}: {col_err}')
if medians:
    df = df.na.fill(medians)

if string_cols:
    df = df.na.fill({c: 'unknown' for c in string_cols})

df.show(5, truncate=False)


## Feature Engineering – Distance and Aggregations
We create a geographic distance feature (the haversine great-circle distance, in km) from origin and destination latitude/longitude columns, when both coordinate pairs are available. We also aggregate per-region statistics.


In [None]:
import math

def haversine(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Return the great-circle distance between two points on a sphere, in km.

    Coordinates are in decimal degrees. If any coordinate is None, the result is
    None, so the function can be applied to nullable columns without raising.

    Args:
        lat1, lon1: latitude/longitude of the first point.
        lat2, lon2: latitude/longitude of the second point.
        radius_km: sphere radius; defaults to 6371 (Earth radius in km).

    Returns:
        The distance as a float, or None if any coordinate is None.
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    # convert decimal degrees to radians
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` slightly above 1 (near-antipodal
    # points), where asin(sqrt(a)) would raise ValueError.
    c = 2 * math.asin(math.sqrt(min(1.0, a)))
    return c * radius_km

# __import__('pyspark.sql.types') returns the top-level `pyspark` package, not the
# submodule, so import the type explicitly.
from pyspark.sql.types import DoubleType

# Distance feature: needs separate origin and destination coordinate pairs.
# These column names are placeholders; if the data does not carry both pairs,
# the step is skipped.
origin_lat, origin_lon = 'origin_latitude', 'origin_longitude'
dest_lat, dest_lon = 'dest_latitude', 'dest_longitude'
if {origin_lat, origin_lon, dest_lat, dest_lon}.issubset(df.columns):
    # Null-guard in the wrapper so a missing coordinate yields a null distance
    # instead of an exception inside the Python UDF.
    haversine_udf = F.udf(
        lambda *coords: None if any(v is None for v in coords) else haversine(*coords),
        DoubleType(),
    )
    df = df.withColumn(
        'geo_distance_km',
        haversine_udf(F.col(origin_lat), F.col(origin_lon), F.col(dest_lat), F.col(dest_lon)),
    )
    print('Added geo_distance_km feature')
else:
    print('Latitude/longitude columns not found – skipping distance feature')


def _first_existing(frame, candidates):
    """Return the first name in *candidates* that is a column of *frame*, else None."""
    return next((c for c in candidates if c in frame.columns), None)


# Region-level aggregations. The CSV header may be spaced ('Order Region') or
# snake_case ('order_region'), so accept either; likewise for the measures.
target_region_col = _first_existing(df, ['Order Region', 'order_region'])
if target_region_col is not None:
    sales_col = _first_existing(df, ['Sales', 'sales'])
    days_col = _first_existing(df, ['Days for shipping (real)', 'delivery_days'])

    agg_exprs = [F.count('*').alias('region_order_count')]
    if sales_col is not None:
        agg_exprs.append(F.avg(sales_col).alias('region_avg_sales'))
    if days_col is not None:
        # NOTE(review): averaged over the full dataset before any train/test split,
        # and actual shipping days presumably drive late_delivery_risk, so this
        # column may leak label information if used as a model feature.
        agg_exprs.append(F.avg(days_col).alias('region_avg_delivery_days'))

    # Drop aggregates from a previous run so re-running this cell does not
    # produce duplicate column names after the join (drop() ignores absent names).
    base = df.drop('region_order_count', 'region_avg_sales', 'region_avg_delivery_days')
    region_stats = base.groupBy(target_region_col).agg(*agg_exprs)
    df = base.join(region_stats, on=target_region_col, how='left')
    print('Joined region aggregation features')

# Keep a concise set of features – we will select the most important later


## Select Top Features (6‑7) for Model Training
We manually pick a handful of informative columns. Adjust as needed.


In [None]:
import re


def _normalise_col(name):
    """Return *name* lower-cased, with each run of non-alphanumerics replaced by '_'.

    E.g. 'Days for shipping (real)' -> 'days_for_shipping_real',
    'Order Region' -> 'order_region'.
    """
    return re.sub(r'[^0-9a-z]+', '_', name.lower()).strip('_')


# Map normalised names to the real DataFrame column names so that the
# snake_case names below also match headers written with spaces or capitals.
cols_by_norm = {}
for actual_col in df.columns:
    cols_by_norm.setdefault(_normalise_col(actual_col), actual_col)

candidate_features = [
    'geo_distance_km',
    'order_region',
    'shipping_mode',
    'customer_segment',
    'sales',
    'delivery_days',
    'region_order_count',  # from the region aggregation, if it ran
]
# NOTE(review): a feature measuring actual shipping time would likely leak the
# label, which presumably derives from it – confirm before adding one here.

# Resolve each candidate to an existing column; absent candidates are skipped.
selected_features = [
    cols_by_norm[_normalise_col(c)]
    for c in candidate_features
    if _normalise_col(c) in cols_by_norm
]
print('Using features:', selected_features)
if not selected_features:
    raise ValueError('None of the candidate feature columns exist in df')

# Assemble features for ML
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler

# Identify categorical vs numeric
categorical_cols = [c for c in selected_features if df.schema[c].dataType.simpleString() == 'string']
numeric_cols = [c for c in selected_features if c not in categorical_cols]

# handleInvalid='keep' gives labels unseen during fitting their own index instead of failing.
indexers = [StringIndexer(inputCol=c, outputCol=c + '_idx', handleInvalid='keep') for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c + '_idx', outputCol=c + '_ohe') for c in categorical_cols]
assembler = VectorAssembler(inputCols=[c + '_ohe' for c in categorical_cols] + numeric_cols,
                            outputCol='features_unscaled')
scaler = StandardScaler(inputCol='features_unscaled', outputCol='features')

# Target column – resolve its real header (it may differ in case) and ensure it exists
target_col = cols_by_norm.get(_normalise_col('late_delivery_risk'))
if target_col is None:
    raise ValueError('Target column missing: late_delivery_risk')

# Split data. Fitting the pipeline stages scans the training set repeatedly, so
# cache it rather than recompute the whole upstream lineage each time.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.cache()

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol=target_col, featuresCol='features', numTrees=100)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, rf])
model = pipeline.fit(train_df)
preds = model.transform(test_df)

# Evaluation. The forest's probability is a normalisation of rawPrediction, so the
# AUC is the same either way; rawPrediction is the evaluator's expected input.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
auc = BinaryClassificationEvaluator(labelCol=target_col, rawPredictionCol='rawPrediction').evaluate(preds)
acc = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol='prediction',
                                        metricName='accuracy').evaluate(preds)
print(f'ROC AUC: {auc:.4f}, Accuracy: {acc:.4f}')

# Save model
model.write().overwrite().save('./pipeline_model_selected')


## Visualisation – After Cleaning / Feature Engineering


In [None]:
# Re-sample for quick plots after cleaning, using the same fraction as the
# raw-data plots; a fixed seed keeps the plots reproducible.
pdf_clean = df.sample(withReplacement=False, fraction=sample_frac, seed=42).toPandas()

# Histogram of sales; the column may be named 'sales' or 'Sales'.
sales_col = next((c for c in ('sales', 'Sales') if c in pdf_clean.columns), None)
if sales_col is not None:
    plt.figure(figsize=(8, 4))
    sns.histplot(pdf_clean[sales_col].dropna(), bins=30, kde=True)
    plt.title('Sales Distribution (cleaned)')
    plt.show()
else:
    print("No sales column found – skipping sales histogram")

# Correlation heatmap for the selected features that are genuinely numeric
# (object, datetime, etc. columns would break or distort .corr()).
num_cols = [c for c in selected_features
            if c in pdf_clean.columns and pd.api.types.is_numeric_dtype(pdf_clean[c])]
if len(num_cols) >= 2:
    corr_clean = pdf_clean[num_cols].corr()
    plt.figure(figsize=(10, 8))
    sns.heatmap(corr_clean, cmap='viridis', annot=True)
    plt.title('Correlation Heatmap (selected features)')
    plt.show()
else:
    print('Fewer than two numeric selected features – skipping correlation heatmap')
