In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=fdc394050f375404e79fed76afdc647f3a315f5f17456ef51ab76f58fb6d8140
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('churn').getOrCreate()
df=spark.read.csv('customer_churn.csv',inferSchema=True,header=True)


In [4]:
df.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [6]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
df.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                NULL|                NULL|0.16666666666666666|
| stddev|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [8]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [10]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [51]:
from pyspark.ml.feature import VectorAssembler

In [52]:
assembler=VectorAssembler(inputCols=['Age','Total_Purchase','Account_Manager',
                                     'Years','Num_Sites'],outputCol='features')

In [53]:
output=assembler.transform(df)

In [55]:
final_data=output.select('features','churn')

In [56]:
train_data,test_data=final_data.randomSplit([0.7,0.3])

In [57]:
train_data.describe().show()

+-------+-------------------+
|summary|              churn|
+-------+-------------------+
|  count|                641|
|   mean|0.17004680187207488|
| stddev|0.37596729120518246|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [58]:
test_data.describe().show()

+-------+-------------------+
|summary|              churn|
+-------+-------------------+
|  count|                259|
|   mean| 0.1583011583011583|
| stddev|0.36572987717804617|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [59]:
from pyspark.ml.classification import LogisticRegression
lr=LogisticRegression(labelCol='churn')

In [60]:
fitted_churn=lr.fit(train_data)
training_sum=fitted_churn.summary
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                641|                641|
|   mean|0.17004680187207488|0.12792511700468018|
| stddev|0.37596729120518246| 0.3342672499419818|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [61]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [62]:
pred_and_labels=fitted_churn.evaluate(test_data)

In [63]:
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|    0|[4.14940998503883...|[0.98447122606834...|       0.0|
|[25.0,9672.03,0.0...|    0|[4.34235018251794...|[0.98716105639264...|       0.0|
|[28.0,8670.98,0.0...|    0|[7.58873285558809...|[0.99949413405243...|       0.0|
|[28.0,11204.23,0....|    0|[1.27547636204188...|[0.78167876936123...|       0.0|
|[29.0,5900.78,1.0...|    0|[4.08747743119968...|[0.98349545844370...|       0.0|
|[29.0,9378.24,0.0...|    0|[4.50484652853506...|[0.98906559624103...|       0.0|
|[29.0,9617.59,0.0...|    0|[4.19358263367635...|[0.98513226721302...|       0.0|
|[29.0,13240.01,1....|    0|[6.37734842556226...|[0.99830326015352...|       0.0|
|[30.0,7960.64,1.0...|    1|[2.81833426193711...|[0.94365856942978...|       0.0|
|[30.0,8403.78,1

In [64]:
churn_eval=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='churn')

In [65]:
auc=churn_eval.evaluate(pred_and_labels.predictions)

In [66]:
auc

0.7598456030431864

##Predict on New Data


In [67]:
final_lr_model=lr.fit(final_data)
new_customer=spark.read.csv('new_customers.csv',inferSchema=True,header=True)

In [68]:
new_customer.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [69]:
test_new_customers=assembler.transform(new_customer)

In [70]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [73]:
final_results=final_lr_model.transform(test_new_customers)

In [74]:
final_results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

