In [1]:
from pyspark.ml.linalg import Vectors

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



In [3]:
#Libraries and class from StaticModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# import pickle
import time

#Script Class
from PreProcess import *
from PostProcess import *

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [5]:
from pyspark.ml.classification import RandomForestClassifier

In [6]:
spark = SparkSession.builder \
        .appName("Churn EDA PySpark") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .getOrCreate()

In [8]:


schema = StructType([
    StructField("CustomerId", IntegerType(), True),
    StructField("Churn", IntegerType(), True),
    StructField("Tenure", FloatType(), True),
    StructField("PreferredLoginDevice", StringType(), True),
    StructField("CityTier", IntegerType(), True),
    StructField("WarehouseToHome", FloatType(), True),
    StructField("PreferredPaymentMode", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("HourSpendOnApp", FloatType(), True),
    StructField("NumberOfDeviceRegistered", IntegerType(), True),
    StructField("PreferedOrderCat", StringType(), True),
    StructField("SatisfactionScore", IntegerType(), True),
    StructField("MaritalStatus", StringType(), True),
    StructField("NumberOfAddress", IntegerType(), True),
    StructField("Complain", IntegerType(), True),
    StructField("OrderAmountHikeFromlastYear", FloatType(), True),
    StructField("CouponUsed", FloatType(), True),
    StructField("OrderCount", FloatType(), True),
    StructField("DaySinceLastOrder", FloatType(), True),
    StructField("CashbackAmount", FloatType(), True)
])

In [9]:
# Load the data stored in e-commerce folder format as a DataFrame.
df = spark.read.format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load("/home/jovyan/code/churn/e-commerce/e-commerce-dataset.csv")


In [10]:
pre_process_instance = PreProcess(df)


In [11]:
pre_process_instance.columns_types(df)

{'numeric_cols': ['CustomerId',
  'Churn',
  'Tenure',
  'CityTier',
  'WarehouseToHome',
  'HourSpendOnApp',
  'NumberOfDeviceRegistered',
  'SatisfactionScore',
  'NumberOfAddress',
  'Complain',
  'OrderAmountHikeFromlastYear',
  'CouponUsed',
  'OrderCount',
  'DaySinceLastOrder',
  'CashbackAmount'],
 'categorical_cols': ['PreferredLoginDevice',
  'PreferredPaymentMode',
  'Gender',
  'PreferedOrderCat',
  'MaritalStatus'],
 'boolean_cols': [],
 'array_cols': [],
 'struct_cols': [],
 'unknown_cols': []}

In [12]:
columns_to_string_index = [("PreferredLoginDevice","PreferredLoginDeviceIndex"),
                           ("PreferredPaymentMode","PreferredPaymentModeIndex"),
                           ("Gender","GenderIndex"),
                           ("PreferedOrderCat","PreferedOrderCatIndex"),
("MaritalStatus","MaritalStatusIndex")]

numeric_cols =  ['CustomerId',
  'Tenure',
  'CityTier',
  'WarehouseToHome',
  'HourSpendOnApp',
  'NumberOfDeviceRegistered',
  'SatisfactionScore',
  'NumberOfAddress',
  'Complain',
  'OrderAmountHikeFromlastYear',
  'CouponUsed',
  'OrderCount',
  'DaySinceLastOrder',
  'CashbackAmount']

inpult_col = ['Churn']

In [None]:
df_transformed.select(F.countDistinct("CustomerId")).show()
categorical_cols = ['PreferredLoginDevice',
  'PreferredPaymentMode',
  'Gender',
  'PreferedOrderCat',
  'MaritalStatus']
for c in categorical_cols:
    df.select(F.countDistinct(c)).show()

In [13]:
indexed_df = pre_process_instance.string_index_columns( df, columns_to_string_index)

In [14]:
columns_to_encode = [("PreferredLoginDeviceIndex","PreferredLoginDeviceIndexEncoded"),
                           ("PreferredPaymentModeIndex","PreferredPaymentModeIndexEncoded"),
                           ("GenderIndex","GenderIndexEncoded"),
                           ("PreferedOrderCatIndex","PreferedOrderCatIndexEncoded"),
("MaritalStatusIndex","MaritalStatusIndexEncoded")]

In [15]:
encoded_df = pre_process_instance.encoded_index_columns( indexed_df, columns_to_encode)

In [None]:
encoded_df.select(["PreferredLoginDeviceIndexEncoded"
                ,"PreferredPaymentModeIndexEncoded"
                ,"GenderIndexEncoded"
                ,"PreferedOrderCatIndexEncoded"
                ,"MaritalStatusIndexEncoded"]).show()

In [16]:
encoded_df = pre_process_instance.encoded_index_columns( indexed_df, columns_to_encode)
# Assuming you have an existing DataFrame 'df' with feature columns
# Create a list of feature column names
feature_cols = [ "PreferredLoginDeviceIndexEncoded"
                ,"PreferredPaymentModeIndexEncoded"
                ,"GenderIndexEncoded"
                ,"PreferedOrderCatIndexEncoded"
                ,"MaritalStatusIndexEncoded"
                ,"CustomerId",
                "Tenure",
                "CityTier",
                "WarehouseToHome",
                "HourSpendOnApp",
                "NumberOfDeviceRegistered",
                "SatisfactionScore",
                "NumberOfAddress",
                "Complain",
                "OrderAmountHikeFromlastYear",
                "CouponUsed",
                "OrderCount",
                "DaySinceLastOrder",
                "CashbackAmount"]


# Create a VectorAssembler object
#holavector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")

assembled_df = pre_process_instance.vector_feature_column(encoded_df, feature_cols) #holavector_assembler.transform(encoded_df)
assembled_df.select("features").show()

+--------------------+
|            features|
+--------------------+
|(30,[0,2,9,15,16,...|
|(30,[2,8,9,15,16,...|
|(30,[1,2,10,15,16...|
|(30,[0,2,8,11,15,...|
|(30,[6,8,12,15,16...|
|(30,[7,8,12,16,18...|
|(30,[0,3,8,11,15,...|
|(30,[0,4,8,9,15,1...|
|(30,[1,2,8,9,15,1...|
|(30,[0,2,10,16,17...|
|(30,[0,2,8,9,16,1...|
|(30,[0,3,11,16,17...|
|(30,[2,9,16,18,19...|
|(30,[1,2,8,11,15,...|
|(30,[3,10,15,16,1...|
|(30,[0,4,8,11,16,...|
|(30,[1,4,11,15,16...|
|(30,[1,6,8,9,16,1...|
|(30,[0,3,8,9,16,1...|
|(30,[7,12,16,17,1...|
+--------------------+
only showing top 20 rows



In [17]:
from pyspark.ml.tuning import ParamGridBuilder

In [19]:
randomforest = RandomForestClassifier(labelCol="Churn", featuresCol="features",seed = 42)

In [20]:

# Create a ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(randomforest.numTrees, [10, 20, 30])  # Number of trees
             .addGrid(randomforest.maxDepth, [5, 10, 15])   # Maximum depth of the tree
             .addGrid(randomforest.maxBins, [32, 64, 128])  # Number of bins used when discretizing continuous features
             .addGrid(randomforest.featureSubsetStrategy, ['auto', 'sqrt', 'log2'])  # Strategy to select a subset of features
             .build())

# paramGrid is now a list of ParamMap, each ParamMap representing a different combination of hyperparameters
paramGrid

[{Param(parent='RandomForestClassifier_48a45e3dfd95', name='numTrees', doc='Number of trees to train (>= 1).'): 10,
  Param(parent='RandomForestClassifier_48a45e3dfd95', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='RandomForestClassifier_48a45e3dfd95', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32,
  Param(parent='RandomForestClassifier_48a45e3dfd95', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use l

In [21]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define an evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Churn", predictionCol="prediction", metricName="accuracy")

# Create the CrossValidator
crossval = CrossValidator(estimator=randomforest,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold 

In [22]:
# Assuming `trainingData` is your DataFrame with labeled data
cvModel = crossval.fit(assembled_df)


In [23]:
bestModel = cvModel.bestModel

In [None]:
# Print the best model's parameters
print("Best numTrees:", bestModel.getNumTrees)
print("Best maxDepth:", bestModel.getMaxDepth)
print("Best maxBins:", bestModel.getMaxBins)
print("Best featureSubsetStrategy:", bestModel.getFeatureSubsetStrategy)

In [24]:
ruta_modelo = "/home/jovyan/code/churn/e-commerce/modelo"
bestModel.write().overwrite().save(ruta_modelo)
#randomforest.save("/home/jovyan/code/churn/e-commerce/modelo")

#rf2.getNumTrees()

In [27]:
from pyspark.ml.classification import RandomForestClassificationModel

In [28]:
rf2 = RandomForestClassificationModel.load(ruta_modelo)

In [29]:
# Step 6: Make predictions
predictions = rf2.transform(assembled_df)

# Show predictions
predictions.select("Churn", "features", "prediction").show()


+-----+--------------------+----------+
|Churn|            features|prediction|
+-----+--------------------+----------+
|    1|(30,[0,2,9,15,16,...|       1.0|
|    1|(30,[2,8,9,15,16,...|       1.0|
|    1|(30,[1,2,10,15,16...|       1.0|
|    1|(30,[0,2,8,11,15,...|       1.0|
|    1|(30,[6,8,12,15,16...|       1.0|
|    1|(30,[7,8,12,16,18...|       1.0|
|    1|(30,[0,3,8,11,15,...|       1.0|
|    1|(30,[0,4,8,9,15,1...|       1.0|
|    1|(30,[1,2,8,9,15,1...|       1.0|
|    1|(30,[0,2,10,16,17...|       1.0|
|    1|(30,[0,2,8,9,16,1...|       1.0|
|    1|(30,[0,3,11,16,17...|       1.0|
|    1|(30,[2,9,16,18,19...|       1.0|
|    1|(30,[1,2,8,11,15,...|       1.0|
|    1|(30,[3,10,15,16,1...|       1.0|
|    0|(30,[0,4,8,11,16,...|       0.0|
|    0|(30,[1,4,11,15,16...|       0.0|
|    0|(30,[1,6,8,9,16,1...|       0.0|
|    0|(30,[0,3,8,9,16,1...|       0.0|
|    0|(30,[7,12,16,17,1...|       0.0|
+-----+--------------------+----------+
only showing top 20 rows



In [32]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="Churn", predictionCol="prediction", metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")

# Show detailed evaluation metrics
evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictions)
print(f"Weighted Precision = {precision}")

evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictions)
print(f"Weighted Recall = {recall}")

evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictions)
print(f"F1 Score = {f1}")

Test Accuracy = 1.0
Weighted Precision = 1.0
Weighted Recall = 1.0
F1 Score = 1.0


In [33]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [34]:
predictions_and_labels = predictions.select(col("prediction").cast("float"), col("Churn").cast("float"))

metrics = MulticlassMetrics(predictions_and_labels.rdd.map(tuple))

# Print confusion matrix
print("Confusion Matrix:")
print(metrics.confusionMatrix().toArray())



Confusion Matrix:
[[3143.    0.]
 [   0.  631.]]


In [35]:
rmse = evaluator.evaluate(predictions)
print("Test RMSE = %g" % rmse)

Test RMSE = 1


In [None]:
from pyspark.sql.functions import col,isnan,when,count
df2 = df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns])
df2.show()

In [None]:
bestModel.getFeaturesCol()

**Otras pruebas**