In [1]:
import findspark
import os
# Locate Spark before `import pyspark`: prefer $SPARK_HOME when it is set (and
# non-empty); otherwise fall back to the install path this notebook was written
# against, so the notebook is not tied to one machine's directory layout.
findspark.init(os.environ.get("SPARK_HOME") or '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder,StringIndexer,VectorAssembler
from pyspark.sql.functions import datediff,current_date,year,month,dayofmonth,max
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
spark=SparkSession.builder.appName("logRegProject").getOrCreate()

In [3]:
df=spark.read.csv("customer_churn.csv",inferSchema=True,header=True)

In [4]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
# Peek at the first two rows (returned as Row objects) to see raw values.
df.head(n=2)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1),
 Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date=datetime.datetime(2013, 8, 13, 0, 38, 46), Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1)]

In [6]:
# Every column name from the CSV header; the next cell picks the modelling subset.
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [7]:
col_list=[ 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Churn']

In [8]:
df=df[col_list]

In [9]:
df.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [10]:
# Per-column count / mean / stddev / min / max for the selected columns.
summary_stats = df.describe()
summary_stats.show()

+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|              Churn|
+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|              900|              900|               900|              900|               900|                900|
|   mean|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|             22.0|            100.0|                 0|              1.0|               3.0|                  0|
|    max|             65.0|         18026.01|                 1|             9.15|              14.0|                  1|
+-------+---------------

In [11]:
max_val=df.agg(max("Total_Purchase")).collect()[0][0]

In [12]:
df=df.withColumn("Total_Purchase_Normalized",df["Total_Purchase"]/(18026.01))

In [13]:
new_col_list=['Age',
  'Account_Manager',
 'Years',
 'Num_Sites',
 'Churn',
 'Total_Purchase_Normalized']

In [14]:
df=df[new_col_list]

In [15]:
df.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Total_Purchase_Normalized: double (nullable = true)



In [16]:
assembler=VectorAssembler(inputCols=['Age','Account_Manager','Years','Num_Sites','Total_Purchase_Normalized'], outputCol="feature_Vector")

In [17]:
df=assembler.transform(df)

In [18]:
df.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Total_Purchase_Normalized: double (nullable = true)
 |-- feature_Vector: vector (nullable = true)



In [19]:
train_data, test_data = df.randomSplit([0.8,0.2])

In [20]:
logReg=LogisticRegression(featuresCol="feature_Vector",labelCol="Churn")

In [21]:
model=logReg.fit(train_data)

# model summary will provide different attributes of the model

In [22]:
model_sum=model.summary

# model.transform will give the test_data dataframe with an addtional column of prediction

In [23]:
test_results=model.transform(test_data)

# model.evaluate will evaluate the model based on test_data and then use the object returned to check diff evaluation metrics

In [24]:
eval=model.evaluate(test_data)

In [25]:
eval.areaUnderROC

0.9292307692307692

In [26]:
# First 20 scored test rows, with long cell values truncated for display.
test_results.show(n=20, truncate=True)

+----+---------------+-----+---------+-----+-------------------------+--------------------+--------------------+--------------------+----------+
| Age|Account_Manager|Years|Num_Sites|Churn|Total_Purchase_Normalized|      feature_Vector|       rawPrediction|         probability|prediction|
+----+---------------+-----+---------+-----+-------------------------+--------------------+--------------------+--------------------+----------+
|30.0|              1| 4.13|      7.0|    0|      0.46620300332685943|[30.0,1.0,4.13,7....|[5.82178895106447...|[0.99704644831882...|       0.0|
|30.0|              1| 5.22|      8.0|    1|       0.6421482069520654|[30.0,1.0,5.22,8....|[3.89712472249539...|[0.98010370243555...|       0.0|
|30.0|              1| 7.16|      9.0|    1|       0.5960353955201401|[30.0,1.0,7.16,9....|[1.63671302345900...|[0.83708718160741...|       0.0|
|31.0|              0|  5.3|      6.0|    0|        0.651460861277676|[31.0,0.0,5.3,6.0...|[6.66156646069944...|[0.99872249303567.

In [27]:
eval=BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="Churn")

In [28]:
eval.evaluate(test_results)

0.7969230769230768