# Loading data

In [1]:
from sparksession import spark

# Read the supply-chain CSV: the first row is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../data/dataset/DataCoSupplyChainDataset.csv')
)

# Print the inferred schema as a quick check on column names and types.
df.printSchema()


root
 |-- Type: string (nullable = true)
 |-- Days for shipping (real): integer (nullable = true)
 |-- Days for shipment (scheduled): integer (nullable = true)
 |-- Benefit per order: double (nullable = true)
 |-- Sales per customer: double (nullable = true)
 |-- Delivery Status: string (nullable = true)
 |-- Late_delivery_risk: integer (nullable = true)
 |-- Category Id: integer (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Customer City: string (nullable = true)
 |-- Customer Country: string (nullable = true)
 |-- Customer Email: string (nullable = true)
 |-- Customer Fname: string (nullable = true)
 |-- Customer Id: integer (nullable = true)
 |-- Customer Lname: string (nullable = true)
 |-- Customer Password: string (nullable = true)
 |-- Customer Segment: string (nullable = true)
 |-- Customer State: string (nullable = true)
 |-- Customer Street: string (nullable = true)
 |-- Customer Zipcode: integer (nullable = true)
 |-- Department Id: integer (nullable = 

In [2]:
# Preview the raw DataFrame (show() prints the first 20 rows by default).
df.show()

In [3]:

from pyspark.sql import functions as f
# Remove canceled orders before any further processing.
df_cleaned = df.filter(f.col('Order Status') != 'CANCELED')

# Sanity check: a bare expression in the middle of a cell is never displayed,
# so print the result explicitly (expected: 0).
remaining_canceled = df_cleaned.filter(f.col('Order Status') == 'CANCELED').count()
print("Remaining CANCELED orders:", remaining_canceled)

# Number of null values per column.
df_cleaned.select(
    [f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df_cleaned.columns]
).show()
# Product Description and Order Zipcode have null values.

# Merge the 'Same Day' shipping mode into 'First Class'; other modes are kept as-is.
df_cleaned = df_cleaned.withColumn(
    'Shipping Mode',
    f.when(f.col('Shipping Mode') == 'Same Day', 'First Class')
    .otherwise(f.col('Shipping Mode'))
)



In [4]:
# Preview the cleaned DataFrame (first 20 rows by default).
df_cleaned.show()



In [5]:
# Columns removed before modelling. Each name is listed exactly once.
# The previous list contained a typo ('rder Zipcode'), a missing comma that fused
# 'Product Price' and 'Product Status' into one bogus name, and several duplicates;
# DataFrame.drop() silently ignores unknown names, so those mistakes went unnoticed.
# Columns intentionally kept as features are NOT listed here
# (e.g. 'Type' and 'Order Item Quantity').
columns_redondantes = [
    # Shipping / delivery information
    'Days for shipping (real)',
    'Delivery Status',
    'Shipping date (DateOrders)',
    'Shipping Mode',
    # Customer attributes
    'Customer Fname',
    'Customer Lname',
    'Customer Email',
    'Customer Password',
    'Customer Id',
    'Customer Country',
    'Customer State',
    'Customer Street',
    'Customer Zipcode',
    'Customer City',
    # Order attributes
    'Order Id',
    'Order Item Id',
    'Order Customer Id',
    'Order Item Cardprod Id',
    'Order Item Total',
    'Order Profit Per Order',
    'Order Item Profit Ratio',
    'Order Item Discount',
    'Order Item Discount Rate',
    'Order Item Product Price',
    'Order Status',
    'Order City',
    'Order Country',
    'Order State',
    'Order Zipcode',
    # Product / category / department attributes
    'Product Card Id',
    'Product Category Id',
    'Product Description',
    'Product Image',
    'Product Name',
    'Product Price',
    'Product Status',
    'Category Name',
    'Department Id',
    'Department Name',
    # Monetary aggregates
    'Benefit per order',
    'Sales per customer',
    'Sales',
    # Geography
    'Latitude',
    'Longitude',
    'Market',
]

# Surface names that do not match any column, since drop() would ignore them silently.
# The comparison is case-insensitive because Spark resolves column names
# case-insensitively by default.
_existing_columns = {c.lower() for c in df_cleaned.columns}
_unknown_columns = [c for c in columns_redondantes if c.lower() not in _existing_columns]
if _unknown_columns:
    print("Warning: columns not found (ignored by drop):", _unknown_columns)

df_columns_cleaned = df_cleaned.drop(*columns_redondantes)

df_columns_cleaned.show()

In [6]:
# Parse the raw order timestamp string, keep only its month, then discard both
# the raw string column and the intermediate timestamp column.
order_ts = f.to_timestamp('order date (DateOrders)', "M/d/yyyy H:mm")

df_columns_cleaned = (
    df_columns_cleaned
    .withColumn('order_date', order_ts)
    .withColumn('order_month', f.month('order_date'))
    .drop('order date (DateOrders)', 'order_date')
)

df_columns_cleaned.show()

In [68]:
# Split the data by label value: 0 -> rslt_minor, 1 -> rslt_major.
late_risk = f.col('Late_delivery_risk')
rslt_minor = df_columns_cleaned.where(late_risk == 0)
rslt_major = df_columns_cleaned.where(late_risk == 1)

# Show both class sizes side by side.
print(rslt_minor.count(), '>==<', rslt_major.count())

In [None]:
# Oversample rslt_minor so that both label values end up with the same row count.
major_count = rslt_major.count()
minor_count = rslt_minor.count()

if minor_count == 0:
    # Without any minority rows there is nothing to replicate (and the division below would fail).
    raise ValueError("rslt_minor is empty: cannot oversample an empty class")

# Whole copies of the minority set, plus the number of extra rows still needed.
# divmod uses exact integer arithmetic (no float rounding as with int(a / b)).
n_repeats, n_remainder = divmod(major_count, minor_count)

# Duplicate every minority row n_repeats times by exploding a helper array column.
replicated_df = rslt_minor.withColumn(
    "replicator",
    f.explode(f.array([f.lit(i) for i in range(n_repeats)]))
).drop("replicator")

print("Replicated minority rows:", replicated_df.count())

# Randomly pick the remaining rows needed to reach major_count.
remainder_df = rslt_minor.orderBy(f.rand(seed=42)).limit(n_remainder)

# union() replaces the deprecated unionAll() alias; both keep duplicate rows.
# NOTE(review): if balanced_df is split into train/test afterwards, duplicated rows can
# land on both sides of the split (leakage) — consider oversampling the training part only.
balanced_df = rslt_major.union(replicated_df).union(remainder_df)

balanced_df.groupBy("Late_delivery_risk").count().show()

In [9]:
# Preview the balanced DataFrame (first 20 rows by default).
balanced_df.show()

In [10]:
# List (column name, type string) pairs of the balanced DataFrame.
balanced_df.dtypes

In [11]:
# Partition the columns by Spark type: 'int' / 'double' are treated as numeric,
# everything else as categorical.
num_cols = []
cat_cols = []

# Loop variables are named so they do not shadow the builtin `type`.
for col_name, col_type in balanced_df.dtypes:
    if col_type in ('int', 'double'):
        num_cols.append(col_name)
    else:
        cat_cols.append(col_name)

# Derive the intermediate column names from cat_cols so that each output name always
# lines up with its input column. The previous hard-coded lists were in a different
# order than cat_cols, which mislabeled the indexed/encoded columns.
cat_indexed = [f"{name}(indexed)" for name in cat_cols]
cat_encoded = [f"{name}(encoded)" for name in cat_cols]

print(num_cols, cat_cols)


In [12]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# 80/20 train/test split with a fixed seed.
train_df, test_df = balanced_df.randomSplit([0.8, 0.2], seed=42)

# Exclude the label from the numeric features. Filtering (instead of list.remove)
# keeps this cell re-runnable: remove() raises ValueError on the second execution.
num_cols = [c for c in num_cols if c != 'Late_delivery_risk']
print(num_cols)

# Categorical strings -> numeric indices.
indexer = StringIndexer(inputCols=cat_cols, outputCols=cat_indexed)

# Numeric indices -> one-hot vectors.
encoder = OneHotEncoder(inputCols=cat_indexed, outputCols=cat_encoded)

# Concatenate numeric columns and encoded categoricals into one feature vector.
assembler = VectorAssembler(inputCols=num_cols + cat_encoded, outputCol='features')

random_forest = RandomForestClassifier(labelCol='Late_delivery_risk', featuresCol='features')




In [13]:

# Attach a 0-based position to every row so that a few rows can be set aside.
df_with_index = balanced_df.rdd.zipWithIndex().toDF(["row", "index"])

# Rows 0..3 are kept as a small hand-picked sample.
df_test_selected = df_with_index.filter("index <= 3").select("row.*")

# Everything after them stays in balanced_df. The bound is strict ("> 3"):
# the previous ">= 3" left row 3 in both DataFrames.
# NOTE(review): DataFrames already derived from balanced_df before this cell
# (e.g. an earlier train/test split) still contain these rows.
balanced_df = df_with_index.filter("index > 3").select("row.*")

df_test_selected.show()

In [14]:


from pyspark.ml import Pipeline

# Random-forest pipeline: index -> one-hot encode -> assemble -> classify.
pipline1 = Pipeline().setStages([indexer, encoder, assembler, random_forest])

# Fit on the training split, then score the test split.
model_rf = pipline1.fit(train_df)
preds_rf = model_rf.transform(test_df)




In [15]:
# Preview the random-forest predictions on the test split.
preds_rf.show()

In [16]:
# Logistic-regression estimator on the assembled feature vector, capped at 50 iterations.
log_reg = (
    LogisticRegression()
    .setLabelCol("Late_delivery_risk")
    .setFeaturesCol("features")
    .setMaxIter(50)
)

In [17]:
from pyspark.ml import Pipeline

# Logistic-regression pipeline: same preprocessing stages, different final estimator.
pipline2 = Pipeline().setStages([indexer, encoder, assembler, log_reg])

# Fit on the training split, then score the test split.
model_lr = pipline2.fit(train_df)
preds_lr = model_lr.transform(test_df)

In [18]:
# Preview the logistic-regression predictions on the test split.
preds_lr.show()

In [67]:
# List (column name, type string) pairs of the prediction DataFrame.
preds_lr.dtypes

In [19]:
# Gradient-boosted trees estimator, capped at 50 boosting iterations.
gbt = (
    GBTClassifier()
    .setLabelCol("Late_delivery_risk")
    .setFeaturesCol("features")
    .setMaxIter(50)
)

# Same preprocessing stages, with GBT as the final estimator.
pipline3 = Pipeline().setStages([indexer, encoder, assembler, gbt])

# Fit on the training split, then score the test split.
model_gbt = pipline3.fit(train_df)
preds_gbt = model_gbt.transform(test_df)

# Évaluation et Comparaison des Modèles

Nous allons maintenant évaluer les performances des trois modèles (Random Forest, Logistic Regression, Gradient Boosting Tree) en utilisant diverses métriques de performance.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


# Area under the ROC curve, computed from each model's rawPrediction column.
auc_eval = BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    rawPredictionCol='rawPrediction',
    labelCol='Late_delivery_risk',
)

# AUC for the three baseline models, evaluated in the order RF, LR, GBT.
rf_auc, lr_auc, gbt_auc = (
    auc_eval.evaluate(preds) for preds in (preds_rf, preds_lr, preds_gbt)
)



In [21]:

# Column settings shared by every prediction-based evaluator below.
_eval_columns = dict(labelCol='Late_delivery_risk', predictionCol='prediction')

# One evaluator per metric; only metricName differs between them.
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy', **_eval_columns)
f1_eval = MulticlassClassificationEvaluator(metricName='f1', **_eval_columns)
precision_eval = MulticlassClassificationEvaluator(metricName='weightedPrecision', **_eval_columns)
recall_eval = MulticlassClassificationEvaluator(metricName='weightedRecall', **_eval_columns)

# Baseline predictions, always evaluated in the order RF, LR, GBT.
_baseline_preds = (preds_rf, preds_lr, preds_gbt)

# Accuracy
rf_acc, lr_acc, gbt_acc = (acc_eval.evaluate(p) for p in _baseline_preds)

# F1-score
rf_f1, lr_f1, gbt_f1 = (f1_eval.evaluate(p) for p in _baseline_preds)

# Weighted precision
rf_precision, lr_precision, gbt_precision = (
    precision_eval.evaluate(p) for p in _baseline_preds
)

# Weighted recall
rf_recall, lr_recall, gbt_recall = (
    recall_eval.evaluate(p) for p in _baseline_preds
)



In [22]:
# One row per model: section header followed by AUC, accuracy, F1, precision, recall.
_baseline_report = (
    ("------------- LR", lr_auc, lr_acc, lr_f1, lr_precision, lr_recall),
    ("\n------------- RF", rf_auc, rf_acc, rf_f1, rf_precision, rf_recall),
    ("\n------------- GBT", gbt_auc, gbt_acc, gbt_f1, gbt_precision, gbt_recall),
)

# Render every section with the same five-line layout, metrics rounded to 3 decimals.
print("\n".join(
    f"{header}\n"
    f"AUC = {auc:.3f}\n"
    f"Accuracy = {acc:.3f}\n"
    f"F1-score = {f1:.3f}\n"
    f"Precision = {precision:.3f}\n"
    f"Recall = {recall:.3f}"
    for header, auc, acc, f1, precision, recall in _baseline_report
))


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Random forest grid search: 3 x 2 x 2 = 12 hyper-parameter combinations.
_rf_grid_builder = ParamGridBuilder()
_rf_grid_builder.addGrid(random_forest.numTrees, [50, 100, 200])
_rf_grid_builder.addGrid(random_forest.maxDepth, [5, 20])
_rf_grid_builder.addGrid(random_forest.maxBins, [16, 64])
paramGrid = _rf_grid_builder.build()

# 5-fold cross-validation over the whole RF pipeline, selecting by AUC.
cv = CrossValidator(
    seed=42,
    numFolds=5,
    evaluator=auc_eval,
    estimatorParamMaps=paramGrid,
    estimator=pipline1,
)


In [24]:
# Run the cross-validated grid search for the random-forest pipeline on the training split.
cv_rf_model = cv.fit(train_df)


In [25]:
# Best fitted model selected by the cross-validator.
best_rf_model = cv_rf_model.bestModel

In [26]:
# The classifier is the final stage of the four-stage pipeline (same element as index 3).
best_model_param = best_rf_model.stages[-1]

In [27]:
# Report the hyper-parameters of the selected random-forest stage.
# NOTE: on fitted tree-ensemble models PySpark exposes getNumTrees as a property
# (no call parentheses), unlike getMaxDepth() / getMaxBins() — verify against the
# installed PySpark version before "fixing" the missing parentheses.
print("Best numTrees:", best_model_param.getNumTrees)
print("Best maxDepth:", best_model_param.getMaxDepth())
print("Best maxBins:", best_model_param.getMaxBins())

In [28]:
# Score the test split with the tuned random-forest pipeline.
cv_rf_preds = best_rf_model.transform(test_df)

In [29]:
# AUC, accuracy and F1 of the tuned random forest on the test split.
rf_cv_auc, rf_cv_acc, rf_cv_f1 = (
    evaluator.evaluate(cv_rf_preds)
    for evaluator in (auc_eval, acc_eval, f1_eval)
)

In [30]:
# Summary of the tuned random forest, metrics rounded to 3 decimals.
print(
    "------------- rf\n"
    f"AUC = {rf_cv_auc:.3f}\n"
    f"Accuracy = {rf_cv_acc:.3f}\n"
    f"F1-score = {rf_cv_f1:.3f}"
)

In [31]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# GBT grid search: 3 x 2 x 3 x 3 = 54 hyper-parameter combinations.
_gbt_grid_builder = ParamGridBuilder()
_gbt_grid_builder.addGrid(gbt.maxDepth, [3, 5, 10])
_gbt_grid_builder.addGrid(gbt.maxBins, [32, 64])
_gbt_grid_builder.addGrid(gbt.maxIter, [10, 30, 40])
_gbt_grid_builder.addGrid(gbt.stepSize, [0.05, 0.1, 0.2])
paramGrid = _gbt_grid_builder.build()

# 5-fold cross-validation over the whole GBT pipeline, selecting by AUC,
# fitting one model at a time (parallelism=1).
cv_gbt = CrossValidator(
    parallelism=1,
    seed=42,
    numFolds=5,
    evaluator=auc_eval,
    estimatorParamMaps=paramGrid,
    estimator=pipline3,
)

In [32]:
# Run the cross-validated grid search for the GBT pipeline on the training split.
cv_gbt_model = cv_gbt.fit(train_df)

In [33]:
# Best fitted model selected by the cross-validator.
best_gbt_model = cv_gbt_model.bestModel

# Last pipeline stage, i.e. the fitted classifier whose parameters are reported below.
best_gbt_params = best_gbt_model.stages[-1]


In [34]:
# Report every hyper-parameter that was part of the GBT grid search.
# maxIter and stepSize are tuned as well, so they are printed alongside
# maxDepth and maxBins.
print("Best maxDepth:", best_gbt_params.getMaxDepth())
print("Best maxBins:", best_gbt_params.getMaxBins())
print("Best maxIter:", best_gbt_params.getMaxIter())
print("Best stepSize:", best_gbt_params.getStepSize())


In [35]:
# Score the test split with the tuned GBT pipeline.
cv_gbt_preds = best_gbt_model.transform(test_df)

In [36]:
# Tuned-model predictions, always evaluated in the order RF, GBT.
_tuned_preds = (cv_rf_preds, cv_gbt_preds)

# AUC
rf_cv_auc, gbt_cv_auc = (auc_eval.evaluate(p) for p in _tuned_preds)

# Accuracy
rf_cv_acc, gbt_cv_acc = (acc_eval.evaluate(p) for p in _tuned_preds)

# F1-score
rf_cv_f1, gbt_cv_f1 = (f1_eval.evaluate(p) for p in _tuned_preds)

# Weighted precision
rf_cv_precision, gbt_cv_precision = (precision_eval.evaluate(p) for p in _tuned_preds)

# Weighted recall
rf_cv_recall, gbt_cv_recall = (recall_eval.evaluate(p) for p in _tuned_preds)

In [40]:
# One row per tuned model: section header followed by AUC, accuracy, F1, precision, recall.
_tuned_report = (
    ("------------- RF", rf_cv_auc, rf_cv_acc, rf_cv_f1, rf_cv_precision, rf_cv_recall),
    ("\n------------- GBT", gbt_cv_auc, gbt_cv_acc, gbt_cv_f1, gbt_cv_precision, gbt_cv_recall),
)

# Render every section with the same five-line layout, metrics rounded to 3 decimals.
print("\n".join(
    f"{header}\n"
    f"AUC = {auc:.3f}\n"
    f"Accuracy = {acc:.3f}\n"
    f"F1-score = {f1:.3f}\n"
    f"Precision = {precision:.3f}\n"
    f"Recall = {recall:.3f}"
    for header, auc, acc, f1, precision, recall in _tuned_report
))


In [66]:
# Destination directory for the tuned random-forest pipeline.
best_path = './models/rf_best_model'

# Best-effort save: any failure is reported instead of stopping the notebook.
try:
    rf_writer = best_rf_model.write().overwrite()  # replace a previously saved model
    rf_writer.save(best_path)
    print(f"\n✅ Best PipelineModel saved successfully at : {best_path}")
except Exception as save_error:
    print(f"\n❌ Error while saving the best PipelineModel: {save_error}")

In [65]:
# Destination directory for the GBT cross-validation result.
gbt_path = './models/gbt_model'

# Best-effort save: any failure is reported instead of stopping the notebook.
# NOTE(review): this persists cv_gbt_model (the whole cross-validator result), whereas
# the random-forest cell saves only the best pipeline model — confirm this is intended.
try:
    gbt_writer = cv_gbt_model.write().overwrite()  # replace a previously saved model
    gbt_writer.save(gbt_path)
    print(f"\n✅ Modèle gbt sauvegardé avec succès à : {gbt_path}")
except Exception as save_error:
    print(f"\n❌ Erreur lors de la sauvegarde du modèle GBT : {save_error}")

    
