In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("logRegProj").getOrCreate()

In [3]:
from pyspark.ml.classification import LogisticRegression

In [4]:
df = spark.read.csv("customer_churn.csv", inferSchema=True, 
                    header=True)

In [5]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [6]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

## Which columns are important for predicting churn? 
#### We are given that the "Account_Manager" column is assigned at random and hence we should not consider it. We can also ignore the  "Names" and "Onboard_date" columns since logically they should not matter. If we think that "Onbard_date" tells us the amount of time the customer is present, we can obtain that information via the "Years" column instead. We can now study if the "Location" column is important. 

### Is "Location" important? How many unique locations? How many total locations? 

In [7]:
location_df = df.select(["Location"]).toPandas()
print("Number of unique locations = {}".format(len(location_df["Location"].unique())))
print("Number of records = {}".format(len(location_df["Location"])))

Number of unique locations = 900
Number of records = 900


### Could the zip code be a more interesting feature? 

#### Extract zipcode using the udf function of pyspark.sql. 

In [8]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def f(my_string):
    return my_string.split()[-1]
udf_my_function = udf(f, StringType())
zipcode_df = df.withColumn("Zipcode", udf_my_function("Location")).select("Zipcode").toPandas()
print("Number of unique locations = {}".format(len(zipcode_df["Zipcode"].unique())))
print("Number of records = {}".format(len(zipcode_df["Zipcode"])))

Number of unique locations = 899
Number of records = 900


### From the above analysis it is clear that the zipcode and/or location is not correlated with churn as almost all locations (whether considering the address or zipcode) are unique. 

### Is "Company" important? How many unique companies? How many total companies? 

In [9]:
from pyspark.sql.functions import col
company_df = df.groupBy("Company").count().sort(col("count").desc())
company_df.show()

+--------------+-----+
|       Company|count|
+--------------+-----+
|Anderson Group|    4|
|    Wilson PLC|    3|
|  Williams PLC|    3|
|     Smith Inc|    2|
|     Ortiz Ltd|    2|
|      King LLC|    2|
|     Evans LLC|    2|
|      Rice PLC|    2|
|    Nelson LLC|    2|
|    Walker Ltd|    2|
|      Soto PLC|    2|
|Davis and Sons|    2|
|     Smith Ltd|    2|
|  Williams LLC|    2|
|      Webb PLC|    2|
|   Smith Group|    2|
|Smith and Sons|    2|
|   Davis Group|    2|
|     Jones LLC|    2|
|     Gates Ltd|    2|
+--------------+-----+
only showing top 20 rows



In [10]:
# Total unique companies
len(company_df.collect())

873

### The above shows that for most data points the "Company" feature is unique and hence less informative. This is a handwavy explanation but we will keep things simpler for now. If we want to consider the company categorical feature, we could use StringIndexer and OneHotEncoder to convert it to a vector. For now, this is commented out. 


### Thus we will consider the columns -- "Age", "Total_Purchase", "Years", "Num_Sites" as our features column and "Churn" as our label column.

In [11]:
my_final_data = df.select(["Age", "Total_Purchase", "Years", "Num_Sites", "Churn"])

In [12]:
my_final_data.show()

+----+--------------+-----+---------+-----+
| Age|Total_Purchase|Years|Num_Sites|Churn|
+----+--------------+-----+---------+-----+
|42.0|       11066.8| 7.22|      8.0|    1|
|41.0|      11916.22|  6.5|     11.0|    1|
|38.0|      12884.75| 6.67|     12.0|    1|
|42.0|       8010.76| 6.71|     10.0|    1|
|37.0|       9191.58| 5.56|      9.0|    1|
|48.0|      10356.02| 5.12|      8.0|    1|
|44.0|      11331.58| 5.23|     11.0|    1|
|32.0|       9885.12| 6.92|      9.0|    1|
|43.0|       14062.6| 5.46|     11.0|    1|
|40.0|       8066.94| 7.11|     11.0|    1|
|30.0|      11575.37| 5.22|      8.0|    1|
|45.0|       8771.02| 6.64|     11.0|    1|
|45.0|       8988.67| 4.84|     11.0|    1|
|40.0|       8283.32|  5.1|     13.0|    1|
|41.0|       6569.87|  4.3|     11.0|    1|
|38.0|      10494.82| 6.81|     12.0|    1|
|45.0|       8213.41| 7.35|     11.0|    1|
|43.0|      11226.88| 8.08|     12.0|    1|
|53.0|       5515.09| 6.85|      8.0|    1|
|46.0|        8046.4| 5.69|     

### If we want to use the "Company" as a feature we can convert the "Company" column to a one-hot encoded vector since it is categorical data. 

In [13]:
from pyspark.ml.feature import (VectorAssembler, StringIndexer, 
                                OneHotEncoder)

In [14]:
# company_indexer = StringIndexer(inputCol="Company", outputCol="CompanyIndex")
# company_encoder = OneHotEncoder(inputCol="CompanyIndex", outputCol="CompanyVec")

### Using assembler and pipeline for model fitting. 

In [15]:
# assembler = VectorAssembler(inputCols=["Age", "Total_Purchase", "Years", 
#                                        "Num_Sites", "CompanyVec"], 
#                             outputCol="features")
assembler = VectorAssembler(inputCols=["Age", "Total_Purchase", "Years", 
                                       "Num_Sites"], 
                            outputCol="features")

In [16]:
from pyspark.ml import Pipeline

In [17]:
log_reg_churn = LogisticRegression(featuresCol="features", labelCol="Churn")

In [18]:
# pipeline = Pipeline(stages=[company_indexer, company_encoder,
#                            assembler, log_reg_churn])
pipeline = Pipeline(stages=[assembler, log_reg_churn])

## Perform train and test split

### First we check if "Churn" column is imbalanced.

In [19]:
my_final_data.groupBy("Churn").count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



### The above data shows that we have most of the datapoints have churn=0. Thus if we split the data into train and test sets randomly, it may happen that all of the churn=1 datapoints remain in one or the other set resulting in a poor test of the trained model. Thus we need to perform stratified splits, i.e, perform 70%-30% split (or any other percentage) per category, i.e., per churn=1 datapoints and separately for churn=0 datapoints. 

### We now perform stratified splits as shown below: 

In [20]:
# fractions = {0: 0.7, 1: 0.7} denotes that for both class=0 and for class=1 
# consider 70% data for the train_data set. 
train_data = my_final_data.sampleBy("Churn", fractions={0: 0.7, 1: 0.7}, seed=11)
# The remaining data goes into test_data set. 
test_data = my_final_data.subtract(train_data)

#### Check if the spliting worked as expected: 

In [21]:
train_data.groupBy("Churn").count().show()
test_data.groupBy("Churn").count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  109|
|    0|  523|
+-----+-----+

+-----+-----+
|Churn|count|
+-----+-----+
|    1|   41|
|    0|  227|
+-----+-----+



## Fit and evaluate model

In [22]:
fitted_model = pipeline.fit(train_data)

In [23]:
results = fitted_model.transform(test_data)

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [25]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Churn")

In [26]:
results.select("Churn", "prediction").show()

+-----+----------+
|Churn|prediction|
+-----+----------+
|    0|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [27]:
AUC = my_eval.evaluate(results)

In [28]:
AUC

0.8072418609648652