In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('lr_example3').getOrCreate()

In [3]:
data = spark.read.csv('customer_churn.csv', inferSchema=True, header= True)

In [4]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [6]:
from pyspark.ml.feature import VectorAssembler

In [7]:
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'],outputCol='features')

In [8]:
output = assembler.transform(data)

In [9]:
final_data = output.select(['features','churn'])

In [11]:
train_churn , test_churn = final_data.randomSplit([0.7,0.3])

In [12]:
from pyspark.ml.classification import LogisticRegression

In [14]:
lr_churn = LogisticRegression(labelCol='churn')

In [15]:
fitted_churn_model = lr_churn.fit( train_churn)

In [17]:
training_sum = fitted_churn_model.summary

In [22]:
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                628|                628|
|   mean| 0.1767515923566879| 0.1480891719745223|
| stddev|0.38176241442295106|0.35547148869706313|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [21]:
pred_labels = fitted_churn_model.evaluate(test_churn)

In [23]:
pred_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8787.39,1.0...|    1|[1.13995656017805...|[0.75767166331083...|       0.0|
|[28.0,8670.98,0.0...|    0|[8.42778231510716...|[0.99978134184793...|       0.0|
|[28.0,11128.95,1....|    0|[4.61075322253530...|[0.99015359069972...|       0.0|
|[29.0,5900.78,1.0...|    0|[4.52069542172407...|[0.98923567763949...|       0.0|
|[29.0,9378.24,0.0...|    0|[5.31459403835229...|[0.99510480542301...|       0.0|
|[29.0,9617.59,0.0...|    0|[4.95541338832602...|[0.99300411985825...|       0.0|
|[29.0,11274.46,1....|    0|[4.93515180984922...|[0.99286194864237...|       0.0|
|[29.0,13255.05,1....|    0|[4.54558509568392...|[0.98949751227561...|       0.0|
|[30.0,7960.64,1.0...|    1|[3.67470374873993...|[0.97527015615511...|       0.0|
|[30.0,8403.78,1

In [24]:
churn_eval = BinaryClassificationEvaluator(labelCol='churn',rawPredictionCol='prediction')

In [25]:
auc = churn_eval.evaluate(pred_labels.predictions)

In [26]:
auc

0.7456806426763508

In [27]:
final_model = lr_churn.fit(final_data)

In [29]:
new_customers = spark.read.csv('new_customers.csv', inferSchema=True, header= True)

In [30]:
test_new_cus = assembler.transform(new_customers)

In [31]:
results = final_model.transform(test_new_cus)

In [34]:
results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

