# Customer churn analysis


# Machine Learning Pipeline

In the following notebooks, we will go through the implementation of each one of the steps in the Machine Learning Pipeline. 

We will discuss:

1. Data Preparation and Analysis
2. **Feature Engineering**
3. **Feature Selection**
4. **Model Training**
5. **Obtaining Predictions / Scoring**

In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
import pandas as pd
from sklearn import linear_model
import matplotlib.pyplot as plt
from snowflake.snowpark.functions import udf
%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

# to divide train and test set
from sklearn.model_selection import train_test_split

# feature scaling
from sklearn.preprocessing import MinMaxScaler

# to save the trained scaler class
import joblib

In [2]:
# Connect to Snowflake using the properties kept in the local config module.
from config import snowflake_conn_prop
from snowflake.snowpark import version

# Show which Snowpark client version this notebook is running against.
print(version.VERSION)

# Build the session, then echo the active warehouse / database / schema as a sanity check.
session = Session.builder.configs(snowflake_conn_prop).create()
context_rows = session.sql(
    'select current_warehouse(), current_database(), current_schema()'
).collect()
print(context_rows)

(0, 7, 0)
[Row(CURRENT_WAREHOUSE()='LAB_S_WH', CURRENT_DATABASE()='SNOWPARK_QUICKSTART', CURRENT_SCHEMA()='TELCO')]


In [3]:
%%time
raw = session.table('TRAIN_DATASET').sample(n = 40000)
data = raw.toPandas()

CPU times: user 781 ms, sys: 73.8 ms, total: 855 ms
Wall time: 3.91 s


In [4]:
# Confirm the size of the pandas sample as (rows, columns).
data.shape

(40000, 22)

# Separate dataset into train and test

It is important to separate our data into training and testing sets. 

When we engineer features, some techniques learn parameters from data. It is important to learn these parameters only from the train set. This is to avoid over-fitting.

# Snowflake Training

In [5]:
# Create the internal stage used to store the serialized model.
# "if not exists" makes this cell safe to re-run.

query = (
    "create stage if not exists models"
    " directory = (enable = true)"
    " copy_options = (on_error='skip_file')"
)

session.sql(query).collect()

[Row(status='MODELS already exists, statement succeeded.')]

In [6]:
# Create Stored Proc

# setup pipeline

# essential libraries

from snowflake.snowpark.functions import sproc
import snowflake.snowpark
import json
import cachetools
import io
import joblib
import pandas as pd
import numpy as np

#transformations
from sklearn.preprocessing import OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.compose import ColumnTransformer

#Classifier
from sklearn.ensemble import RandomForestClassifier

#Pipeline
from sklearn.pipeline import make_pipeline
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split

# Declare the packages to attach to the session for the stored procedure and UDF
# registered later in this notebook.
session.add_packages(
    'snowflake-snowpark-python',
    'scikit-learn',
    'pandas',
    'numpy',
    'joblib',
    'cachetools',
)

def save_file(session, model, path):
    """Serialize ``model`` with joblib and upload the bytes to a Snowflake stage path.

    Args:
        session: Snowpark session; the upload goes through its underlying connector cursor.
        model: Object to serialize with ``joblib.dump`` (the fitted pipeline in this notebook).
        path: Destination stage path, e.g. "@MODELS/churn_model_reg.joblib".

    Returns:
        A confirmation message that contains ``path``.
    """
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    # joblib.dump leaves the position at the end of the buffer; rewind so the upload
    # starts from the first byte instead of relying on the connector to do it.
    input_stream.seek(0)
    # NOTE(review): upload_stream is reached through private attributes (_conn._cursor);
    # confirm this still exists when upgrading the Snowpark / connector version.
    session._conn._cursor.upload_stream(input_stream, path)
    return "successfully created file: " + path


def train_model(session: snowflake.snowpark.Session) -> float:
    """Train the churn classifier on TRAIN_DATASET and upload the fitted pipeline.

    Steps:
      1. Split TRAIN_DATASET 80/20 with a fixed seed.
      2. Persist the two splits as tables CHURN_TRAIN and CHURN_TEST (overwriting them).
      3. Fit a preprocessing + RandomForestClassifier pipeline on the train split.
      4. Upload the fitted pipeline to "@MODELS/churn_model_reg.joblib".
      5. Score the test split.

    Args:
        session: Snowpark session used to read the table, write the splits and upload the model.

    Returns:
        Root mean squared error between the test labels and the predicted labels,
        as a built-in float.
        NOTE(review): if CHURNVALUE is a 0/1 label this is the square root of the
        misclassification rate; a classification metric may be easier to interpret.
    """
    snowdf = session.table("TRAIN_DATASET")
    # split the train and test set; the seed makes the split repeatable
    snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82)

    # persist both splits, overwriting any previous CHURN_TRAIN / CHURN_TEST tables
    snowdf_train.write.mode("overwrite").save_as_table("CHURN_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("CHURN_TEST")

    # Fetch each split exactly once and separate the label afterwards. Fetching features
    # and labels with two independent queries gives no row-order guarantee, so labels
    # could end up paired with the wrong feature rows.
    churn_train = snowdf_train.drop("CUSTOMERID").to_pandas()
    churn_train_labels = churn_train.pop("CHURNVALUE")  # 1-D Series for fit()

    churn_test = snowdf_test.drop("CUSTOMERID").to_pandas()
    churn_test_labels = churn_test.pop("CHURNVALUE")

    cat_vars = ['GENDER', 'SENIORCITIZEN', 'PARTNER', 'DEPENDENTS', 'PHONESERVICE', 'MULTIPLELINES', 'INTERNETSERVICE',
            'ONLINESECURITY', 'ONLINEBACKUP', 'DEVICEPROTECTION', 'TECHSUPPORT', 'STREAMINGTV', 'STREAMINGMOVIES',
            'CONTRACT', 'PAPERLESSBILLING', 'PAYMENTMETHOD']

    # we will capture those of type numerical from previous notebook
    num_vars = [ 'TENUREMONTHS', 'MONTHLYCHARGES', 'TOTALCHARGES']

    # Numerical columns: replace missing values with 0, then scale to [0, 1]
    num_pipe = Pipeline([
            ('imputer', SimpleImputer(missing_values=np.nan, strategy='constant', fill_value=0)),
            ('scaler', MinMaxScaler()),
        ])

    # Categorical columns: ordinal-encode; categories unseen during fit map to -1
    preprocessor = ColumnTransformer([
            ('num_transform', num_pipe, num_vars),
            ('ordinalEncoding',OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), cat_vars)
        ])

    clf = make_pipeline(RandomForestClassifier(random_state=0, n_jobs=-1))

    model = Pipeline([
            ('preprocessor', preprocessor),
            ('model', clf),
        ])

    # fit the model
    model.fit(churn_train, churn_train_labels)

    # save the full pipeline including the model
    save_file(session, model, "@MODELS/churn_model_reg.joblib")

    # predict on the test set and return the root mean squared error (RMSE)
    churn_predictions = model.predict(churn_test)
    mse = mean_squared_error(churn_test_labels, churn_predictions)
    # cast from numpy float64 so the value matches the declared float return type
    return float(np.sqrt(mse))

# Register train_model as a stored procedure using the sproc() function.
# replace=True is passed so an existing registration is replaced when this cell is re-run.
train_model_sp = sproc(train_model, replace=True)

In [7]:
# Run the training stored procedure; the value shown is train_model's test-set RMSE.
train_model_sp()

0.05438601172561986

In [8]:
import sys
import cachetools
import os
from snowflake.snowpark.functions import udf
# Attach the model file written by train_model so the UDF registered below can read it.
session.add_import("@MODELS/churn_model_reg.joblib")

@cachetools.cached(cache={})
def read_file(filename):
    """Load a joblib-serialized object from the UDF import directory, cached per filename.

    The import directory is read from ``sys._xoptions["snowflake_import_directory"]``.
    Results are memoized by ``cachetools.cached`` so repeated calls with the same
    filename do not reload the file.

    Args:
        filename: Name of the file inside the import directory.

    Returns:
        The object produced by ``joblib.load``.

    Raises:
        RuntimeError: If the ``snowflake_import_directory`` option is not set.
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        # Fail loudly here instead of returning None and breaking later in the caller.
        raise RuntimeError(
            "snowflake_import_directory is not set; cannot locate " + str(filename)
        )
    # NOTE(review): joblib.load unpickles the file; only load files from a trusted stage.
    with open(os.path.join(import_dir, filename), 'rb') as file:
        return joblib.load(file)

# Categorical feature columns.
cat_vars = [
    'GENDER',
    'SENIORCITIZEN',
    'PARTNER',
    'DEPENDENTS',
    'PHONESERVICE',
    'MULTIPLELINES',
    'INTERNETSERVICE',
    'ONLINESECURITY',
    'ONLINEBACKUP',
    'DEVICEPROTECTION',
    'TECHSUPPORT',
    'STREAMINGTV',
    'STREAMINGMOVIES',
    'CONTRACT',
    'PAPERLESSBILLING',
    'PAYMENTMETHOD',
]

# Numerical feature columns.
num_vars = ['TENUREMONTHS', 'MONTHLYCHARGES', 'TOTALCHARGES']

# Order matters: the UDF maps its input array positionally onto these column names.
features = [*cat_vars, *num_vars]

# The UDF is staged on @MODELS, the stage this notebook creates and stores the model on.
@udf(name='predict_churn_sp',is_permanent = True, stage_location = '@MODELS', replace=True)
def predict_churn_sp(args: list) -> float:
    """Predict churn for one customer.

    Args:
        args: Feature values for a single row, in the same order as ``features``.

    Returns:
        The model's predicted label for that row, as a built-in float.
    """
    model = read_file('churn_model_reg.joblib')
    row = pd.DataFrame([args], columns=features)
    # predict() returns an array with one entry for the single input row; unwrap it
    # so the UDF returns a scalar matching its declared float return type.
    return float(model.predict(row)[0])

In [9]:
# Take a 400-row sample of TRAIN_DATASET to score with the UDF.
# NOTE(review): this is the same table train_model reads, so these rows are not unseen data.
new_df = session.table('TRAIN_DATASET').sample(n = 400)

In [10]:
%%time
new_df.select(new_df.CUSTOMERID,new_df.CHURNVALUE, \
              F.call_udf("predict_churn_sp", F.array_construct(*features)).alias('PREDICTED_CHURN')) \
        .write.mode('overwrite').saveAsTable('churn_detection_sp')


CPU times: user 11.7 ms, sys: 2.4 ms, total: 14.1 ms
Wall time: 9.27 s


In [11]:
# Load the scored table into pandas to compare CHURNVALUE with PREDICTED_CHURN.
session.table('churn_detection_sp').to_pandas()

Unnamed: 0,CUSTOMERID,CHURNVALUE,PREDICTED_CHURN
0,5387-EOILJ,0.0,0.0
1,1931-vmLou,0.0,0.0
2,2433-8SKsn,0.0,0.0
3,4697-rVNqe,0.0,0.0
4,4896-kMcAF,0.0,0.0
...,...,...,...
395,5993-k1Ed8,0.0,0.0
396,7206-OwbN2,0.0,0.0
397,1978-CFauk,1.0,1.0
398,4926-5sIr4,0.0,0.0


## Running the prediction using SQL

In [12]:
%%time
# Call the same UDF directly from SQL on a 10-row sample of train_dataset.
# The ARRAY_CONSTRUCT arguments are listed in the same order as `features`,
# because the UDF maps the array onto those column names by position.
session.sql(' select customerid,churnvalue, \
            predict_churn_sp(ARRAY_CONSTRUCT( \
                                    GENDER, \
                                    SENIORCITIZEN, \
                                    PARTNER, \
                                    DEPENDENTS, \
                                    PHONESERVICE, \
                                    MULTIPLELINES,  \
                                    INTERNETSERVICE,  \
                                    ONLINESECURITY,  \
                                    ONLINEBACKUP, \
                                    DEVICEPROTECTION,  \
                                    TECHSUPPORT,  \
                                    STREAMINGTV,  \
                                    STREAMINGMOVIES, \
                                    CONTRACT,  \
                                    PAPERLESSBILLING,  \
                                    PAYMENTMETHOD,  \
                                    TENUREMONTHS, \
                                    MONTHLYCHARGES,  \
                                    TOTALCHARGES)) as Churn_prediction \
                                    from train_dataset sample (10 rows)').show()



----------------------------------------------------
|"CUSTOMERID"  |"CHURNVALUE"  |"CHURN_PREDICTION"  |
----------------------------------------------------
|5079-nKQop    |0.0           |0.0                 |
|5173-SV7pB    |1.0           |1.0                 |
|2290-VXBex    |0.0           |0.0                 |
|6609-mN9Pj    |0.0           |0.0                 |
|5686-nS5El    |0.0           |0.0                 |
|8679-XeJg2    |1.0           |1.0                 |
|6501-oLL0P    |0.0           |0.0                 |
|3876-RSSAT    |0.0           |0.0                 |
|3848-Bm4BZ    |0.0           |0.0                 |
|6725-cMHN2    |1.0           |1.0                 |
----------------------------------------------------

CPU times: user 6.03 ms, sys: 1.93 ms, total: 7.96 ms
Wall time: 2.85 s


In [13]:
# Close the Snowpark session at the end of the notebook.
session.close()