<a href="https://colab.research.google.com/github/oumaima33/BCP_Churn_prediction/blob/main/BCP_Churn_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Bank Customer Churn


__Churner__ is generally defined as a customer who stops using a product or service for a given period of time.



This notebook is to do the data analysis and predictions on the `Master table.csv` file.

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql.functions import col, sum, when

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Datacamp Pyspark Tutorial") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "10g") \
    .config("spark.driver.memory","32G")\
    .config("spark.executer.memory","32G")\
    .getOrCreate()

In [4]:
master=spark.read.csv("/content/Master table.csv",header=True, inferSchema=True)

In [5]:
master.show()

+--------+---------+--------+--------------+---+---+--------------+---------+--------+-------+-------+--------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|Idclient|ANNEEMOIS|   genre|MARITAL_STATUS|BPR|age|CUSTOMER_YEARS|AGE_GROUP|   SOLDE| MVTDEB|NBMVTDB| MVTCRED|NBMVTCRE|MONTANTFACTURE|MONTANTVIREMENT|MONTANTOPERATIONS|NBROPERATION|MONTANTGAB|NBROPERATIONGAB|HasCreditYousr|ALREADY CONNECTED|ACTIVE_RATIO|CONNECTED RECENTLY|Churn_next_trim|
+--------+---------+--------+--------------+---+---+--------------+---------+--------+-------+-------+--------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|       0|   201904| Féminin|      Marié(e)|  1| 64|            23|    55-64| 1025.33|3452.73|   9.33|  2500.0|     1.0|       

In [6]:
master.count()

3392620

In [7]:
master.printSchema()

root
 |-- Idclient: integer (nullable = true)
 |-- ANNEEMOIS: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- BPR: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- CUSTOMER_YEARS: integer (nullable = true)
 |-- AGE_GROUP: string (nullable = true)
 |-- SOLDE: double (nullable = true)
 |-- MVTDEB: double (nullable = true)
 |-- NBMVTDB: double (nullable = true)
 |-- MVTCRED: double (nullable = true)
 |-- NBMVTCRE: double (nullable = true)
 |-- MONTANTFACTURE: double (nullable = true)
 |-- MONTANTVIREMENT: double (nullable = true)
 |-- MONTANTOPERATIONS: double (nullable = true)
 |-- NBROPERATION: double (nullable = true)
 |-- MONTANTGAB: double (nullable = true)
 |-- NBROPERATIONGAB: double (nullable = true)
 |-- HasCreditYousr: integer (nullable = true)
 |-- ALREADY CONNECTED: integer (nullable = true)
 |-- ACTIVE_RATIO: double (nullable = true)
 |-- CONNECTED RECENTLY: integer (nullable = true)
 |-- Chur

In [8]:
null_counts = master.select(*(sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in master.columns))

null_counts.show()

+--------+---------+-----+--------------+---+---+--------------+---------+-----+------+-------+-------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|Idclient|ANNEEMOIS|genre|MARITAL_STATUS|BPR|age|CUSTOMER_YEARS|AGE_GROUP|SOLDE|MVTDEB|NBMVTDB|MVTCRED|NBMVTCRE|MONTANTFACTURE|MONTANTVIREMENT|MONTANTOPERATIONS|NBROPERATION|MONTANTGAB|NBROPERATIONGAB|HasCreditYousr|ALREADY CONNECTED|ACTIVE_RATIO|CONNECTED RECENTLY|Churn_next_trim|
+--------+---------+-----+--------------+---+---+--------------+---------+-----+------+-------+-------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|       0|        0|    0|             0|  0|  0|             0|        0|    0|     0|      0|      0|       0|             0|              0|        

In [9]:
from pyspark.sql.functions import col, max

# Calculer la valeur maximale de la colonne ACTIVE_RATIO
max_value = master.agg(max(col("ACTIVE_RATIO"))).collect()[0][0]

# Remplacer les valeurs nulles par la valeur maximale
master = master.na.fill({"ACTIVE_RATIO": max_value})

In [10]:
from pyspark.sql.functions import to_date

# Convertir la colonne 'ANNEEMOIS' en format de date avec le format 'yyyyMM'
master = master.withColumn("ANNEEMOIS", to_date(col("ANNEEMOIS"), "yyyyMM"))


In [11]:
from pyspark.sql.functions import col

# Convertir les colonnes 'Idclient' et 'BPR' en chaînes de caractères
master = master.withColumn("Idclient", col("Idclient").cast("string")) \
               .withColumn("BPR", col("BPR").cast("string"))


In [12]:
master.printSchema()

root
 |-- Idclient: string (nullable = true)
 |-- ANNEEMOIS: date (nullable = true)
 |-- genre: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- BPR: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- CUSTOMER_YEARS: integer (nullable = true)
 |-- AGE_GROUP: string (nullable = true)
 |-- SOLDE: double (nullable = true)
 |-- MVTDEB: double (nullable = true)
 |-- NBMVTDB: double (nullable = true)
 |-- MVTCRED: double (nullable = true)
 |-- NBMVTCRE: double (nullable = true)
 |-- MONTANTFACTURE: double (nullable = true)
 |-- MONTANTVIREMENT: double (nullable = true)
 |-- MONTANTOPERATIONS: double (nullable = true)
 |-- NBROPERATION: double (nullable = true)
 |-- MONTANTGAB: double (nullable = true)
 |-- NBROPERATIONGAB: double (nullable = true)
 |-- HasCreditYousr: integer (nullable = true)
 |-- ALREADY CONNECTED: integer (nullable = true)
 |-- ACTIVE_RATIO: double (nullable = false)
 |-- CONNECTED RECENTLY: integer (nullable = true)
 |-- Churn_ne

In [13]:
master = master.filter(col("CUSTOMER_YEARS") >= 0)

In [14]:
master.show()

+--------+----------+--------+--------------+---+---+--------------+---------+--------+-------+-------+--------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|Idclient| ANNEEMOIS|   genre|MARITAL_STATUS|BPR|age|CUSTOMER_YEARS|AGE_GROUP|   SOLDE| MVTDEB|NBMVTDB| MVTCRED|NBMVTCRE|MONTANTFACTURE|MONTANTVIREMENT|MONTANTOPERATIONS|NBROPERATION|MONTANTGAB|NBROPERATIONGAB|HasCreditYousr|ALREADY CONNECTED|ACTIVE_RATIO|CONNECTED RECENTLY|Churn_next_trim|
+--------+----------+--------+--------------+---+---+--------------+---------+--------+-------+-------+--------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|       0|2019-04-01| Féminin|      Marié(e)|  1| 64|            23|    55-64| 1025.33|3452.73|   9.33|  2500.0|     1.0|   

In [15]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when


categorical_columns = ["genre", "MARITAL_STATUS", "BPR", "AGE_GROUP", "HasCreditYousr", "ALREADY CONNECTED", "CONNECTED RECENTLY"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in categorical_columns]



encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_ohe") for column in categorical_columns]

num_features = ['age', 'CUSTOMER_YEARS', 'SOLDE', 'MVTDEB', 'NBMVTDB', 'MVTCRED', 'NBMVTCRE', 'MONTANTFACTURE',
                'MONTANTVIREMENT', 'MONTANTOPERATIONS', 'NBROPERATION', 'MONTANTGAB', 'NBROPERATIONGAB', 'ACTIVE_RATIO']


encoded_features = [column+"_ohe" for column in categorical_columns]

assembler = VectorAssembler(inputCols=num_features + encoded_features, outputCol='features')


scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)


pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])


train, test = master.randomSplit([0.7, 0.3], seed=123)


In [16]:
train, test = master.randomSplit([0.7, 0.3], seed=123)

pipeline_model = pipeline.fit(train)

train_transformed = pipeline_model.transform(train).withColumn("label", col("Churn_next_trim")).select("scaledFeatures", "label")
test_transformed = pipeline_model.transform(test).withColumn("label", col("Churn_next_trim")).select("scaledFeatures", "label")

train_transformed.show(5)


+--------------------+-----+
|      scaledFeatures|label|
+--------------------+-----+
|(41,[0,1,2,3,4,5,...|    0|
|(41,[0,1,2,3,9,10...|    0|
|(41,[0,1,2,3,4,13...|    0|
|(41,[0,1,3,4,9,10...|    0|
|(41,[0,1,2,13,15,...|    0|
+--------------------+-----+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier, MultilayerPerceptronClassifier

# Initialize evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Function to evaluate model metrics
def evaluate_model(predictions):
    # Calculate AUC
    auc = binary_evaluator.evaluate(predictions)

    # Calculate Accuracy
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})

    # Calculate Recall
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

    # Calculate F1-Score
    f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    # Calculate Precision
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})

    # Return metrics as a dictionary
    return {
        "AUC": auc,
        "Accuracy": accuracy,
        "Recall": recall,
        "F1-Score": f1_score,
        "Precision": precision
    }

# Define the models to train and evaluate
models = {
    "Logistic Regression": LogisticRegression(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Random Forest": RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Decision Tree": DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Gradient Boosting": GBTClassifier(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Multilayer Perceptron": MultilayerPerceptronClassifier(labelCol="label", featuresCol="scaledFeatures", layers=[train_transformed.select("scaledFeatures").first()["scaledFeatures"].size, 5, 4, 2], maxIter=100)
}

# Dictionary to store results
metrics_results = []

# Fit models and evaluate
for model_name, model in models.items():
    print(f"Training and evaluating {model_name}...")
    model_trained = model.fit(train_transformed)
    predictions = model_trained.transform(test_transformed)

    # Evaluate the model
    metrics = evaluate_model(predictions)

    # Append results for this model
    metrics_results.append((model_name, metrics["AUC"], metrics["Accuracy"], metrics["Precision"], metrics["Recall"], metrics["F1-Score"]))

# Convert the metrics results to DataFrame
metrics_df = spark.createDataFrame(metrics_results, ["Model", "AUC", "Accuracy", "Precision", "Recall", "F1-Score"])
# Show metrics
metrics_df.show(truncate=False)

Training and evaluating Logistic Regression...
Training and evaluating Random Forest...
Training and evaluating Decision Tree...
Training and evaluating Gradient Boosting...
Training and evaluating Multilayer Perceptron...
+---------------------+------------------+-----------------+------------------+-----------------+------------------+
|Model                |AUC               |Accuracy         |Precision         |Recall           |F1-Score          |
+---------------------+------------------+-----------------+------------------+-----------------+------------------+
|Logistic Regression  |0.6160642674446796|0.998948632337711|0.9978983700493834|0.998948632337711|0.9984232249954026|
|Random Forest        |0.5793408921002605|0.998948632337711|0.9978983700493834|0.998948632337711|0.9984232249954026|
|Decision Tree        |0.5               |0.998948632337711|0.9978983700493834|0.998948632337711|0.9984232249954026|
|Gradient Boosting    |0.6624898847935579|0.998948632337711|0.9978983700493

## Balancing using SMOTE

In [None]:
class SmoteConfig:
    def __init__(self, k, bucketLength, seed, multiplier):
        self.k = k
        self.bucketLength = bucketLength
        self.seed = seed
        self.multiplier = multiplier

# Initialize the SMOTE configuration
smote_config = SmoteConfig(k=5, bucketLength=2.0, seed=42, multiplier=500)


In [None]:
!pip install imbalanced-learn


Collecting imbalanced-learn
  Downloading imbalanced_learn-0.12.4-py3-none-any.whl.metadata (8.3 kB)
Downloading imbalanced_learn-0.12.4-py3-none-any.whl (258 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m258.3/258.3 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: imbalanced-learn
Successfully installed imbalanced-learn-0.12.4


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from imblearn.over_sampling import SMOTE
import numpy as np
import pandas as pd

# Assuming your data is in a Spark DataFrame with 'scaledFeatures' and 'label' columns
train_pd = train_transformed.toPandas()  # Convert Spark DataFrame to Pandas DataFrame for SMOTE

# Separate features and labels
X_train = np.array(train_pd['scaledFeatures'].tolist())  # Convert the scaled features to an array
y_train = train_pd['label'].values

# Apply SMOTE using the configuration
smote = SMOTE(sampling_strategy='auto', k_neighbors=smote_config.k, random_state=smote_config.seed)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

# Convert the resampled data back to a Spark DataFrame
train_resampled = pd.DataFrame(X_resampled, columns=[f"feature_{i}" for i in range(X_resampled.shape[1])])
train_resampled['label'] = y_resampled

train_resampled_spark = spark.createDataFrame(train_resampled)

# Now you have the resampled training set, you can use it for further modeling
train_resampled_spark.show(5)

+-----------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+---------+---------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+----------+----------+----------+----------+----------+----------+-----------------+------------------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+----------+-----------------+------------------+----------+-----------------+-----------------+------------------+-----+
|        feature_0|         feature_1|           feature_2|           feature_3|           feature_4|           feature_5|         feature_6|feature_7|feature_8|          feature_9|         feature_10|         feature_11|        feature_12|        feature_13|        feature_14|       feature_15|        feature_16|feature_17|feature_18|feature_19|featur

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import Vectors, VectorUDT


# Assuming 'scaledFeatures' is a vector in Spark
# Convert resampled pandas data back to a Spark DataFrame with correct vector type for 'scaledFeatures'
train_resampled_spark = train_resampled_spark.withColumn(
    'scaledFeatures', F.udf(lambda x: Vectors.dense(x), VectorUDT())(F.array([F.col(f"feature_{i}") for i in range(X_resampled.shape[1])]))
)

# Extract the input size for the MLP layers
input_size = len(train_resampled_spark.select("scaledFeatures").first()["scaledFeatures"])

mlp_layers = [input_size, 5, 4, 2]  # Define the MLP architecture

# Define evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Function to evaluate model metrics
def evaluate_model(predictions):
    auc = binary_evaluator.evaluate(predictions)
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    return {
        "AUC": auc,
        "Accuracy": accuracy,
        "Recall": recall,
        "F1-Score": f1_score,
        "Precision": precision
    }

# Define the models
models = {
    "Logistic Regression": LogisticRegression(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Random Forest": RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Decision Tree": DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Gradient Boosting": GBTClassifier(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Multilayer Perceptron": MultilayerPerceptronClassifier(labelCol="label", featuresCol="scaledFeatures", layers=mlp_layers, maxIter=100)
}

# Store results
metrics_results = []

# Train models and evaluate
for model_name, model in models.items():
    print(f"Training and evaluating {model_name}...")
    model_trained = model.fit(train_resampled_spark)  # Train on resampled data
    predictions = model_trained.transform(test_transformed)  # Ensure test_transformed is prepared similarly to training data

    metrics = evaluate_model(predictions)
    metrics_results.append((model_name, metrics["AUC"], metrics["Accuracy"], metrics["Precision"], metrics["Recall"], metrics["F1-Score"]))

# Convert to DataFrame and display
metrics_df = spark.createDataFrame(metrics_results, ["Model", "AUC", "Accuracy", "Precision", "Recall", "F1-Score"])
metrics_df.show(truncate=False)


Training and evaluating Logistic Regression...
Training and evaluating Random Forest...
Training and evaluating Decision Tree...
Training and evaluating Gradient Boosting...
Training and evaluating Multilayer Perceptron...
+---------------------+------------------+------------------+------------------+------------------+------------------+
|Model                |AUC               |Accuracy          |Precision         |Recall            |F1-Score          |
+---------------------+------------------+------------------+------------------+------------------+------------------+
|Logistic Regression  |0.6226531682277188|0.5581032934337667|0.9982182216659893|0.5581032934337669|0.7154040278332193|
|Random Forest        |0.5813250325670396|0.6876376849473629|0.9980781817457444|0.6876376849473629|0.8139588875856224|
|Decision Tree        |0.5543239554379721|0.6370511790056617|0.9980380139049291|0.6370511790056618|0.777348383462843 |
|Gradient Boosting    |0.610744975141238 |0.9819056677560277|0.

In [None]:

class_count = master.groupBy("Churn_next_trim").count().collect()

majority_count = -1
minority_count = float('inf')

for row in class_count:
    count_value = row['count']
    if count_value > majority_count:
        majority_count = count_value
    if count_value < minority_count:
        minority_count = count_value


print("Majority count:", majority_count)
print("Minority count:", minority_count)
train_weighted = train_transformed.withColumn("classWeightCol", when(train_transformed["label"] == 1, majority_count/minority_count).otherwise(1.0))


Majority count: 3388530
Minority count: 3654


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier, MultilayerPerceptronClassifier

# Initialize evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Function to evaluate model metrics
def evaluate_model(predictions):
    # Calculate AUC
    auc = binary_evaluator.evaluate(predictions)

    # Calculate Accuracy
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})

    # Calculate Recall
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

    # Calculate F1-Score
    f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    # Calculate Precision
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})

    # Return metrics as a dictionary
    return {
        "AUC": auc,
        "Accuracy": accuracy,
        "Recall": recall,
        "F1-Score": f1_score,
        "Precision": precision
    }

# Define the models to train and evaluate
models = {
    "Logistic Regression": LogisticRegression(labelCol="label", featuresCol="scaledFeatures", weightCol="classWeightCol", maxIter=100),
    "Random Forest": RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures", weightCol="classWeightCol"),
    "Decision Tree": DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Gradient Boosting": GBTClassifier(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Multilayer Perceptron": MultilayerPerceptronClassifier(labelCol="label", featuresCol="scaledFeatures", layers=[train_weighted.select("scaledFeatures").first()["scaledFeatures"].size, 5, 4, 2], maxIter=100)
}

# Dictionary to store results
metrics_results = []

# Fit models and evaluate
for model_name, model in models.items():
    print(f"Training and evaluating {model_name}...")
    model_trained = model.fit(train_weighted)
    predictions = model_trained.transform(test_transformed)

    # Evaluate the model
    metrics = evaluate_model(predictions)

    # Append results for this model
    metrics_results.append((model_name, metrics["AUC"], metrics["Accuracy"], metrics["Precision"], metrics["Recall"], metrics["F1-Score"]))

# Convert the metrics results to DataFrame
metrics_df = spark.createDataFrame(metrics_results, ["Model", "AUC", "Accuracy", "Precision", "Recall", "F1-Score"])
# Show metrics
metrics_df.show(truncate=False)

Training and evaluating Logistic Regression...
Training and evaluating Random Forest...
Training and evaluating Decision Tree...
Training and evaluating Gradient Boosting...
Training and evaluating Multilayer Perceptron...
+---------------------+------------------+------------------+------------------+------------------+------------------+
|Model                |AUC               |Accuracy          |Precision         |Recall            |F1-Score          |
+---------------------+------------------+------------------+------------------+------------------+------------------+
|Logistic Regression  |0.622462166085117 |0.563697158949104 |0.9982167986648781|0.563697158949104 |0.719997487898472 |
|Random Forest        |0.6512157625978561|0.5869294365258882|0.9983011440664163|0.5869294365258881|0.7387104280879194|
|Decision Tree        |0.5               |0.998948632337711 |0.9978983700493834|0.998948632337711 |0.9984232249954026|
|Gradient Boosting    |0.6624879120364328|0.998948632337711 |0.

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier, MultilayerPerceptronClassifier
from imblearn.combine import SMOTETomek
import numpy as np
import pandas as pd

# Convert Spark DataFrame to Pandas DataFrame for SMOTE-Tomek Link
train_pd = train_transformed.toPandas()  # Convert Spark DataFrame to Pandas DataFrame

# Separate features and labels
X_train = np.array(train_pd['scaledFeatures'].tolist())  # Convert the 'scaledFeatures' vector to an array
y_train = train_pd['label'].values  # Extract labels

# Apply SMOTE-Tomek Link
smote_tomek = SMOTETomek(sampling_strategy='auto', random_state=42)  # Use desired random_state
X_resampled, y_resampled = smote_tomek.fit_resample(X_train, y_train)

# Convert resampled data back to Pandas DataFrame
train_resampled = pd.DataFrame(X_resampled, columns=[f"feature_{i}" for i in range(X_resampled.shape[1])])
train_resampled['label'] = y_resampled

# Convert back to Spark DataFrame and create vector column for 'scaledFeatures'
train_resampled_spark = spark.createDataFrame(train_resampled)

# Recreate 'scaledFeatures' as a vector column in Spark DataFrame
train_resampled_spark = train_resampled_spark.withColumn(
    'scaledFeatures', F.udf(lambda x: Vectors.dense(x), VectorUDT())(F.array([F.col(f"feature_{i}") for i in range(X_resampled.shape[1])]))
)

# Extract the input size for MLP layers
input_size = len(train_resampled_spark.select("scaledFeatures").first()["scaledFeatures"])

# Define the MLP architecture
mlp_layers = [input_size, 5, 4, 2]  # Example MLP architecture

# Define evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Function to evaluate model metrics
def evaluate_model(predictions):
    auc = binary_evaluator.evaluate(predictions)
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    return {
        "AUC": auc,
        "Accuracy": accuracy,
        "Recall": recall,
        "F1-Score": f1_score,
        "Precision": precision
    }

# Define the models
models = {
    "Logistic Regression": LogisticRegression(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Random Forest": RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Decision Tree": DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Gradient Boosting": GBTClassifier(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Multilayer Perceptron": MultilayerPerceptronClassifier(labelCol="label", featuresCol="scaledFeatures", layers=mlp_layers, maxIter=100)
}

# Store results
metrics_results = []

# Train models and evaluate
for model_name, model in models.items():
    print(f"Training and evaluating {model_name}...")
    model_trained = model.fit(train_resampled_spark)  # Train on resampled data
    predictions = model_trained.transform(test_transformed)  # Ensure test_transformed is prepared similarly to training data

    metrics = evaluate_model(predictions)
    metrics_results.append((model_name, metrics["AUC"], metrics["Accuracy"], metrics["Precision"], metrics["Recall"], metrics["F1-Score"]))

# Convert to DataFrame and display
metrics_df = spark.createDataFrame(metrics_results, ["Model", "AUC", "Accuracy", "Precision", "Recall", "F1-Score"])
metrics_df.show(truncate=False)


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier, MultilayerPerceptronClassifier
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
import numpy as np
import pandas as pd

# Step 1: Convert Spark DataFrame to Pandas DataFrame for SMOTE + Random Undersampling
train_pd = train_transformed.toPandas()  # Convert Spark DataFrame to Pandas

# Separate features and labels
X_train = np.array(train_pd['scaledFeatures'].tolist())  # Convert 'scaledFeatures' vector to an array
y_train = train_pd['label'].values  # Extract labels

# Step 2: Apply SMOTE for oversampling the minority class
smote = SMOTE(sampling_strategy='auto', random_state=42)
X_smote_resampled, y_smote_resampled = smote.fit_resample(X_train, y_train)

# Step 3: Apply Random Undersampling to the majority class
undersampler = RandomUnderSampler(sampling_strategy='auto', random_state=42)
X_resampled, y_resampled = undersampler.fit_resample(X_smote_resampled, y_smote_resampled)

# Step 4: Convert resampled data back to a Pandas DataFrame
train_resampled = pd.DataFrame(X_resampled, columns=[f"feature_{i}" for i in range(X_resampled.shape[1])])
train_resampled['label'] = y_resampled

# Step 5: Convert back to Spark DataFrame and create vector column for 'scaledFeatures'
train_resampled_spark = spark.createDataFrame(train_resampled)

# Recreate 'scaledFeatures' as a vector column in Spark DataFrame
train_resampled_spark = train_resampled_spark.withColumn(
    'scaledFeatures', F.udf(lambda x: Vectors.dense(x), VectorUDT())(F.array([F.col(f"feature_{i}") for i in range(X_resampled.shape[1])]))
)

# Extract the input size for MLP layers
input_size = len(train_resampled_spark.select("scaledFeatures").first()["scaledFeatures"])

# Define the MLP architecture
mlp_layers = [input_size, 5, 4, 2]  # Example MLP architecture

# Define evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Function to evaluate model metrics
def evaluate_model(predictions):
    auc = binary_evaluator.evaluate(predictions)
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    return {
        "AUC": auc,
        "Accuracy": accuracy,
        "Recall": recall,
        "F1-Score": f1_score,
        "Precision": precision
    }

# Define the models
models = {
    "Logistic Regression": LogisticRegression(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Random Forest": RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Decision Tree": DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures"),
    "Gradient Boosting": GBTClassifier(labelCol="label", featuresCol="scaledFeatures", maxIter=100),
    "Multilayer Perceptron": MultilayerPerceptronClassifier(labelCol="label", featuresCol="scaledFeatures", layers=mlp_layers, maxIter=100)
}

# Store results
metrics_results = []

# Train models and evaluate
for model_name, model in models.items():
    print(f"Training and evaluating {model_name}...")
    model_trained = model.fit(train_resampled_spark)  # Train on resampled data
    predictions = model_trained.transform(test_transformed)  # Ensure test_transformed is prepared similarly to training data

    metrics = evaluate_model(predictions)
    metrics_results.append((model_name, metrics["AUC"], metrics["Accuracy"], metrics["Precision"], metrics["Recall"], metrics["F1-Score"]))

# Convert to DataFrame and display
metrics_df = spark.createDataFrame(metrics_results, ["Model", "AUC", "Accuracy", "Precision", "Recall", "F1-Score"])
metrics_df.show(truncate=False)


In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="scaledFeatures", weightCol="classWeightCol", maxIter=100)
lr_model = lr.fit(train_weighted)
lr_predictions = lr_model.transform(test_transformed)


In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures", weightCol="classWeightCol")
rf_model = rf.fit(train_weighted)
rf_predictions = rf_model.transform(test_transformed)


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures")
dt_model = dt.fit(train_weighted)
dt_predictions = dt_model.transform(test_transformed)


In [None]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="scaledFeatures", maxIter=100)
gbt_model = gbt.fit(train_weighted)
gbt_predictions = gbt_model.transform(test_transformed)


In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

layers = [train_weighted.select("scaledFeatures").first()["scaledFeatures"].size, 5, 4, 2]  # Adjust hidden layers as needed
mlp = MultilayerPerceptronClassifier(labelCol="label", featuresCol="scaledFeatures", layers=layers, maxIter=100)
mlp_model = mlp.fit(train_weighted)
mlp_predictions = mlp_model.transform(test_transformed)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import expr

# Helper function to calculate accuracy, precision, recall, and F1-score
def calculate_metrics(predictions, label_col="label"):
    tp = predictions.filter((predictions["prediction"] == 1) & (predictions[label_col] == 1)).count()
    tn = predictions.filter((predictions["prediction"] == 0) & (predictions[label_col] == 0)).count()
    fp = predictions.filter((predictions["prediction"] == 1) & (predictions[label_col] == 0)).count()
    fn = predictions.filter((predictions["prediction"] == 0) & (predictions[label_col] == 1)).count()

    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if tp + fp > 0 else 0
    recall = tp / (tp + fn) if tp + fn > 0 else 0
    f1_score = 2 * ((precision * recall) / (precision + recall)) if precision + recall > 0 else 0

    return accuracy, precision, recall, f1_score

# Evaluate models and store metrics
models = {
    "Logistic Regression": lr_predictions,
    "Random Forest": rf_predictions,
    "Decision Tree": dt_predictions,
    "Gradient Boosting": gbt_predictions,
    "MLP": mlp_predictions
}

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
metrics = []

for model_name, predictions in models.items():
    auc = evaluator.evaluate(predictions)
    accuracy, precision, recall, f1_score = calculate_metrics(predictions)
    metrics.append((model_name, auc, accuracy, precision, recall, f1_score))

# Convert all metrics to float to avoid type mismatch
metrics_float = [(model, float(auc), float(accuracy), float(precision), float(recall), float(f1_score))
                 for model, auc, accuracy, precision, recall, f1_score in metrics]

# Create a DataFrame to show the results in a table
metrics_df = spark.createDataFrame(metrics_float, ["Model", "AUC", "Accuracy", "Precision", "Recall", "F1-Score"])
metrics_df.show(truncate=False)



+-------------------+------------------+------------------+---------------------+------------------+---------------------+
|Model              |AUC               |Accuracy          |Precision            |Recall            |F1-Score             |
+-------------------+------------------+------------------+---------------------+------------------+---------------------+
|Logistic Regression|0.6175871369802802|0.5836405046306196|0.0015189265310543941|0.5687830687830688|0.003029762128443125 |
|Random Forest      |0.6582918068964428|0.6363507069026736|0.0018061587316991641|0.5908289241622575|0.0036013082961780444|
|Decision Tree      |0.5               |0.9988877096824186|0.0                  |0.0               |0.0                  |
|Gradient Boosting  |0.6737377473168198|0.9988877096824186|0.0                  |0.0               |0.0                  |
|MLP                |0.6257362002622799|0.9988877096824186|0.0                  |0.0               |0.0                  |
+---------------

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

categorical_columns = ["genre", "MARITAL_STATUS", "BPR", "AGE_GROUP", "HasCreditYousr", "ALREADY CONNECTED", "CONNECTED RECENTLY"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(master) for column in categorical_columns]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_ohe") for column in categorical_columns]

num_features = ['age', 'CUSTOMER_YEARS', 'SOLDE', 'MVTDEB', 'NBMVTDB', 'MVTCRED', 'NBMVTCRE', 'MONTANTFACTURE',
                'MONTANTVIREMENT', 'MONTANTOPERATIONS', 'NBROPERATION', 'MONTANTGAB', 'NBROPERATIONGAB', 'ACTIVE_RATIO']

encoded_features = [column+"_ohe" for column in categorical_columns]
assembler = VectorAssembler(inputCols=num_features + encoded_features, outputCol='features')

# Optional: StandardScaler to normalize features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

train, test = master.randomSplit([0.7, 0.3], seed=123)

# Fit the pipeline on the training set
pipeline_model = pipeline.fit(train)

# Transform both train and test sets
train = pipeline_model.transform(train).withColumn("label", col("Churn_next_trim")).select("scaledFeatures", "label")
test = pipeline_model.transform(test).withColumn("label", col("Churn_next_trim")).select("scaledFeatures", "label")

class_count = master.groupBy("Churn_next_trim").count().collect()

majority_count = -1
minority_count = float('inf')

for row in class_count:
    count_value = row['count']
    if count_value > majority_count:
        majority_count = count_value
    if count_value < minority_count:
        minority_count = count_value


print("Majority count:", majority_count)
print("Minority count:", minority_count)


train_weighted = train.withColumn("classWeightCol", when(train["label"] == 1, majority_count/minority_count).otherwise(1.0))

# Logistic Regression with class weights
lr = LogisticRegression(labelCol="label", featuresCol="scaledFeatures", weightCol="classWeightCol", maxIter=100)
lr_model = lr.fit(train_weighted)

# Evaluate on test set
predictions = lr_model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

print(f"AUC (Area Under ROC): {auc}")


Majority count: 3388530
Minority count: 3654
AUC (Area Under ROC): 0.617587086757197


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Accuracy: 58.36%
Recall: 58.36%
F1 Score: 73.61%


In [None]:
from pyspark.ml. feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

categorical_columns = ["genre", "MARITAL_STATUS", "BPR", "AGE_GROUP", "HasCreditYousr", "ALREADY CONNECTED",
"CONNECTED RECENTLY"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(master) for column in categorical_columns]

In [None]:
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_ohe") for column in categorical_columns]

num_features = ['age', 'CUSTOMER_YEARS', 'SOLDE', 'MVTDEB', 'NBMVTDB', 'MVTCRED', 'NBMVTCRE',
'MONTANTFACTURE', 'MONTANTVIREMENT', 'MONTANTOPERATIONS', 'NBROPERATION', 'MONTANTGAB',
'NBROPERATIONGAB', 'ACTIVE_RATIO' ]
encoded_features = [column+"_ohe" for column in categorical_columns]

In [None]:
assembler = VectorAssembler(inputCols=num_features + encoded_features, outputCol='features')

In [None]:
pipeline = Pipeline(stages=indexers + encoders + [assembler])

In [None]:
train, test = master.randomSplit([0.7, 0.3], seed=123)

In [None]:
train.count()

2372666

In [None]:
train.show()

+--------+----------+--------+--------------+---+---+--------------+---------+---------+-------+-------+-------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|Idclient| ANNEEMOIS|   genre|MARITAL_STATUS|BPR|age|CUSTOMER_YEARS|AGE_GROUP|    SOLDE| MVTDEB|NBMVTDB|MVTCRED|NBMVTCRE|MONTANTFACTURE|MONTANTVIREMENT|MONTANTOPERATIONS|NBROPERATION|MONTANTGAB|NBROPERATIONGAB|HasCreditYousr|ALREADY CONNECTED|ACTIVE_RATIO|CONNECTED RECENTLY|Churn_next_trim|
+--------+----------+--------+--------------+---+---+--------------+---------+---------+-------+-------+-------+--------+--------------+---------------+-----------------+------------+----------+---------------+--------------+-----------------+------------+------------------+---------------+
|       0|2019-04-01| Féminin|      Marié(e)|  1| 64|            23|    55-64|  1025.33|3452.73|   9.33| 2500.0|     1.0|   

In [None]:
pipeline = Pipeline(stages=indexers + encoders + [assembler])
train = pipeline.fit(train).transform(train).withColumn("label", col("Churn_next_trim")).select("features", "label")
test = pipeline.fit(test).transform(test).withColumn("label", col("Churn_next_trim")).select("features", "label")

In [None]:
class_count = train.groupBy("label").count()
class_count.show()

+-----+-------+
|label|  count|
+-----+-------+
|    1|   2520|
|    0|2370146|
+-----+-------+



In [None]:
#class_count = master.groupBy("Churn_next_trim").count().collect()
#majority_count = max(class_count, key=lambda x: x['count' ]) ['count' ]
#minority_count = min(class_count, key=lambda x: x['count' ]) ['count' ]

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when
from pyspark.ml.evaluation import BinaryClassificationEvaluator

class_count = master.groupBy("Churn_next_trim").count().collect()

majority_count = -1
minority_count = float('inf')

for row in class_count:
    count_value = row['count']
    if count_value > majority_count:
        majority_count = count_value
    if count_value < minority_count:
        minority_count = count_value


print("Majority count:", majority_count)
print("Minority count:", minority_count)

train_weighted = train.withColumn("classWeightCol", when(train["label"] == 1, majority_count/minority_count).otherwise(1.0))

lr = LogisticRegression(labelCol="label", featuresCol="features", weightCol="classWeightCol", maxIter=100)

lr_model = lr.fit(train_weighted)

predictions = lr_model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

auc = evaluator.evaluate(predictions)

print(f"AUC (Area Under ROC): {auc}")

print(f"AUC: {auc}")


Majority count: 3388530
Minority count: 3654
AUC (Area Under ROC): 0.6175785813048541
AUC: 0.6175785813048541


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Accuracy: 58.36%
Recall: 58.36%
F1 Score: 73.61%


In [None]:
# Create a DecisionTreeClassifier object
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", weightCol="classWeightCol")
# Fit the model to the training data
dt_model = dt.fit(train_weighted)
# Make predictions on the test data
predictions = dt_model.transform(test)

In [None]:


evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

auc = evaluator.evaluate(predictions)

print(f"AUC (Area Under ROC): {auc}")

print(f"AUC: {auc}")


AUC (Area Under ROC): 0.5837954631992539
AUC: 0.5837954631992539


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Accuracy: 63.31%
Recall: 63.31%
F1 Score: 77.43%


In [None]:
# Create a Gradient Boosted Trees classifier object
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", weightCol="classWeightCol", maxIter=100 , maxDepth=5, seed=42)
# Fit the model to the training data
gbt_model=gbt.fit(train_weighted)
# Make predictions on the test data
predictions=gbt_model. transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

auc = evaluator.evaluate(predictions)

print(f"AUC (Area Under ROC): {auc}")

print(f"AUC: {auc}")
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
# Define the Layers for the neural network
featuresCol="features"
layers = [len(featuresCol), 10, 5, 2]
# Create a MultilayerPerceptronClassifier object
mlp = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", maxIter=100, layers=layers)
# Fit the model to the training data
mlp_model = mlp.fit(train_weighted)
# Make predictions on the test data
predictions = mlp_model.transform(test)

In [None]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Py4JJavaError: An error occurred while calling o3501.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1660.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1660.0 (TID 4807) (86ea77bb676d executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`ProbabilisticClassificationModel$$Lambda$4565/0x0000000841815840`: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:42)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
	at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:337)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:279)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel.$anonfun$transform$2(ProbabilisticClassifier.scala:121)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:78)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:76)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy$lzycompute(MulticlassMetrics.scala:188)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy(MulticlassMetrics.scala:188)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:153)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`ProbabilisticClassificationModel$$Lambda$4565/0x0000000841815840`: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:42)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
	at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:337)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:279)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel.$anonfun$transform$2(ProbabilisticClassifier.scala:121)
	... 22 more


In [None]:
#Count the number of records for each class in the DataFrame using the groupBy and count functions.
class_count = master.groupBy('Churn_next_trim').count().collect()
#Determine the class with the smallest number of records.
smallest_class_count = min(class_count, key=lambda x: x['count' ]) ['count' ]
# Calculate the number of records needed for each class to achieve balance
target_count = min(class_count, key=lambda x: x['count']) ['count' ]
# Calculate the sampling fractions for oversampling the minority class
sampling_fractions = {k: min(target_count / v, 1.0) for k, v in dict(class_count).items() if v < target_count}
# Oversample the minority class
oversampled_df = master. sampleBy('Churn_next_trim', fractions=sampling_fractions, seed=42)
# Calculate the sampling fractions for undersampling the majority clas
sampling_fractions = {k: 1.0 if v == smallest_class_count else target_count / v for k, v in dict(class_count).items()}
# Undersample the majority class
undersampled_df = master.sampleBy('Churn_next_trim', fractions=sampling_fractions, seed=42)
balanced_df = oversampled_df.unionAll(undersampled_df)

In [None]:
# Compter le nombre d'enregistrements pour chaque classe dans le DataFrame équilibré
final_class_count = balanced_df.groupBy('Churn_next_trim').count()

# Afficher le résultat
final_class_count.show()


+---------------+-----+
|Churn_next_trim|count|
+---------------+-----+
|              1| 3654|
|              0| 3707|
+---------------+-----+



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Define the model
lr= LogisticRegression(featuresCol="features", labelCol="label")
# Train the model
lr_model = lr.fit(train)
# Evaluate the model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
predictions = lr_model.transform(test)
LRauc = evaluator.evaluate(predictions)
print("AUC: {:.2f}%".format(LRauc * 100))

AUC: 61.15%


In [None]:

# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Accuracy: 58.34%
Recall: 58.34%
F1 Score: 58.34%


In [None]:
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
print("Precision: {:.2f}%".format(precision * 100))

Precision: 58.35%


In [None]:
LRauc = evaluator.evaluate(predictions)
print("AUC: {:.2f}%".format(LRauc * 100))

AUC: 54.13%


In [None]:
# Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Accuracy: 99.89%
Recall: 99.89%
F1 Score: 99.83%


In [None]:
LRauc = evaluator.evaluate(predictions)
print("AUC: {:.2f}%".format(LRauc * 100))

AUC: 62.92%


In [None]:
#Utiliser MulticlassClassificationEvaluator pour d'autres métriques
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Calculer la précision
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
print("Accuracy: {:.2f}%".format(accuracy * 100))

# Calculer le rappel
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
print("Recall: {:.2f}%".format(recall * 100))

# Calculer le F1-score
f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print("F1 Score: {:.2f}%".format(f1_score * 100))

Accuracy: 59.43%
Recall: 59.43%
F1 Score: 59.39%


In [None]:
# Vérifier si certaines lignes contiennent des valeurs nulles
train.filter(train.features.isNull() | train.label.isNull()).show()

# Optionnel : supprimer les lignes avec des valeurs nulles
train_clean = train.na.drop()
test_clean = test.na.drop()


+--------+-----+
|features|label|
+--------+-----+
+--------+-----+



In [None]:
train.count()
test.count()


2201