In [5]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.sonic.net/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xzf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [6]:
data = spark.read.csv('/customer_churn.csv',inferSchema=True, header=True)

In [7]:
data.printSchema()


root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [8]:
data.describe().toPandas()

Unnamed: 0,summary,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn
0,count,900,900.0,900.0,900.0,900.0,900.0,900,900,900,900.0
1,mean,,41.81666666666667,10062.82403333334,0.4811111111111111,5.27315555555555,8.587777777777777,,,,0.1666666666666666
2,stddev,,6.127560416916251,2408.644531858096,0.4999208935073339,1.274449013194616,1.7648355920350969,,,,0.3728852122772358
3,min,Aaron King,22.0,100.0,0.0,1.0,3.0,2006-01-02 04:16:13,"00103 Jeffrey Crest Apt. 205 Padillaville, IA ...",Abbott-Thompson,0.0
4,max,Zachary Walsh,65.0,18026.01,1.0,9.15,14.0,2016-12-28 04:07:38,Unit 9800 Box 2878 DPO AA 75157,"Zuniga, Clark and Shaffer",1.0


In [9]:
from pyspark.ml.feature import VectorAssembler

In [10]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [11]:
#Here we choose the columns that we think will be good for our prediction
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'],outputCol='features')
output = assembler.transform(data)

In [12]:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['Churn'], outputCols=['label'], strategy='mean')
imputer_model = imputer.fit(output)
output = imputer_model.transform(output)

In [13]:
output.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|            features|label|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|[42.0,11066.8,0.0...|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|[41.0,11916.22,0....|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|[38.0,12884.75

In [14]:
final_data = output.select('features','churn')

In [15]:
#split the data for training(70%) and test(30%)
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [16]:
final_data.describe().show()
train_data.describe().show()
test_data.describe().show()

+-------+-------------------+
|summary|              churn|
+-------+-------------------+
|  count|                900|
|   mean|0.16666666666666666|
| stddev| 0.3728852122772358|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+-------------------+
|summary|              churn|
+-------+-------------------+
|  count|                634|
|   mean|0.17350157728706625|
| stddev| 0.3789793112070138|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+-------------------+
|summary|              churn|
+-------+-------------------+
|  count|                266|
|   mean|0.15037593984962405|
| stddev| 0.3581133077041032|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [17]:
from pyspark.ml.classification import LogisticRegression

In [18]:
lr_churn = LogisticRegression(labelCol='churn')

In [19]:
fitted_churn_model = lr_churn.fit(train_data)

In [20]:
training_sum = fitted_churn_model.summary

In [21]:
training_sum.predictions.describe().show()

+-------+-------------------+------------------+
|summary|              churn|        prediction|
+-------+-------------------+------------------+
|  count|                634|               634|
|   mean|0.17350157728706625|0.1309148264984227|
| stddev| 0.3789793112070138| 0.337573511742821|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



In [22]:
#importing evaluators from library to check the stadistics of our model --> how well it is performing and its reliability
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [23]:
data_predicted = fitted_churn_model.evaluate(test_data)

In [24]:
data_predicted.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|    0|[4.74634237804022...|[0.99139135451688...|       0.0|
|[28.0,11245.38,0....|    0|[3.62601823165728...|[0.97406837491370...|       0.0|
|[29.0,5900.78,1.0...|    0|[4.47303463097700...|[0.98871614829884...|       0.0|
|[29.0,9378.24,0.0...|    0|[4.72281387801987...|[0.99118821034906...|       0.0|
|[29.0,10203.18,1....|    0|[3.90577075830038...|[0.98027160588186...|       0.0|
|[29.0,13240.01,1....|    0|[6.74188646148326...|[0.99882097350303...|       0.0|
|[29.0,13255.05,1....|    0|[4.11354707130418...|[0.98391333373758...|       0.0|
|[30.0,6744.87,0.0...|    0|[3.57170203559144...|[0.97266048612059...|       0.0|
|[30.0,7960.64,1.0...|    1|[3.29975238488172...|[0.96442031509593...|       0.0|
|[30.0,10744.14,

In [25]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='churn')

In [27]:
#Test Area Under Roc (auc), which is good: approximately 75%
auc = evaluator.evaluate(data_predicted.predictions)
auc

0.7484513274336283