In [26]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [27]:
# Créer une session Spark
spark = SparkSession.builder \
    .appName("Churn Prediction") \
    .getOrCreate()


In [28]:
# Charger les données
train_data = spark.read.csv("C:/Users/khoul/Dropbox/PC/Downloads/kafka-spark-streaming-integration-master/kafka-spark-streaming-integration-master/kafka/src/main/resources/churn-bigml-80.csv", header=True, inferSchema=True)
test_data = spark.read.csv("C:/Users/khoul/Dropbox/PC/Downloads/kafka-spark-streaming-integration-master/kafka-spark-streaming-integration-master/kafka/src/main/resources/churn-bigml-20.csv", header=True, inferSchema=True)


In [29]:
# Convertir la colonne Churn en string
train_data = train_data.withColumn("Churn", train_data["Churn"].cast("string"))
test_data = test_data.withColumn("Churn", test_data["Churn"].cast("string"))


In [30]:

# Convertir la colonne Churn en numérique
indexer = StringIndexer(inputCol="Churn", outputCol="label")
train_data = indexer.fit(train_data).transform(train_data)
test_data = indexer.fit(test_data).transform(test_data)


In [31]:

# Convertir les variables catégorielles en numériques
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train_data) for column in ["State", "International plan", "Voice mail plan"]]
for indexer in indexers:
    train_data = indexer.transform(train_data)
    test_data = indexer.transform(test_data)

In [32]:
# Créer un vecteur de features
feature_columns = ["Account length", "Area code", "Number vmail messages", "Total day minutes", "Total day calls",
                   "Total day charge", "Total eve minutes", "Total eve calls", "Total eve charge", "Total night minutes",
                   "Total night calls", "Total night charge", "Total intl minutes", "Total intl calls", "Total intl charge",
                   "Customer service calls", "State_index", "International plan_index", "Voice mail plan_index"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

In [33]:

# Entraîner le modèle GBT avec maxBins augmenté
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxBins=60)
gbt_model = gbt.fit(train_data)


In [34]:

# Faire des prédictions sur les données de test
predictions = gbt_model.transform(test_data)


In [12]:

# Évaluer les performances du modèle
evaluator = BinaryClassificationEvaluator(labelCol="label")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.9003220463746787


In [15]:
spark.stop()

In [32]:
print("Accuracy:", accuracy)

Accuracy: 0.9003220463746787


In [33]:
predictions 

DataFrame[State: string, Account length: int, Area code: int, International plan: string, Voice mail plan: string, Number vmail messages: int, Total day minutes: double, Total day calls: int, Total day charge: double, Total eve minutes: double, Total eve calls: int, Total eve charge: double, Total night minutes: double, Total night calls: int, Total night charge: double, Total intl minutes: double, Total intl calls: int, Total intl charge: double, Customer service calls: int, Churn: string, label: double, State_index: double, International plan_index: double, Voice mail plan_index: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [35]:
# Save the model

gbt_model.write().overwrite().save("modelnew.pkl")  # Replace "model99" with your desired filename



In [36]:
from pyspark.ml.classification import GBTClassificationModel

# Load the model
loaded_model = GBTClassificationModel.load("modelnew.pkl")

# Suppose 'test_data' is your DataFrame containing the data to predict
predictions = loaded_model.transform(test_data)
# Show the predictions
predictions.show()

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+-----+-----------+------------------------+---------------------+--------------------+--------------------+--------------------+----------+
|State|Account length|Area code|International plan|Voice mail plan|Number vmail messages|Total day minutes|Total day calls|Total day charge|Total eve minutes|Total eve calls|Total eve charge|Total night minutes|Total night calls|Total night charge|Total intl minutes|Total intl calls|Total intl charge|Customer service calls|Churn|label|State_index|International plan_index|Voice mail plan_index|            features|       rawPrediction|         probability|prediction|
+-----+--------------+---------+------------------+-------

In [37]:
# Show predictions as a table
predictions.select("features", "prediction").show()


+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[117.0,408.0,0.0,...|       0.0|
|[65.0,415.0,0.0,1...|       1.0|
|[161.0,415.0,0.0,...|       0.0|
|[111.0,415.0,0.0,...|       0.0|
|[49.0,510.0,0.0,1...|       0.0|
|[36.0,408.0,30.0,...|       0.0|
|[65.0,415.0,0.0,2...|       0.0|
|[119.0,415.0,0.0,...|       1.0|
|[10.0,408.0,0.0,1...|       0.0|
|[68.0,415.0,0.0,1...|       0.0|
|[74.0,510.0,33.0,...|       0.0|
|[85.0,415.0,0.0,2...|       0.0|
|[46.0,415.0,0.0,2...|       0.0|
|[128.0,510.0,29.0...|       0.0|
|[155.0,415.0,0.0,...|       0.0|
|[73.0,415.0,0.0,1...|       0.0|
|[77.0,415.0,0.0,2...|       0.0|
|[108.0,415.0,0.0,...|       0.0|
|[95.0,408.0,0.0,1...|       0.0|
|[36.0,510.0,29.0,...|       0.0|
+--------------------+----------+
only showing top 20 rows



In [73]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Chemin du fichier CSV
file_path = "C:/Users/khoul/Dropbox/PC/Downloads/kafka-spark-streaming-integration-master/kafka-spark-streaming-integration-master/spark/Master_dataset.csv"

# Lire le CSV sans spécifier le schéma
Newdata = spark.read.csv(file_path, header=False, inferSchema=True)

# Afficher les premières lignes
Newdata.show(5)

+-----+--------------+---------+------------------+---------------+--------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+--------------------+-------+----+----+----+----+----+-----+----+-----+-----+----+----+-----+----+----+----+----+----+----+-----+
|  _c0|           _c1|      _c2|               _c3|            _c4|                 _c5|              _c6|            _c7|             _c8|              _c9|           _c10|            _c11|               _c12|             _c13|              _c14|              _c15|            _c16|             _c17|                _c18|   _c19|_c20|_c21|_c22|_c23|_c24| _c25|_c26| _c27| _c28|_c29|_c30| _c31|_c32|_c33|_c34|_c35|_c36|_c37| _c38|
+-----+--------------+---------+------------------+---------------+--------------------+-----------------+---------------+----------------

In [71]:
predictions = loaded_model.transform(Newdata)


In [72]:
predictions.select("features", "prediction").show()


Py4JJavaError: An error occurred while calling o2293.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 531.0 failed 1 times, most recent failure: Lost task 0.0 in stage 531.0 (TID 553) (host.docker.internal executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$3588/1696403424`: (struct<Account length_double_VectorAssembler_f319b2928594:double,Area code_double_VectorAssembler_f319b2928594:double,Number vmail messages_double_VectorAssembler_f319b2928594:double,Total day minutes:double,Total day calls_double_VectorAssembler_f319b2928594:double,Total day charge:double,Total eve minutes:double,Total eve calls_double_VectorAssembler_f319b2928594:double,Total eve charge:double,Total night minutes:double,Total night calls_double_VectorAssembler_f319b2928594:double,Total night charge:double,Total intl minutes:double,Total intl calls_double_VectorAssembler_f319b2928594:double,Total intl charge:double,Customer service calls_double_VectorAssembler_f319b2928594:double,State_index:double,International plan_index:double,Voice mail plan_index:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.GeneratedMethodAccessor242.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$3588/1696403424`: (struct<Account length_double_VectorAssembler_f319b2928594:double,Area code_double_VectorAssembler_f319b2928594:double,Number vmail messages_double_VectorAssembler_f319b2928594:double,Total day minutes:double,Total day calls_double_VectorAssembler_f319b2928594:double,Total day charge:double,Total eve minutes:double,Total eve calls_double_VectorAssembler_f319b2928594:double,Total eve charge:double,Total night minutes:double,Total night calls_double_VectorAssembler_f319b2928594:double,Total night charge:double,Total intl minutes:double,Total intl calls_double_VectorAssembler_f319b2928594:double,Total intl charge:double,Customer service calls_double_VectorAssembler_f319b2928594:double,State_index:double,International plan_index:double,Voice mail plan_index:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 20 more
