In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Logistic_regression").getOrCreate()

In [20]:
from cmath import inf
df = spark.read.csv("customer_churn.csv" , inferSchema= True  , header = True)
df.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

In [21]:
df.printSchema()
df.head(2)

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1),
 Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date=datetime.datetime(2013, 8, 13, 0, 38, 46), Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1)]

In [22]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [23]:
from pyspark.sql.functions import year
df.select("Onboard_date").show(5)
df = df.withColumn("year" , year(df["Onboard_date"]))
df.printSchema()

+-------------------+
|       Onboard_date|
+-------------------+
|2013-08-30 07:00:40|
|2013-08-13 00:38:46|
|2016-06-29 06:20:07|
|2014-04-22 12:43:12|
|2016-01-19 15:31:15|
+-------------------+
only showing top 5 rows

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- year: integer (nullable = true)



In [24]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Age", "Total_Purchase" , "Account_Manager" , "Years" , "Num_Sites" , "year"], outputCol="features")

In [26]:
output = assembler.transform(df)
final_data = output.select(["features" , "Churn"])

In [27]:
final_data.show(5)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
+--------------------+-----+
only showing top 5 rows



In [28]:
train_data , test_data = final_data.randomSplit([0.7,0.3])

In [30]:
from pyspark.ml.classification import LogisticRegression
lr_churn = LogisticRegression(labelCol="Churn")
fitted_model = lr_churn.fit(train_data)
train_summary = fitted_model.summary

In [31]:
train_summary.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|  0.0|[4.41756458663307...|[0.98808021885686...|       0.0|
|[25.0,9672.03,0.0...|  0.0|[4.53500179316892...|[0.98938695693285...|       0.0|
|[26.0,8787.39,1.0...|  1.0|[0.46272924411881...|[0.61366142866390...|       0.0|
|[27.0,8628.8,1.0,...|  0.0|[5.40178112943224...|[0.99551169224334...|       0.0|
|[28.0,8670.98,0.0...|  0.0|[7.65214072076072...|[0.99952519940122...|       0.0|
|[28.0,9090.43,1.0...|  0.0|[1.39775890713256...|[0.80182802011358...|       0.0|
|[28.0,11128.95,1....|  0.0|[3.97154418880577...|[0.98150422859599...|       0.0|
|[28.0,11245.38,0....|  0.0|[3.84818869654446...|[0.97912666883077...|       0.0|
|[29.0,5900.78,1.0...|  0.0|[3.90354503236861...|[0.98022851611409...|       0.0|
|[29.0,9378.24,0

In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE
pred_and_lables = fitted_model.evaluate(test_data)
pred_and_lables.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8939.61,0.0...|    0|[6.25394708180209...|[0.99808084076058...|       0.0|
|[28.0,11204.23,0....|    0|[1.70580825620619...|[0.84629180502339...|       0.0|
|[29.0,8688.17,1.0...|    1|[2.62551677299234...|[0.93248585008758...|       0.0|
|[29.0,11274.46,1....|    0|[4.36661604309185...|[0.98746499655391...|       0.0|
|[30.0,8403.78,1.0...|    0|[5.68017787055180...|[0.99659865729124...|       0.0|
|[30.0,8874.83,0.0...|    0|[3.05572556842646...|[0.95502907299097...|       0.0|
|[30.0,12788.37,0....|    0|[2.35588056712384...|[0.91340051272303...|       0.0|
|[31.0,8688.21,0.0...|    0|[6.50725183297714...|[0.99850964852284...|       0.0|
|[31.0,9574.89,0.0...|    0|[3.31734164154108...|[0.96501896314450...|       0.0|
|[31.0,12264.68,

In [33]:
churn_eval = BCE(rawPredictionCol="prediction" , labelCol="Churn")
auc = churn_eval.evaluate(pred_and_lables.predictions)
auc

0.7637630662020906

In [37]:
from types import new_class


final_model = lr_churn.fit(final_data)
new_customer = spark.read.csv("new_customers.csv" , inferSchema=True , header=True)
new_customer = new_customer.withColumn("year" , year(new_customer["Onboard_date"]))
unlabeled_data = assembler.transform(new_customer).select("features")
unlabeled_data.printSchema()

root
 |-- features: vector (nullable = true)



In [38]:
final_result = final_model.transform(unlabeled_data)
final_result.show(5)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[37.0,9935.53,1.0...|[2.22522664409239...|[0.90249210948189...|       0.0|
|[23.0,7526.94,1.0...|[-6.2031132274102...|[0.00201903762162...|       1.0|
|[65.0,100.0,1.0,1...|[-3.8110402065658...|[0.02164622620371...|       1.0|
|[32.0,6487.5,0.0,...|[-5.0580350514018...|[0.00631787118868...|       1.0|
|[32.0,13147.71,1....|[1.12051197718180...|[0.75408367060957...|       0.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

