In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark

  if obj.__module__ is "__builtin__":


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('churn').getOrCreate()

In [3]:
df = spark.read.csv('customer_churn.csv',inferSchema=True,header=True)

In [4]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
df.head(1)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1)]

In [8]:
df.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [9]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [10]:
from pyspark.ml.feature import VectorAssembler

In [11]:
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase','Account_Manager','Years',
 'Num_Sites'], outputCol='features')

In [12]:
output = assembler.transform(df)

In [13]:
final_data = output.select('features','churn')

In [14]:
train, test = final_data.randomSplit([0.7,0.3])

In [15]:
from pyspark.ml.classification import LogisticRegression

In [16]:
logr_churn = LogisticRegression(labelCol='churn')

In [17]:
fitted_churn_model = logr_churn.fit(train)

In [18]:
training_sum = fitted_churn_model.summary

In [19]:
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                626|                626|
|   mean|0.16613418530351437|0.12300319488817892|
| stddev| 0.3724986866627793| 0.3287035235432933|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [21]:
pred = fitted_churn_model.evaluate(test)

In [22]:
pred.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|    0|[4.28350372129564...|[0.98639344603027...|       0.0|
|[28.0,9090.43,1.0...|    0|[1.39072014398632...|[0.80070718514991...|       0.0|
|[28.0,11245.38,0....|    0|[3.60931333840304...|[0.97364306482495...|       0.0|
|[29.0,5900.78,1.0...|    0|[3.78265410251583...|[0.97774438865570...|       0.0|
|[29.0,13240.01,1....|    0|[6.32640973346145...|[0.99821474957477...|       0.0|
|[30.0,6744.87,0.0...|    0|[3.21718726883486...|[0.96147596624976...|       0.0|
|[30.0,8874.83,0.0...|    0|[2.96730866842096...|[0.95107519812608...|       0.0|
|[30.0,10744.14,1....|    1|[1.72468694671488...|[0.84873156012591...|       0.0|
|[30.0,11575.37,1....|    1|[3.74298158899825...|[0.97686454228145...|       0.0|
|[30.0,12788.37,

In [23]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                           labelCol='churn')

In [25]:
auc = churn_eval.evaluate(pred.predictions)

In [26]:
auc

0.8064454614797864

### predict on new data

In [28]:
final_log_model = logr_churn.fit(final_data)

In [30]:
new_data = spark.read.csv('new_customers.csv',inferSchema=True,header=True)

In [31]:
new_data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [32]:
test_new = assembler.transform(new_data)

In [33]:
test_new.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [34]:
final_results = final_log_model.transform(test_new)

In [35]:
final_results.show()

+--------------+----+--------------+---------------+-----+---------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+----------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|        Onboard_date|            Location|         Company|            features|       rawPrediction|         probability|prediction|
+--------------+----+--------------+---------------+-----+---------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+----------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:...|38612 Johnny Stra...|        King Ltd|[37.0,9935.53,1.0...|[2.22168705251422...|[0.90218018099703...|       0.0|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:...|21083 Nicole Junc...|   Cannon-Benson|[23.0,7526.94,1.0...|[-6.2207530595025...|[0.00198380445828...|  