# Big Data Machine Learning Classification with Spark

## Churn Prediction

In this project, the data is read with Spark, and logistic regression and a GBT classifier are used to predict customer churn. Finally, the optimal parameters for the GBT classifier are determined.

<img src="churn.png" alt="Description of the image">

In [None]:
# Uncomment to install PySpark into the kernel's environment if it is not already available.
# %pip install pyspark

In [4]:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import GBTClassifier


# Initialize a Spark session (an already running session is reused).
spark = (
    SparkSession.builder
    .appName("ChurnPrediction.")
    .getOrCreate()
)

# Load the dataset: the first CSV row is the header and column types are inferred.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("churn.csv")
)



In [5]:
# Preview the first 20 rows; long cell values are truncated for display.
data.show(n=20, truncate=True)

+---+-------------------+----+--------------+---------------+-----+---------+-----+
|_c0|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+-------------------+----+--------------+---------------+-----+---------+-----+
|  0|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|    1|
|  1|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|    1|
|  2|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|    1|
|  3|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|    1|
|  4|     Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0|    1|
|  5|   Jessica Williams|48.0|      10356.02|              0| 5.12|      8.0|    1|
|  6|        Eric Butler|44.0|      11331.58|              1| 5.23|     11.0|    1|
|  7|      Zachary Walsh|32.0|       9885.12|              1| 6.92|      9.0|    1|
|  8|        Ashlee Carr|43.0|       14062.6|              1| 5.46|     11.0

In [6]:
# Print the column names and the types that inferSchema assigned when the CSV was loaded.
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
from pyspark.sql.functions import sum as spark_sum

# Count the nulls in every column: each "is null" flag is cast to 0/1 and summed,
# yielding a single-row DataFrame with one count per original column.
null_counts = data.select(
    *(
        spark_sum(data[column_name].isNull().cast("int")).alias(column_name)
        for column_name in data.columns
    )
)

# Display the per-column null counts.
null_counts.show()

+---+-----+---+--------------+---------------+-----+---------+-----+
|_c0|Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+-----+---+--------------+---------------+-----+---------+-----+
|  0|    0|  0|             0|              0|    0|        0|    0|
+---+-----+---+--------------+---------------+-----+---------+-----+



In [8]:
# Split the data into training (80%) and test (20%) sets; the split is seeded.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=123)

In [9]:
# Target column and the predictor columns used to build the feature vector.
label_col = "Churn"
feature_cols = [
    "Age",
    "Total_Purchase",
    "Account_Manager",
    "Years",
    "Num_Sites",
]



### LogisticRegression

In [10]:
# Assemble the predictor columns into a single "features" vector column.
# Both splits are reduced to the predictor and label columns before the transform.
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
train_data, test_data = (
    assembler.transform(split.select(*feature_cols, label_col))
    for split in (train_data, test_data)
)

In [11]:
# Create a logistic regression estimator on the assembled features and
# fit it to the training split.
lr = LogisticRegression(
    featuresCol="features",
    labelCol=label_col,
)
model = lr.fit(dataset=train_data)

In [12]:
# Apply the fitted logistic regression model to the test split.
predictions = model.transform(dataset=test_data)

In [13]:
# Evaluate the logistic regression predictions on the test split.
# BinaryClassificationEvaluator measures the area under the ROC curve by default,
# not classification accuracy, so the metric is requested explicitly and the
# result is named and labelled for what it actually is.
evaluater = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol=label_col,
    metricName="areaUnderROC",
)
area_under_roc = evaluater.evaluate(predictions)
print("Area under ROC", area_under_roc)

Accuracy 0.9110597140454166


### GBTClassifier


In [14]:
# Predictor and target columns for the GBT model
# (the same columns as in the logistic regression section).
label_col = "Churn"
feature_cols = [
    "Age",
    "Total_Purchase",
    "Account_Manager",
    "Years",
    "Num_Sites",
]

In [15]:
# Keep only the predictor and label columns of the full dataset, then add the
# assembled "features" vector column.
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
data = data.select(*feature_cols, label_col)
data = assembler.transform(data)

In [16]:
# Preview the first 20 rows of the dataset with the new "features" column.
data.show(20)

+----+--------------+---------------+-----+---------+-----+--------------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|            features|
+----+--------------+---------------+-----+---------+-----+--------------------+
|42.0|       11066.8|              0| 7.22|      8.0|    1|[42.0,11066.8,0.0...|
|41.0|      11916.22|              0|  6.5|     11.0|    1|[41.0,11916.22,0....|
|38.0|      12884.75|              0| 6.67|     12.0|    1|[38.0,12884.75,0....|
|42.0|       8010.76|              0| 6.71|     10.0|    1|[42.0,8010.76,0.0...|
|37.0|       9191.58|              0| 5.56|      9.0|    1|[37.0,9191.58,0.0...|
|48.0|      10356.02|              0| 5.12|      8.0|    1|[48.0,10356.02,0....|
|44.0|      11331.58|              1| 5.23|     11.0|    1|[44.0,11331.58,1....|
|32.0|       9885.12|              1| 6.92|      9.0|    1|[32.0,9885.12,1.0...|
|43.0|       14062.6|              1| 5.46|     11.0|    1|[43.0,14062.6,1.0...|
|40.0|       8066.94|       

In [17]:
# Re-split the assembled data into training (70%) and test (30%) sets with a seed.
train_data, test_data = data.randomSplit(weights=[0.7, 0.3], seed=42)

In [18]:
# Create a gradient-boosted trees classifier with default hyper-parameters
# and fit it to the training split.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol=label_col,
)
model = gbt.fit(dataset=train_data)

In [19]:
# Apply the fitted GBT model to the test split.
predictions = model.transform(dataset=test_data)

In [20]:
# Left disabled on purpose: the cells below still need the running Spark session.
# spark.stop()

In [21]:
# Evaluate the GBT predictions on the test split.
# BinaryClassificationEvaluator measures the area under the ROC curve by default,
# not classification accuracy, so the metric is requested explicitly and the
# result is named and labelled for what it actually is.
evaluater = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol=label_col,
    metricName="areaUnderROC",
)
area_under_roc = evaluater.evaluate(predictions)
print("Area under ROC", area_under_roc)
# spark.stop() stays disabled here: the tuning cells further down still use the session.

Accuracy 0.8431148373983745


In [22]:
# Preview the first 20 rows of the training split.
train_data.show(n=20, truncate=True)

+----+--------------+---------------+-----+---------+-----+--------------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|            features|
+----+--------------+---------------+-----+---------+-----+--------------------+
|22.0|      11254.38|              1| 4.96|      8.0|    0|[22.0,11254.38,1....|
|25.0|       9672.03|              0| 5.49|      8.0|    0|[25.0,9672.03,0.0...|
|26.0|       8939.61|              0| 4.54|      7.0|    0|[26.0,8939.61,0.0...|
|27.0|        8628.8|              1|  5.3|      7.0|    0|[27.0,8628.8,1.0,...|
|28.0|       8670.98|              0| 3.99|      6.0|    0|[28.0,8670.98,0.0...|
|28.0|      11128.95|              1| 5.12|      8.0|    0|[28.0,11128.95,1....|
|29.0|       5900.78|              1| 5.56|      8.0|    0|[29.0,5900.78,1.0...|
|29.0|       8688.17|              1|  5.7|      9.0|    1|[29.0,8688.17,1.0...|
|29.0|       9378.24|              0| 4.93|      8.0|    0|[29.0,9378.24,0.0...|
|29.0|      12711.15|       

In [23]:
# NOTE(review): this repeats the train_data.show() call of the previous cell
# and looks redundant — confirm whether it can be removed.
train_data.show()

+----+--------------+---------------+-----+---------+-----+--------------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|            features|
+----+--------------+---------------+-----+---------+-----+--------------------+
|22.0|      11254.38|              1| 4.96|      8.0|    0|[22.0,11254.38,1....|
|25.0|       9672.03|              0| 5.49|      8.0|    0|[25.0,9672.03,0.0...|
|26.0|       8939.61|              0| 4.54|      7.0|    0|[26.0,8939.61,0.0...|
|27.0|        8628.8|              1|  5.3|      7.0|    0|[27.0,8628.8,1.0,...|
|28.0|       8670.98|              0| 3.99|      6.0|    0|[28.0,8670.98,0.0...|
|28.0|      11128.95|              1| 5.12|      8.0|    0|[28.0,11128.95,1....|
|29.0|       5900.78|              1| 5.56|      8.0|    0|[29.0,5900.78,1.0...|
|29.0|       8688.17|              1|  5.7|      9.0|    1|[29.0,8688.17,1.0...|
|29.0|       9378.24|              0| 4.93|      8.0|    0|[29.0,9378.24,0.0...|
|29.0|      12711.15|       

In [24]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Base GBT estimator whose hyper-parameters are tuned below.
gbt = GBTClassifier(labelCol=label_col, featuresCol="features")

# Parameter grid to search: 2 tree depths x 2 iteration counts = 4 combinations.
param_grid = (ParamGridBuilder()
              .addGrid(gbt.maxDepth, [2, 10])
              .addGrid(gbt.maxIter, [3, 20])
              .build())

# Candidates are ranked with BinaryClassificationEvaluator's default metric
# (area under the ROC curve).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol=label_col)

# 3-fold cross-validation over the grid. Rows are assigned to folds at random,
# so a fixed seed is supplied to make the selected model repeatable across runs,
# in line with the seeded randomSplit calls used for the train/test splits.
cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=42)

# Fit CrossValidator to the training data (trains one model per fold and grid point).
cv_model = cv.fit(train_data)


# Best GBT model found by cross-validation.
best_model = cv_model.bestModel


In [25]:
# Apply the cross-validated best GBT model to the test split.
predictions = best_model.transform(dataset=test_data)

In [26]:
# Evaluate the tuned GBT model's predictions on the test split.
# BinaryClassificationEvaluator measures the area under the ROC curve by default,
# not classification accuracy, so the metric is requested explicitly and the
# result is named and labelled for what it actually is.
evaluater = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol=label_col,
    metricName="areaUnderROC",
)
area_under_roc = evaluater.evaluate(predictions)
print("Area under ROC", area_under_roc)

Accuracy 0.8850990853658538


In [27]:
# Shut down the Spark session now that all modelling and evaluation cells have run.
spark.stop()

## Conclusion