# Problem: 

**Can you develop a machine learning model that predict whether a customer leave the company or not?**

- Many clients of a marketing company use its service to produce advertisiment on their websites.

- Our main objective is to build a machine learning model to help predict which customer will stop their services.

**Dataset Story:**

- Consists of 900 observations and 7 variables. 
- Independent variables contain information about customers.
- The dependent variable represents the customer abandonment status.

**Variables:**

- Name : Customer's Name
- Age : Customer's Age
- Total_Purchase : Customer's total purchase
- Account_Manager : Is there an account manager(binary)
- Years : For how many years they have been using it
- Num_sites : How many websites they use
- Churn : Customer's churn situation(binary)

In [22]:
#Loading Libraries

In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, udf, sum as Fsum, from_unixtime, \
    count, countDistinct, when, isnull, max as Fmax, min as Fmin, length, \
    month, datediff, first, year, concat
from pyspark.sql.types import IntegerType, FloatType, DateType, TimestampType, LongType, StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier,RandomForestClassifier, LinearSVC, GBTClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler, Normalizer, MinMaxScaler, VectorAssembler, HashingTF
from pyspark.mllib.evaluation import MulticlassMetrics


from pyspark.sql.functions import col,udf, sum as Fsum, from_unixtime,\
count, countDistinct, when, isnull, max as Fmax, min as Fmin, length,\
month, datediff, first, year, concat

from pyspark.sql.types import IntegerType, FloatType, DateType, TimestampType, LongType, StringType

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20211117080902-0002
KERNEL_ID = 92804ef4-4726-4446-8a76-ca2583b60386


# IBM PAK AUTO CONFIGURATION ON THE GIVEN DATA FOR CREATING SPARK DATA FRAME

In [3]:
import ibmos2spark, os
# @hidden_cell

if os.environ.get('RUNTIME_ENV_LOCATION_TYPE') == 'external':
    endpoint_86d27f644da44c0ba645c5c0d4ed5fbe = 'https://s3.eu.cloud-object-storage.appdomain.cloud'
else:
    endpoint_86d27f644da44c0ba645c5c0d4ed5fbe = 'https://s3.private.eu.cloud-object-storage.appdomain.cloud'

credentials = {
    'endpoint': endpoint_86d27f644da44c0ba645c5c0d4ed5fbe,
    'service_id': 'iam-ServiceId-1f4dc0f7-596c-4019-92e2-f57be3e4fa64',
    'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
    'api_key': 'rk-btogQrpYD9zXjgre8A3q1tVr-o_SiSPdjsG4OoSPF'
}

configuration_name = 'os_86d27f644da44c0ba645c5c0d4ed5fbe_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option("header",True) \
  .option("inferSchema",True) \
  .option("sep",",") \
  .option("encoding", "UTF-8") \
  .load(cos.url('churn.csv', 'churnprediction-donotdelete-pr-40w8lqg9gksy9x'))

In [4]:
df.limit(3).toPandas()

Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn
0,Cameron Williams,42.0,11066.8,0,7.22,8.0,1
1,Kevin Mueller,41.0,11916.22,0,6.5,11.0,1
2,Eric Lozano,38.0,12884.75,0,6.67,12.0,1


In [5]:
#Checking Schema of df
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [6]:
# Fixing dtype of 'Num_Sites' from float to integer
df = df.withColumn("Num_Sites", df["Num_Sites"].cast("integer"))

In [7]:
# Creating a new column to find Purchase in terms of Age
df = df.withColumn('purchase_per_site', df.Total_Purchase / df.Num_Sites)

In [8]:
# Creating a new column
df = df.withColumn('segment', when(df['Years'] < 5, "Inexpeirenced").otherwise("Expeirenced"))

# NULL CHECK

In [9]:
# Null check
def null_check(df):
    total_null_count = 0
    for col_name in df.dtypes:
        null_count = df.filter((F.col(col_name[0]).isNull()) | (F.col(col_name[0]) == "")).count()
        if (null_count > 0):
            print("{} {} type has {} null values % {}".format(col_name, col_type[1], null_count,
                                                              (null_count / df_count * 100)))
            total_null_count += null_count
    if total_null_count == 0:
        print("There are no missing values")

In [10]:
null_check(df)

There are no missing values


In [11]:
df.limit(3).toPandas()

Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn,purchase_per_site,segment
0,Cameron Williams,42.0,11066.8,0,7.22,8,1,1383.35,Expeirenced
1,Kevin Mueller,41.0,11916.22,0,6.5,11,1,1083.292727,Expeirenced
2,Eric Lozano,38.0,12884.75,0,6.67,12,1,1073.729167,Expeirenced


In [12]:
# Creating indexer object
indexer = StringIndexer(inputCol="segment", outputCol="segment_label")

In [13]:
# fitting and transforming the object.
indexer.fit(df).transform(df).show(3)

+----------------+----+--------------+---------------+-----+---------+-----+------------------+-----------+-------------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn| purchase_per_site|    segment|segment_label|
+----------------+----+--------------+---------------+-----+---------+-----+------------------+-----------+-------------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|        8|    1|           1383.35|Expeirenced|          0.0|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|       11|    1|1083.2927272727272|Expeirenced|          0.0|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|       12|    1|1073.7291666666667|Expeirenced|          0.0|
+----------------+----+--------------+---------------+-----+---------+-----+------------------+-----------+-------------+
only showing top 3 rows



In [14]:
# temporarly creating temp_df.
temp_sdf = indexer.fit(df).transform(df)

In [15]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: integer (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- purchase_per_site: double (nullable = true)
 |-- segment: string (nullable = false)



In [16]:
# It will temporarily do integer over the temp_sdf.
df = temp_sdf.withColumn("segment_label", temp_sdf["segment_label"].cast("integer"))

In [17]:
# Dropping the column 'segment' 
df = df.drop('segment')

# stringIndexer

In [18]:
df.limit(3).toPandas()

Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Churn,purchase_per_site,segment_label
0,Cameron Williams,42.0,11066.8,0,7.22,8,1,1383.35,0
1,Kevin Mueller,41.0,11916.22,0,6.5,11,1,1083.292727,0
2,Eric Lozano,38.0,12884.75,0,6.67,12,1,1073.729167,0


In [19]:
stringIndexer = StringIndexer(inputCol='Churn', outputCol='label')
temp_sdf = stringIndexer.fit(df).transform(df)
df = temp_sdf.withColumn("label", temp_sdf["label"].cast("integer"))

df.show(5)

+----------------+----+--------------+---------------+-----+---------+-----+------------------+-------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn| purchase_per_site|segment_label|label|
+----------------+----+--------------+---------------+-----+---------+-----+------------------+-------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|        8|    1|           1383.35|            0|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|       11|    1|1083.2927272727272|            0|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|       12|    1|1073.7291666666667|            0|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|       10|    1|           801.076|            0|    1|
|  Cynthia Norton|37.0|       9191.58|              0| 5.56|        9|    1|1021.2866666666666|            0|    1|
+----------------+----+--------------+---------------+-----+---------+--

# Data Preperation for the Model

In [20]:
# we specified independent variables to vectorize them as features in next step
cols = ['Age', 'Total_Purchase', 'Account_Manager', 'Years',
        'Num_Sites', 'purchase_per_site','segment_label']

# VectorAssembler

In [21]:
va = VectorAssembler(inputCols=cols, outputCol="features")
va_df = va.transform(df)
va_df.show(3)

+----------------+----+--------------+---------------+-----+---------+-----+------------------+-------------+-----+--------------------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn| purchase_per_site|segment_label|label|            features|
+----------------+----+--------------+---------------+-----+---------+-----+------------------+-------------+-----+--------------------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|        8|    1|           1383.35|            0|    1|[42.0,11066.8,0.0...|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|       11|    1|1083.2927272727272|            0|    1|[41.0,11916.22,0....|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|       12|    1|1073.7291666666667|            0|    1|[38.0,12884.75,0....|
+----------------+----+--------------+---------------+-----+---------+-----+------------------+-------------+-----+--------------------+
only showing top 3 rows



In [22]:
# Final df
final_df = va_df.select("features", "label")
final_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
+--------------------+-----+
only showing top 5 rows



# Data Splitting

In [23]:
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=12)

In [24]:
# RandomForestClassifier

# create transformers
scaler = StandardScaler(inputCol='features',outputCol='scaled_features')
normalizer = Normalizer(inputCol='scaled_features',outputCol='norm_scaled_features')

# set rf model
rf = RandomForestClassifier(labelCol="label", featuresCol="norm_scaled_features")

# instantiate pipeline
pipeline = Pipeline(stages=[scaler, normalizer, rf])

# train model
model_rf = pipeline.fit(train_data)

# create prediction column on test data
results = model_rf.transform(test_data)

# evaluate results
correct_count = results.filter(results.label == results.prediction).count()
total_count = results.count()

correct_1_count = results.filter((results.label == 1) & (results.prediction == 1)).count()
total_1_test = results.filter((results.label == 1)).count()
total_1_predict = results.filter((results.prediction == 1)).count()

print("All correct predections count: ",correct_count)
print("Total count: ",total_count)
print("Accuracy %: ",(correct_count / total_count)*100)
print("Recall %: ",(correct_1_count / total_1_test)*100)
print("Precision %: ",(correct_1_count / total_1_predict)*100)

All correct predections count:  153
Total count:  184
Accuracy %:  83.15217391304348
Recall %:  21.21212121212121
Precision %:  58.333333333333336
