In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when
from pyspark.sql.functions import col, mean, stddev, when, lit

spark= SparkSession.builder.appName('Telecom').getOrCreate()

In [2]:
df=spark.read.csv("C:/dataset/churn-bigml.csv", header=True, inferSchema=True)

In [3]:
df.printSchema()

root
 |-- State: string (nullable = true)
 |-- Account length: integer (nullable = true)
 |-- Area code: integer (nullable = true)
 |-- International plan: string (nullable = true)
 |-- Voice mail plan: string (nullable = true)
 |-- Number vmail messages: integer (nullable = true)
 |-- Total day minutes: double (nullable = true)
 |-- Total day calls: integer (nullable = true)
 |-- Total day charge: double (nullable = true)
 |-- Total eve minutes: double (nullable = true)
 |-- Total eve calls: integer (nullable = true)
 |-- Total eve charge: double (nullable = true)
 |-- Total night minutes: double (nullable = true)
 |-- Total night calls: integer (nullable = true)
 |-- Total night charge: double (nullable = true)
 |-- Total intl minutes: double (nullable = true)
 |-- Total intl calls: integer (nullable = true)
 |-- Total intl charge: double (nullable = true)
 |-- Customer service calls: integer (nullable = true)
 |-- Churn: boolean (nullable = true)



In [4]:
print(df.dtypes)

[('State', 'string'), ('Account length', 'int'), ('Area code', 'int'), ('International plan', 'string'), ('Voice mail plan', 'string'), ('Number vmail messages', 'int'), ('Total day minutes', 'double'), ('Total day calls', 'int'), ('Total day charge', 'double'), ('Total eve minutes', 'double'), ('Total eve calls', 'int'), ('Total eve charge', 'double'), ('Total night minutes', 'double'), ('Total night calls', 'int'), ('Total night charge', 'double'), ('Total intl minutes', 'double'), ('Total intl calls', 'int'), ('Total intl charge', 'double'), ('Customer service calls', 'int'), ('Churn', 'boolean')]


In [5]:
# Sélectionner la colonne "State" et obtenir les éléments distincts
distinct_states = df.select("State").distinct()

# Afficher les éléments distincts
distinct_states.show()

+-----+
|State|
+-----+
|   AZ|
|   SC|
|   LA|
|   MN|
|   NJ|
|   DC|
|   OR|
|   VA|
|   RI|
|   WY|
|   KY|
|   NH|
|   MI|
|   NV|
|   WI|
|   ID|
|   CA|
|   NE|
|   CT|
|   MT|
+-----+
only showing top 20 rows



In [6]:
# Importer les fonctions de Spark SQL
# Créer une expression pour le mapping des états
state_mapping_expr = (
    when(df['State'] == 'KS', 1)
    .when(df['State'] == 'OH', 2)
    .when(df['State'] == 'NJ', 3)
    .when(df['State'] == 'OK', 4)
    .when(df['State'] == 'AL', 5)
    .when(df['State'] == 'MA', 6)
    .when(df['State'] == 'MO', 7)
    .when(df['State'] == 'WV', 8)
    .when(df['State'] == 'RI', 9)
    .when(df['State'] == 'IA', 10)
    .when(df['State'] == 'MT', 11)
    .when(df['State'] == 'ID', 12)
    .when(df['State'] == 'VT', 13)
    .when(df['State'] == 'VA', 14)
    .when(df['State'] == 'TX', 15)
    .when(df['State'] == 'FL', 16)
    .when(df['State'] == 'CO', 17)
    .when(df['State'] == 'AZ', 18)
    .when(df['State'] == 'NE', 19)
    .when(df['State'] == 'WY', 20)
    .when(df['State'] == 'IL', 21)
    .when(df['State'] == 'NH', 22)
    .when(df['State'] == 'LA', 23)
    .when(df['State'] == 'GA', 24)
    .when(df['State'] == 'AK', 25)
    .when(df['State'] == 'MD', 26)
    .when(df['State'] == 'AR', 27)
    .when(df['State'] == 'WI', 28)
    .when(df['State'] == 'OR', 29)
    .when(df['State'] == 'DE', 30)
    .when(df['State'] == 'IN', 31)
    .when(df['State'] == 'UT', 32)
    .when(df['State'] == 'CA', 33)
    .when(df['State'] == 'SD', 34)
    .when(df['State'] == 'NC', 35)
    .when(df['State'] == 'WA', 36)
    .when(df['State'] == 'MN', 37)
    .when(df['State'] == 'NM', 38)
    .when(df['State'] == 'NV', 39)
    .when(df['State'] == 'DC', 40)
    .when(df['State'] == 'NY', 41)
    .when(df['State'] == 'KY', 42)
    .when(df['State'] == 'ME', 43)
    .when(df['State'] == 'MS', 44)
    .when(df['State'] == 'MI', 45)
    .when(df['State'] == 'SC', 46)
    .when(df['State'] == 'TN', 47)
    .when(df['State'] == 'PA', 48)
    .when(df['State'] == 'HI', 49)
    .when(df['State'] == 'ND', 50)
    .when(df['State'] == 'CT', 51)
    .otherwise(df['State'])
)

# Appliquer le mapping à la colonne "State"
df = df.withColumn("State", state_mapping_expr)




In [7]:
# Remplacer les valeurs "Yes" par 1 et "No" par 0 dans la colonne "International plan"
df = df.withColumn("International plan", when(df["International plan"] == "Yes", 1).otherwise(0))



In [8]:
# Remplacer les valeurs "Yes" par 1 et "No" par 0 dans la colonne "International plan"
df = df.withColumn("Voice mail plan", when(df["Voice mail plan"] == "Yes", 1).otherwise(0))

# Afficher le DataFrame
df.head()

Row(State='1', Account length=128, Area code=415, International plan=0, Voice mail plan=1, Number vmail messages=25, Total day minutes=265.1, Total day calls=110, Total day charge=45.07, Total eve minutes=197.4, Total eve calls=99, Total eve charge=16.78, Total night minutes=244.7, Total night calls=91, Total night charge=11.01, Total intl minutes=10.0, Total intl calls=3, Total intl charge=2.7, Customer service calls=1, Churn=False)

In [9]:
# Remplacer les valeurs "true" par 1 et "false" par 0 dans la colonne "International plan"
df = df.withColumn("Churn", when(df["Churn"] == "True", 1).otherwise(0))

# Afficher le DataFrame
df.head()

Row(State='1', Account length=128, Area code=415, International plan=0, Voice mail plan=1, Number vmail messages=25, Total day minutes=265.1, Total day calls=110, Total day charge=45.07, Total eve minutes=197.4, Total eve calls=99, Total eve charge=16.78, Total night minutes=244.7, Total night calls=91, Total night charge=11.01, Total intl minutes=10.0, Total intl calls=3, Total intl charge=2.7, Customer service calls=1, Churn=0)

In [10]:
# Nombre de lignes avant la suppression des lignes vides
nbr_lignes_avant = df.count()

# Supprimer les lignes vides
df = df.na.drop()

# Nombre de lignes après la suppression des lignes vides
nbr_lignes_apres = df.count()

# Afficher le nombre de lignes avant et après
print("Nombre de lignes avant la suppression des lignes vides : ", nbr_lignes_avant)
print("Nombre de lignes après la suppression des lignes vides : ", nbr_lignes_apres)


Nombre de lignes avant la suppression des lignes vides :  3333
Nombre de lignes après la suppression des lignes vides :  3333


In [11]:
# Nombre de lignes avant la suppression des lignes dupliquées
nbr_lignes_avant = df.count()

# Supprimer les lignes dupliquées
df = df.dropDuplicates()

# Nombre de lignes après la suppression des lignes dupliquées
nbr_lignes_apres = df.count()

# Afficher le nombre de lignes avant et après
print("Nombre de lignes avant la suppression des lignes dupliquées : ", nbr_lignes_avant)
print("Nombre de lignes après la suppression des lignes dupliquées : ", nbr_lignes_apres)


Nombre de lignes avant la suppression des lignes dupliquées :  3333
Nombre de lignes après la suppression des lignes dupliquées :  3333


In [12]:
# Définir les colonnes à traiter
columns_to_process = ["Account length","Area code","Number vmail messages","Total day minutes","Total day calls","Total day charge", "Total eve minutes", "Total eve calls", "Total eve charge", 
                      "Total eve calls","Total eve charge","Total night minutes", "Total night calls", "Total night charge", "Total intl minutes", 
                      "Total intl calls", "Total intl charge", "Customer service calls"]

for column in columns_to_process:
    # Calculer la moyenne et l'écart-type pour la colonne actuelle
    stats = df.select(mean(col(column)).alias(f"{column}_mean"), stddev(col(column)).alias(f"{column}_stddev")).collect()[0]

    # Définir le seuil comme 3 fois l'écart-type
    threshold = 3

    # Calculer la moyenne
    column_mean = stats[f"{column}_mean"]

    # Remplacer les valeurs aberrantes dans la colonne actuelle par leur moyenne
    df = df.withColumn(column, when(
        (col(column) >= stats[f"{column}_mean"] - threshold * stats[f"{column}_stddev"]) &
        (col(column) <= stats[f"{column}_mean"] + threshold * stats[f"{column}_stddev"]),
        col(column)
    ).otherwise(lit(column_mean)))

# Afficher le DataFrame avec les valeurs aberrantes remplacées par leur moyenne
df.show()


+-----+------------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|State|    Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|
+-----+------------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------

In [13]:
assembler=VectorAssembler(inputCols=[
'International plan',
'Voice mail plan',
'Number vmail messages',
'Total day minutes',
'Total day calls',
'Total day charge',
'Total eve minutes',
'Total eve calls',
'Total eve charge',
'Total night minutes',
'Total night calls',	
'Total night charge',
'Total intl minutes',	
'Total intl calls',	
'Total intl charge',
],
outputCol="features")

output=assembler.transform(df)
transformed_df= output.select("features","churn")


# Diviser les données en ensembles d'entraînement et de test
(training_data, test_data) = transformed_df.randomSplit([0.8, 0.2], seed=42)

# Initialiser le modèle RandomForestClassifier
rf_model = RandomForestClassifier(featuresCol="features", labelCol="churn", maxBins=52)  

# Entraîner le modèle sur l'ensemble d'entraînement
model = rf_model.fit(training_data)


In [14]:
lrm_summary = model.summary
lrm_summary.predictions.show(2000)

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,0.0,0.0,17.6...|  0.0|[18.4508702123943...|[0.92254351061971...|       0.0|
|[0.0,0.0,0.0,18.9...|  0.0|[17.6385603943468...|[0.88192801971734...|       0.0|
|[0.0,0.0,0.0,34.0...|  0.0|[18.4203341427486...|[0.92101670713743...|       0.0|
|[0.0,0.0,0.0,35.1...|  0.0|[18.3092978760387...|[0.91546489380194...|       0.0|
|[0.0,0.0,0.0,37.8...|  0.0|[17.5584343270347...|[0.87792171635173...|       0.0|
|[0.0,0.0,0.0,40.4...|  0.0|[18.3092978760387...|[0.91546489380194...|       0.0|
|[0.0,0.0,0.0,45.0...|  0.0|[18.4508702123943...|[0.92254351061971...|       0.0|
|[0.0,0.0,0.0,47.4...|  0.0|[18.3440402559508...|[0.91720201279754...|       0.0|
|[0.0,0.0,0.0,48.4...|  0.0|[17.4922616820131...|[0.87461308410065...|       0.0|
|[0.0,0.0,0.0,49

In [15]:
lrm_summary.predictions.describe().show()


+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|               2718|               2718|
|   mean|0.14385577630610744|0.07468727005150846|
| stddev| 0.3510080081884285| 0.2629344358363872|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [16]:
pred_labels = model.evaluate(test_data)
pred_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,0.0,0.0,27.0...|    0|[18.4182813155498...|[0.92091406577749...|       0.0|
|[0.0,0.0,0.0,39.5...|    0|[18.3584698052614...|[0.91792349026307...|       0.0|
|[0.0,0.0,0.0,41.9...|    0|[18.3440402559508...|[0.91720201279754...|       0.0|
|[0.0,0.0,0.0,49.9...|    0|[18.4508702123943...|[0.92254351061971...|       0.0|
|[0.0,0.0,0.0,54.0...|    0|[18.3440402559508...|[0.91720201279754...|       0.0|
|[0.0,0.0,0.0,55.6...|    0|[18.3607099925505...|[0.91803549962753...|       0.0|
|[0.0,0.0,0.0,58.4...|    0|[18.4050209710645...|[0.92025104855322...|       0.0|
|[0.0,0.0,0.0,61.6...|    1|[18.4508702123943...|[0.92254351061971...|       0.0|
|[0.0,0.0,0.0,69.1...|    0|[18.4182813155498...|[0.92091406577749...|       0.0|
|[0.0,0.0,0.0,69

In [17]:
# Évaluer les performances du modèle sur l'ensemble de test
pred_labels = model.evaluate(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="churn", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(model.transform(test_data))
print("Accuracy:", accuracy)


Accuracy: 0.926829268292683
