In [1]:
# Importação das bibliotecas
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import count

In [5]:
# Create (or reuse) the Spark session used by the rest of the notebook.
# Settings are kept in one table so they are easy to scan and tune.
configuracoes_spark = {
    "spark.driver.memory": "8g",
    "spark.executor.memory": "4g",
    "spark.driver.maxResultSize": "2g",
    "spark.sql.shuffle.partitions": "50",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.4",
}

construtor_spark = SparkSession.builder.appName("SupplyChainAnalysis")
for chave, valor in configuracoes_spark.items():
    construtor_spark = construtor_spark.config(chave, valor)

# Run locally, using every available core
spark = construtor_spark.master("local[*]").getOrCreate()

25/02/21 02:21:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Load the raw dataset into pandas (the file is read with Latin-1 encoding)
dados = pd.read_csv(
    "dados/DataCoSupplyChainDataset.csv",
    encoding='latin1',
)

In [None]:
# New feature: shipping delay = actual shipping days minus scheduled shipping days
dias_reais = dados['Days for shipping (real)']
dias_previstos = dados['Days for shipment (scheduled)']
dados['Shipping Delay'] = dias_reais.sub(dias_previstos)

In [None]:
# Remove the columns considered irrelevant for the analysis
colunas_para_remover = [
    'Product Status',
    'Customer Password',
    'Customer Email',
    'Customer Street',
    'Customer Fname',
    'Customer Lname',
    'Latitude',
    'Longitude',
    'Product Image',
    'Product Description',
    'Order Zipcode',
    'shipping date (DateOrders)',
    'Product Price',
    'Category Id',
    'Order Id',
    'Product Category Id',
    'Order Item Id',
    'Product Card Id',
    'Order Item Cardprod Id',
    'Customer Id',
    'Order Customer Id',
    'Department Id',
    'Customer Zipcode',
]
# In-place drop: `dados` itself loses these columns
dados.drop(labels=colunas_para_remover, axis="columns", inplace=True)

In [None]:
# Missing-value handling: fill gaps in numeric columns with each column's median
# (non-numeric columns are not touched by this step)
medianas_numericas = dados.median(numeric_only=True)
dados.fillna(value=medianas_numericas, inplace=True)

In [None]:
# Convert the cleaned pandas DataFrame into a Spark DataFrame
df_spark = spark.createDataFrame(data=dados)

In [None]:
# Class balancing (oversampling of the minority class), step 1:
# count the rows of each Late_delivery_risk value and bring the result to the driver
class_distribution = (
    df_spark
    .groupBy("Late_delivery_risk")
    .count()
    .collect()
)
# Map: class value -> number of rows
class_counts = dict(
    (linha["Late_delivery_risk"], linha["count"]) for linha in class_distribution
)

In [None]:
# Row count of the majority class (the largest per-class count)
max_count = max(class_counts[classe] for classe in class_counts)

In [None]:
# Build the balanced DataFrame by oversampling every minority class.
#
# Only the *missing* rows (max_count - n_classe) are drawn: the class's original
# rows are already part of df_balanced, so sampling with fraction
# max_count / n_classe would leave the class with roughly n_classe + max_count
# rows and invert the imbalance instead of removing it.
#
# NOTE(review): the oversampling runs before the train/test split, so duplicated
# rows can end up on both sides of the split — consider splitting first and
# oversampling only the training set.
df_balanced = df_spark
# `n_classe` (not `count`) so the imported pyspark.sql.functions.count is not shadowed
for classe, n_classe in class_counts.items():
    if n_classe < max_count:
        df_extra = df_spark.filter(col("Late_delivery_risk") == classe).sample(
            withReplacement=True,
            fraction=(max_count - n_classe) / n_classe,
            seed=42,  # fixed seed so the resample is reproducible
        )
        df_balanced = df_balanced.union(df_extra)

In [None]:
# Index the categorical columns: each one gets a numeric "<column>_idx" companion
categorical_cols = [
    'Delivery Status',
    'Type',
    'Category Name',
    'Customer Segment',
    'Department Name',
    'Order Region',
    'Market',
    'Late_delivery_risk',
]
indexers = []
for nome_coluna in categorical_cols:
    indexers.append(
        StringIndexer(
            inputCol=nome_coluna,
            outputCol=f"{nome_coluna}_idx",
            handleInvalid="keep",
        )
    )

In [None]:
# Assemble the model inputs into a single vector column named 'features'
# NOTE(review): 'Days for shipping (real)' and 'Shipping Delay' are only known
# after the shipment happened; presumably they are close proxies for
# Late_delivery_risk — confirm that using them as predictors is intended.
feature_cols = [
    'Type_idx',
    'Days for shipping (real)',
    'Order Region_idx',
    'Customer Segment_idx',
    'Department Name_idx',
    'Shipping Delay',
]
assembler = VectorAssembler(
    outputCol='features',
    inputCols=feature_cols,
)

In [None]:
# Normalisation: min-max scale the assembled feature vector
scaler = MinMaxScaler(
    outputCol="scaled_features",
    inputCol="features",
)

In [None]:
# Pre-processing pipeline: all indexers, then the assembler, then the scaler
pipeline = Pipeline(stages=[*indexers, assembler, scaler])

# Fit on the balanced data and apply the fitted stages to that same data;
# from here on `df_spark` refers to the transformed, balanced DataFrame
modelo_preprocessamento = pipeline.fit(df_balanced)
df_spark = modelo_preprocessamento.transform(df_balanced)

In [None]:
# Train/test split: 80% / 20%, with a fixed seed
particoes = df_spark.randomSplit(weights=[0.8, 0.2], seed=42)
dados_treino, dados_teste = particoes

In [None]:
# Evaluation: accuracy of `prediction` against the indexed label column
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='Late_delivery_risk_idx',
    predictionCol='prediction',
)

In [None]:
# Decision Tree model: train on the training split, score the test split
dt_model = DecisionTreeClassifier(
    featuresCol='scaled_features',
    labelCol='Late_delivery_risk_idx',
    impurity='gini',
)
dt_modelo_treinado = dt_model.fit(dados_treino)
dt_previsoes = dt_modelo_treinado.transform(dados_teste)

# Report the test-set accuracy
dt_acuracia = evaluator.evaluate(dt_previsoes)
print(f"Acurácia Decision Tree: {dt_acuracia}")

In [None]:
# Random Forest model (hyper-parameters are tuned later through cross-validation).
# An explicit seed makes the bootstrap / feature sub-sampling reproducible across
# runs, in line with the fixed seed already used for the train/test split.
rf_model = RandomForestClassifier(
    labelCol='Late_delivery_risk_idx',
    featuresCol='scaled_features',
    seed=42,
)

In [None]:
# Hyper-parameter grid searched by the Random Forest cross-validation.
# "auto" is intentionally left out of featureSubsetStrategy: for a classification
# forest with more than one tree Spark resolves "auto" to "sqrt", so listing both
# fits every combination twice (162 grid points instead of 108) for identical models.
paramGrid = (ParamGridBuilder()
             .addGrid(rf_model.numTrees, [50, 100, 200])
             .addGrid(rf_model.maxDepth, [5, 10, 15])
             .addGrid(rf_model.maxBins, [32, 64])
             .addGrid(rf_model.minInstancesPerNode, [1, 5, 10])
             .addGrid(rf_model.featureSubsetStrategy, ["sqrt", "log2"])
             .build())

In [None]:
# 3-fold cross-validation over the Random Forest grid, scored with `evaluator`.
# The seed fixes the fold assignment so the selected model is reproducible.
crossval = CrossValidator(
    estimator=rf_model,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training split
cv_model = crossval.fit(dataset=dados_treino)

In [None]:
# Score the held-out test split with the fitted cross-validation model
rf_previsoes = cv_model.transform(dataset=dados_teste)

In [None]:
# Report the Random Forest test-set accuracy
rf_acuracia = evaluator.evaluate(rf_previsoes)
print(f"Acurácia Random Forest: {rf_acuracia}")

In [None]:
# GBT (Gradient Boosted Trees) model: 100 boosting iterations, trees up to depth 10
gbt = GBTClassifier(
    featuresCol="scaled_features",
    labelCol="Late_delivery_risk_idx",
    maxDepth=10,
    maxIter=100,
)

In [None]:
# Train the GBT classifier on the training split
gbt_model = gbt.fit(dataset=dados_treino)

In [None]:
# Score the held-out test split with the trained GBT model
gbt_previsoes = gbt_model.transform(dataset=dados_teste)

In [None]:
# Report the GBT test-set accuracy
gbt_acuracia = evaluator.evaluate(gbt_previsoes)
print(f"Acurácia GBTClassifier: {gbt_acuracia}")