[0.0 Imports](attachment:./#0.0-Imports)<br>
&nbsp;[0.1 Helper Functions](attachment:./#0.1-Helper-Functions)<br>
&nbsp;[0.2 Loading Data](attachment:./#0.2-Loading-Data)<br>

[1.0 Describe Data](attachment:./#1.0-Describe-Data)<br>
&nbsp;[1.1 Rename Columns](attachment:./#1.1-Rename-Columns)<br>
&nbsp;[1.2 Change Data Types](attachment:./#1.2-Change-Data-Types)<br>
&nbsp;[1.3 Descriptive Statistical](attachment:./#1.3-Descriptive-Statistical)<br>
&nbsp;&nbsp;[1.3.1. Numerical Attributes](attachment:./#1.3.1.-Numerical-Attributes)<br>
&nbsp;&nbsp;[1.3.2. Categorical Attributes](attachment:./#1.3.2.-Categorical-Attributes)<br>

[2.0 Feature Engineering](attachment:./#2.0-Feature-Engineering)<br>
&nbsp;[2.1 Annual Revenue](attachment:./#2.1-Annual-Revenue)<br>
&nbsp;[2.2 Revenue per Product](attachment:./#2.2-Revenue-per-Product)<br>

[3.0 Filtering The Features](attachment:./#3.0-Filtering-The-Features)<br>

[4.0 Exploratory Data Analysis?](attachment:./#4.0-Exploratory-Data-Analysis)<br>

[5.0 Machine Learning](attachment:./#5.0-Machine-Learning)<br>
&nbsp;[5.1 Data Preparation](attachment:./#5.1-Data-Preparation)<br>
&nbsp;&nbsp;[5.1.1. Feature Encoding](attachment:./#5.1.1.-Feature-Encoding)<br>
&nbsp;&nbsp;[5.1.2. Vector Assembling](attachment:./#5.1.2.-Vector-Assembling)<br>
&nbsp;&nbsp;[5.1.3. Train Test](attachment:./#5.1.3.-Train-Test)<br>
&nbsp;[5.2 Random Forest Classifier](attachment:./#5.2-Random-Forest-Classifier)<br>
&nbsp;[5.3 Oversampling by Copying](attachment:./#5.3-Oversampling-by-Copying)<br>
&nbsp;[5.4 Retrain Model](attachment:./#5.4-Retrain-Model)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>
&nbsp;[](attachment:./#)<br>



5.1 Data Preparation

## 0.0 Imports

In [40]:
from pyspark.sql import SparkSession
import inflection

import pandas as pd

import pyspark.sql.functions as F

### 0.1 Helper Functions

In [41]:
def describe_data(df):
    described_df = pd.DataFrame(df.dtypes, columns=['column', 'data_type'])

    nan_count = {col:df.filter(df[col].isNull()).count() for col in df.columns}
    described_df['nan_count'] = nan_count.values()

    described_df['nan_percentage'] = (described_df['nan_count']/df.count()) * 100

    described_df.set_index("column", inplace=True)
    return described_df


def describe_to_pandas(df):
    desc = df.describe().toPandas()
    desc.set_index('summary', inplace=True)
    return desc.T


def select_dtypes(df, include=None, exclude=None):
    if include and exclude:
        include = [item for item in include if item not in exclude]
    elif include is None and exclude is None:
        raise ValueError("You have to select by including or excluding values")

    if include:
        return [col for col, col_type in df.dtypes if col_type in include]
    else:
        return [col for col, col_type in df.dtypes if col_type not in exclude]


def count_distinct(df, columns):
    distinct_df = pd.DataFrame()
    for column in columns:
        distinct_count = df.select(column).distinct().count()
        distinct_df[column] = [distinct_count]
    distinct_df.index = ['distinct_count']
    return distinct_df


### 0.2 Loading Data

In [42]:
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
raw_data = spark.read.format("csv").option("header", "true").load("data/churn.csv")
raw_data.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|        0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|        0|            2|        0|             0|       93826.63|

## 1.0 Describe Data

In [43]:
data_frame = raw_data

### 1.1 Rename Columns

In [44]:
snakecase = lambda x: inflection.underscore(x)
data_frame = data_frame.toDF(*map(snakecase, data_frame.columns))

> **Data Dimension. Data Types. Check NaN.**

In [45]:
print(describe_data(data_frame))

                 data_type  nan_count  nan_percentage
column                                               
row_number          string          0             0.0
customer_id         string          0             0.0
surname             string          0             0.0
credit_score        string          0             0.0
geography           string          0             0.0
gender              string          0             0.0
age                 string          0             0.0
tenure              string          0             0.0
balance             string          0             0.0
num_of_products     string          0             0.0
has_cr_card         string          0             0.0
is_active_member    string          0             0.0
estimated_salary    string          0             0.0
exited              string          0             0.0


### 1.2 Change Data Types

In [46]:
data_frame = data_frame.withColumn("row_number", data_frame["row_number"].cast("int"))
data_frame = data_frame.withColumn("customer_id", data_frame["customer_id"].cast("int"))
data_frame = data_frame.withColumn("credit_score", data_frame["credit_score"].cast("float"))
data_frame = data_frame.withColumn("age", data_frame["age"].cast("int"))
data_frame = data_frame.withColumn("tenure", data_frame["tenure"].cast("int"))
data_frame = data_frame.withColumn("balance", data_frame["balance"].cast("float"))
data_frame = data_frame.withColumn("num_of_products", data_frame["num_of_products"].cast("int"))
data_frame = data_frame.withColumn("has_cr_card", data_frame["has_cr_card"].cast("int"))
data_frame = data_frame.withColumn("is_active_member", data_frame["is_active_member"].cast("int"))
data_frame = data_frame.withColumn("estimated_salary", data_frame["estimated_salary"].cast("float"))
data_frame = data_frame.withColumn("exited", data_frame["exited"].cast("int"))
print(describe_data(data_frame))

                 data_type  nan_count  nan_percentage
column                                               
row_number             int          0             0.0
customer_id            int          0             0.0
surname             string          0             0.0
credit_score         float          0             0.0
geography           string          0             0.0
gender              string          0             0.0
age                    int          0             0.0
tenure                 int          0             0.0
balance              float          0             0.0
num_of_products        int          0             0.0
has_cr_card            int          0             0.0
is_active_member       int          0             0.0
estimated_salary     float          0             0.0
exited                 int          0             0.0


### 1.3 Descriptive Statistical

In [47]:
numeric_columns = select_dtypes(data_frame, include=['int', 'float'])
numerical_attributes = data_frame[numeric_columns]
categorial_columns = select_dtypes(data_frame, exclude=['int', 'float'])
categorical_attributes = data_frame[categorial_columns]

#### 1.3.1. Numerical Attributes

From here We already know that

> 20% of customers are in churn. _[mean, exited]_

> Half of customers are active members. _[mean, is_active_member]_

> 70% has credit card. _[mean, has_cr_card]_

And the minimum salary seems too low (we'll check it out later).

In [48]:
desc = numerical_attributes.describe().toPandas()
desc.set_index('summary', inplace=True)
desc.T

summary,count,mean,stddev,min,max
row_number,10000,5000.5,2886.8956799071675,1.0,10000.0
customer_id,10000,15690940.5694,71936.18612274907,15565701.0,15815690.0
credit_score,10000,650.5288,96.65329873613037,350.0,850.0
age,10000,38.9218,10.487806451704587,18.0,92.0
tenure,10000,5.0128,2.892174377049684,0.0,10.0
balance,10000,76485.88928129883,62397.40517924407,0.0,250898.1
num_of_products,10000,1.5302,0.5816543579989917,1.0,4.0
has_cr_card,10000,0.7055,0.4558404644751332,0.0,1.0
is_active_member,10000,0.5151,0.4997969284589181,0.0,1.0
estimated_salary,10000,100090.23985771235,57510.49281035653,11.58,199992.48


### 1.3.2. Categorical Attributes

In [49]:
count_distinct(categorical_attributes, categorical_attributes.columns).T

Unnamed: 0,distinct_count
surname,2932
geography,3
gender,2


## 2.0 Feature Engineering

In [50]:
df = data_frame

### 2.1 Annual Revenue

**_According to the TopBank Analytics team, each customer who has this bank account returns a monetary value of 15% of the value of their estimated salary, if it is less than the average and 20% if this salary is higher than the average, during the current period of your account. This value is calculated annually._**

In [51]:
average_salary = df.select(F.mean('estimated_salary')).collect()[0][0]
TWELVE_MONTHS = 12

In [52]:
def get_annual_revenue(salary):
    if salary > average_salary:
        return (salary/TWELVE_MONTHS)*.2
    else:    
        return (salary/TWELVE_MONTHS)*.15

get_revenue = F.udf(lambda x:get_annual_revenue(x))


df = df.withColumn("annual_revenue", get_revenue(F.col("estimated_salary")).cast('float'))

### 2.2 Revenue per Product

        Would it be "How much is this customer paying for each product?"?
        Not quite, but since we don't have this specific data, let's use 
        it as an alternative.

In [53]:
df = df.withColumn('revenue_per_product', (F.col('annual_revenue')/F.col('num_of_products')).cast('float'))

## 3.0 Filtering The Features

        These columns won't be helpful at all. They're too specific,
        We want to generalize so the solution will work for as much
        observations as we'll have.

In [54]:
df = df.drop('row_number','customer_id','surname')

## 4.0 Exploratory Data Analysis

        Spark Graphs?

## 5.0 Machine Learning

In [55]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metrics

from sklearn.metrics import classification_report

In [56]:
data = df

### 5.1 Data Preparation

#### 5.1.1. Feature Encoding

In [57]:
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_cat").fit(data)
data = gender_indexer.transform(data)

In [58]:
indexer = StringIndexer(inputCol="geography", outputCol="geography_cat").fit(data)
data = indexer.transform(data)

In [59]:
columns_to_drop = ['gender', 'geography']
data = data.drop(*columns_to_drop)

#### 5.1.2. Vector Assembling

In [60]:
features = data.columns
features.remove("exited")
print(features)

['credit_score', 'age', 'tenure', 'balance', 'num_of_products', 'has_cr_card', 'is_active_member', 'estimated_salary', 'annual_revenue', 'revenue_per_product', 'gender_cat', 'geography_cat']


In [61]:
vector_assembler = VectorAssembler(inputCols=features, outputCol='features')
data = vector_assembler.transform(data)

#### 5.1.3. Train Test

In [62]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

## 5.2 Random Forest Classifier

In [63]:
rf = RandomForestClassifier(labelCol="exited", featuresCol="features", numTrees=300)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)

In [64]:
results = transformed.select('probability', 'exited', 'prediction').toPandas()

In [65]:
print(classification_report(results['exited'], results['prediction'], target_names=['not churn', "churn"]))

              precision    recall  f1-score   support

   not churn       0.85      0.98      0.91      2356
       churn       0.82      0.35      0.49       648

    accuracy                           0.84      3004
   macro avg       0.83      0.67      0.70      3004
weighted avg       0.84      0.84      0.82      3004



## 5.3 Oversampling by Copying

In [66]:
describe_to_pandas(data.select(('exited')))

summary,count,mean,stddev,min,max
exited,10000,0.2037,0.4027685839948606,0,1


In [67]:
oversample = data.select(F.col('*')).filter("exited == 1")
data = data.union(oversample).union(oversample)
describe_to_pandas(data.select("exited"))

summary,count,mean,stddev,min,max
exited,14074,0.4342049168679835,0.4956697125500414,0,1


## 5.4 Retrain Model

In [68]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [69]:
rf = RandomForestClassifier(labelCol="exited", featuresCol="features", numTrees=300)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)

In [70]:
results = transformed.select('probability', 'exited', 'prediction').toPandas()
print(classification_report(results['exited'], results['prediction'], target_names=['not churn', "churn"]))

              precision    recall  f1-score   support

   not churn       0.75      0.87      0.81      2440
       churn       0.78      0.63      0.70      1887

    accuracy                           0.76      4327
   macro avg       0.77      0.75      0.75      4327
weighted avg       0.77      0.76      0.76      4327

