In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.4.0-bin-hadoop2.7/')
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Customer_churn").getOrCreate()

In [3]:
data = spark.read.csv('customer_churn.csv', inferSchema = True, header = True)

In [4]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, OneHotEncoder

In [70]:
string_indexed = StringIndexer(inputCol="Company", outputCol="company_no")

In [71]:
#string_encoded = OneHotEncoder(inputCol="company_no", outputCol ="company_vec")

In [72]:
df = string_indexed.fit(data).transform(data)

In [73]:
#new_df = string_encoded.transform(df)

In [74]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- company_no: double (nullable = false)



In [75]:
df.head(1)[0]

Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1, company_no=824.0)

In [76]:
new_df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- company_no: double (nullable = false)
 |-- company_vec: vector (nullable = true)



In [77]:
new_df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'company_no',
 'company_vec']

In [78]:
assembler = VectorAssembler(inputCols=['Age','Total_Purchase','Years',
 'Num_Sites','company_no'], outputCol='features')

In [79]:
final_data = assembler.transform(df)

In [80]:
final_data = final_data.select('features', 'Churn')

In [81]:
from pyspark.ml.classification import LogisticRegression

In [82]:
log_reg_churn = LogisticRegression(labelCol='Churn')

In [83]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [84]:
churn_fit = log_reg_churn.fit(train_data)

In [85]:
churn_fit_test = churn_fit.evaluate(test_data)

In [86]:
churn_fit_test.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,4....|    0|[4.73288215348879...|[0.99127571461247...|       0.0|
|[27.0,8628.8,5.3,...|    0|[5.55551582873327...|[0.99614881524095...|       0.0|
|[28.0,9090.43,5.7...|    0|[1.67160387056344...|[0.84178954160391...|       0.0|
|[29.0,5900.78,5.5...|    0|[4.14395117374301...|[0.98438755269702...|       0.0|
|[29.0,9617.59,5.4...|    0|[4.04480425919663...|[0.98278829845234...|       0.0|
|[29.0,11274.46,4....|    0|[4.4497658415338,...|[0.98845357534662...|       0.0|
|[29.0,12711.15,5....|    0|[4.87930754819991...|[0.99245508213575...|       0.0|
|[29.0,13240.01,4....|    0|[6.61217217057173...|[0.99865789389416...|       0.0|
|[30.0,6744.87,5.1...|    0|[3.25845661149567...|[0.96297580294904...|       0.0|
|[30.0,8874.83,5

In [87]:
new_data = spark.read.csv('new_customers.csv', inferSchema = True, header= True)

In [88]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [89]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')

In [90]:
auc = evaluator.evaluate(churn_fit_test.predictions)

In [91]:
auc

0.8036507936507936

# Prediction on New Data

In [93]:
fitted_churn_model = log_reg_churn.fit(final_data)

In [94]:
new_data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [95]:
string_indexed = StringIndexer(inputCol="Company", outputCol="company_no")

In [96]:
#string_encoded = OneHotEncoder(inputCol="company_no", outputCol="company_vec")

In [97]:
df = string_indexed.fit(new_data).transform(new_data)

In [103]:
df.describe().show()

+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+------------------+
|summary|        Names|               Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|         Company|        company_no|
+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+------------------+
|  count|            6|                 6|                6|                 6|                6|                 6|                   6|               6|                 6|
|   mean|         null|35.166666666666664|7607.156666666667|0.8333333333333334|6.808333333333334|12.333333333333334|                null|            null|               2.5|
| stddev|         null| 15.71517313511584|4346.008232825459| 0.408248290463863|3.708737880555414|3.3862466931200785|              

In [104]:
#new_df = string_encoded.transform(df)

In [105]:
output = assembler.transform(df)

In [106]:
test_customer = churn_fit.transform(output)

In [107]:
test_customer.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- company_no: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [109]:
test_customer.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

