# Churn Predictive Analytics using Amazon SageMaker and Snowflake

## Setup

Run the cell below to import Python libraries required by this notebook.

The IAM role ARN below gives training and hosting jobs access to your data. By default, we'll use the IAM permissions that have been allocated to your notebook instance. The role should have permission to access your S3 bucket and full execution permissions on Amazon SageMaker. In practice, you could narrow the scope of the required permissions.

In [None]:
import boto3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import os
import sys
import time
import json
from IPython.display import display
from time import strftime, gmtime

import sagemaker
from sagemaker import AlgorithmEstimator, get_execution_role
from sagemaker.predictor import RealTimePredictor, csv_serializer, StringDeserializer

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

sess = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name
print("IAM role ARN: {}".format(role))

Now let's set the S3 bucket and prefix that you want to use for training and model data. This bucket should be created within the same region as the Notebook Instance, training, and hosting. 

- Replace the value of `bucket` in the cell below with the name of your bucket.

In [None]:
bucket = 'snowflake-sagemaker-workshop'
prefix = 'churn-analytics-lab'

---
## Data

Mobile operators have historical records on which customers ultimately ended up churning and which continued using the service. We can use this historical information to construct an ML model of one mobile operator’s churn using a process called training. After training the model, we can pass the profile information of an arbitrary customer (the same profile information that we used to train the model) to the model, and have the model predict whether this customer is going to churn. Of course, we expect the model to make mistakes. After all, predicting the future is tricky business! But we’ll also show how to deal with prediction errors.

The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets.  In the previous steps, this dataset was loaded into the CUSTOMER_CHURN table in your Snowflake instance.

Provide the connection and credentials required to connect to your Snowflake account. You'll need to modify the cell below with the appropriate **ACCOUNT** for your Snowflake trial. If you followed the lab guide instructions, the username and password below will work. 

**NOTE:** For Snowflake accounts in regions other than US West, add the region ID after a period, in the form `<ACCOUNT>.<REGION ID>`, e.g. `XYZ123456.US-EAST-1`.

In practice, security standards might prohibit you from providing credentials in clear text. As a best practice in production, you should utilize a service like [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) to manage your database credentials.
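
As a rough sketch of that practice: a secrets service typically hands back a secret as a JSON string (with boto3, for example, in the `SecretString` field of a `get_secret_value` response). The helper below turns such a string into keyword arguments for `snowflake.connector.connect`. The function name and the secret's key names (`user`, `password`, `account`) are assumptions made for illustration, and fetching the secret is left out so the sketch runs without AWS access.

```python
import json

# Hypothetical layout of the stored secret; adjust to match your own secret.
REQUIRED_KEYS = ("user", "password", "account")


def connect_kwargs_from_secret(secret_string, **overrides):
    """Parse a JSON secret into keyword arguments for snowflake.connector.connect."""
    secret = json.loads(secret_string)
    missing = [key for key in REQUIRED_KEYS if key not in secret]
    if missing:
        raise KeyError("secret is missing keys: {}".format(", ".join(missing)))
    kwargs = {key: secret[key] for key in REQUIRED_KEYS}
    # Non-sensitive settings (warehouse, database, schema) can stay in the notebook.
    kwargs.update(overrides)
    return kwargs


# Made-up secret for demonstration; real values would come from the secrets service.
example_secret = '{"user": "sagemaker", "password": "not-a-real-password", "account": "XYZ123456"}'
kwargs = connect_kwargs_from_secret(example_secret, warehouse="SAGEMAKER_WH")
print(sorted(kwargs))
```

The resulting dictionary could then be passed as `snowflake.connector.connect(**kwargs)`, keeping the password out of the notebook source.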

In [None]:
import snowflake.connector
# Connecting to Snowflake using the default authenticator
ctx = snowflake.connector.connect(
  user='sagemaker',
  password='AWSSF123',
  account='<ACCOUNT>',
  warehouse='SAGEMAKER_WH',
  database='ML_WORKSHOP',
  schema='PUBLIC'
)

---
## Explore

Now we can run queries against your database. 

In practice, the data table will often contain more data than is practical to operate on within a notebook instance, or the relevant attributes will be spread across multiple tables. Being able to run SQL queries and load the results into a pandas DataFrame is helpful during the initial stages of development. For a fully scalable solution, check out the Spark integration: [Snowflake Connector for Spark](https://docs.snowflake.net/manuals/user-guide/spark-connector.html)


In [None]:
# Query Snowflake Data
cs=ctx.cursor()
allrows=cs.execute("""select Cust_ID,STATE,ACCOUNT_LENGTH,AREA_CODE,PHONE,INTL_PLAN,VMAIL_PLAN,VMAIL_MESSAGE,
                   DAY_MINS,DAY_CALLS,DAY_CHARGE,EVE_MINS,EVE_CALLS,EVE_CHARGE,NIGHT_MINS,NIGHT_CALLS,
                   NIGHT_CHARGE,INTL_MINS,INTL_CALLS,INTL_CHARGE,CUSTSERV_CALLS,
                   CHURN from CUSTOMER_CHURN """).fetchall()

churn = pd.DataFrame(allrows)
churn.columns=['Cust_id','State','Account Length','Area Code','Phone','Intl Plan', 'VMail Plan', 'VMail Message','Day Mins',
            'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls','Night Charge',
            'Intl Mins','Intl Calls','Intl Charge','CustServ Calls', 'Churn']

pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 10)         # Keep the output on one page
churn

By modern standards, it’s a relatively small dataset, with only 3,333 records, where each record uses 21 attributes to describe the profile of a customer of an unknown US mobile operator. The attributes are:

- `State`: the US state in which the customer resides, indicated by a two-letter abbreviation; for example, OH or NJ
- `Account Length`: the number of days that this account has been active
- `Area Code`: the three-digit area code of the corresponding customer’s phone number
- `Phone`: the remaining seven-digit phone number
- `Intl Plan`: whether the customer has an international calling plan: yes/no
- `VMail Plan`: whether the customer has a voice mail feature: yes/no
- `VMail Message`: presumably the average number of voice mail messages per month
- `Day Mins`: the total number of calling minutes used during the day
- `Day Calls`: the total number of calls placed during the day
- `Day Charge`: the billed cost of daytime calls
- `Eve Mins`, `Eve Calls`, `Eve Charge`: the minutes used, calls placed, and billed cost for calls during the evening
- `Night Mins`, `Night Calls`, `Night Charge`: the minutes used, calls placed, and billed cost for calls during nighttime
- `Intl Mins`, `Intl Calls`, `Intl Charge`: the minutes used, calls placed, and billed cost for international calls
- `CustServ Calls`: the number of calls placed to Customer Service
- `Churn`: whether the customer left the service: true/false

The last attribute, `Churn`, is known as the target attribute: the attribute that we want the ML model to predict. Because the target attribute is binary, our model will be performing binary prediction, also known as binary classification.

Let's begin exploring the data:

In [None]:
# Frequency tables for each categorical feature
for column in churn.select_dtypes(include=['object']).columns:
    display(pd.crosstab(index=churn[column], columns='% observations', normalize='columns'))

# Summary statistics and histograms for the numeric features
display(churn.describe())
%matplotlib inline
hist = churn.hist(bins=30, sharey=True, figsize=(10, 10))

We can see immediately that:
- `State` appears to be quite evenly distributed
- `Phone` takes on too many unique values to be of any practical use.  It's possible parsing out the prefix could have some value, but without more context on how these are allocated, we should avoid using it.
- Only 14% of customers churned, so there is some class imbalance, but nothing extreme.
- Most of the numeric features are surprisingly nicely distributed, with many showing a bell-like, roughly Gaussian shape. `VMail Message` is a notable exception, and `Area Code` shows up as a feature we should convert to non-numeric.
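
To make the class-imbalance point concrete, here is a small pure-Python sketch that computes the share of each class and the accuracy of a trivial "always predict the majority class" baseline. The label list is made up to mirror the roughly 14% churn rate noted above; it is not read from the `churn` DataFrame.

```python
from collections import Counter


def class_shares(labels):
    """Return the fraction of observations that falls into each class."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: count / total for label, count in counts.items()}


# Made-up labels: 14 churners out of 100 customers.
sample_labels = ["True."] * 14 + ["False."] * 86
shares = class_shares(sample_labels)

# A model that always predicts the majority class is right this often.
majority_baseline = max(shares.values())
print(shares)
print(majority_baseline)
```

With this split, a model that never predicts churn is already right about 86% of the time, so plain accuracy needs to be read against that baseline.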

In [None]:
churn = churn.drop('Phone', axis=1)
churn['Area Code'] = churn['Area Code'].astype(object)

Next let's look at the relationship between each of the features and our target variable.

In [None]:
for column in churn.select_dtypes(include=['object']).columns:
    if column != 'Churn':
        display(pd.crosstab(index=churn[column], columns=churn['Churn'], normalize='columns'))

for column in churn.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = churn[[column, 'Churn']].hist(by='Churn', bins=30)
    plt.show()

Interestingly we see that churners appear:
- Fairly evenly distributed geographically
- More likely to have an international plan
- Less likely to have a voicemail plan
- To exhibit some bimodality in daily minutes (either higher or lower than the average for non-churners)
- To have a larger number of customer service calls (which makes sense as we'd expect customers who experience lots of problems may be more likely to churn)

In addition, we see that churners take on very similar distributions for features like `Day Mins` and `Day Charge`.  That's not surprising as we'd expect minutes spent talking to correlate with charges.  Let's dig deeper into the relationships between our features.

In [None]:
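# Correlations are only defined for numeric columns, so select them explicitly
display(churn.select_dtypes(include='number').corr())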
pd.plotting.scatter_matrix(churn, figsize=(18, 18))
plt.show()

We see several features that essentially have 100% correlation with one another.  Including these feature pairs in some machine learning algorithms can create catastrophic problems, while in others it will only introduce minor redundancy and bias.  Let's remove one feature from each of the highly correlated pairs: Day Charge from the pair with Day Mins, Eve Charge from the pair with Eve Mins, Night Charge from the pair with Night Mins, and Intl Charge from the pair with Intl Mins:

In [None]:
churn = churn.drop(['Day Charge', 'Eve Charge', 'Night Charge', 'Intl Charge'], axis=1)
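
A near-perfect correlation between minutes and charges is what one would expect if each charge is simply the minutes multiplied by a flat rate. The sketch below illustrates this with a hand-written Pearson correlation. The minute values and the rate of 0.17 are made up for the example, and the flat-rate billing model is an assumption rather than something the dataset documents.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Made-up daytime minutes and a hypothetical flat per-minute rate.
day_mins = [265.1, 161.6, 243.4, 299.4, 166.7]
rate = 0.17
day_charge = [round(minutes * rate, 2) for minutes in day_mins]

r = pearson(day_mins, day_charge)
print(r)
```

When one column is a rescaled copy of another like this, it carries no extra information for the model, which is why dropping one column of each pair loses nothing.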

Our data is now prepped. Normally, we would experiment with candidate algorithms and perform additional feature engineering to craft features with potentially stronger signals.

We're going to shortcut this process by using one of our tabular AutoML options: AutoGluon. AutoGluon is a toolkit created by AWS Labs. Information about the contributors and the research work is documented in this [paper](https://arxiv.org/pdf/2003.06505.pdf).

Amazon SageMaker provides easy ways to train and deploy MXNet/Gluon based models simply by bringing your own script. AutoGluon is also conveniently packaged as a Marketplace [product](https://aws.amazon.com/marketplace/pp/prodview-n4zf5pmjt7ism) for SageMaker, so we don't need to worry about the operational heavy-lifting involved in maintaining containers and the runtime environments. We're going to use this product today in our workshop.

SageMaker provides other tabular AutoML options via [Autopilot](https://aws.amazon.com/sagemaker/autopilot/). There are trade-offs between the options. For this small dataset, AutoGluon runs much faster and produces great results. Autopilot runs distributed, and that overhead is prohibitive for the purposes of this workshop. However, Autopilot's serverless, natively distributed setup is more practical for larger datasets and is easier to manage.

Tabular AutoML automates the experimentation process: data analysis, feature engineering, exploration of candidate algorithms and pipelines, hyperparameter tuning, and ensemble generation. This potentially enables non-ML-experts to produce high-quality ML models.

## Finalize Data Prep

And now let's split the data into training and test sets.  Holding out a test set lets us detect overfitting and measure the model's accuracy on unseen data.

Traditionally, you would create a third validation set, which would be used during training. The validation set would be used in each training epoch to evaluate progress and help monitor overfitting issues. AutoGluon creates the validation set for us automatically.
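
The shuffle-then-cut idea behind a 90/10 split can be sketched in plain Python as follows. This is only an illustration on row indices; the function name is hypothetical, and the shuffled order will differ from what pandas produces for the same seed.

```python
import random


def train_test_split_rows(rows, train_fraction=0.9, seed=1729):
    """Shuffle a copy of the rows reproducibly, then cut it into two parts."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(train_fraction * len(shuffled))
    return shuffled[:cut], shuffled[cut:]


# Row indices standing in for the 3,333 customer records.
rows = list(range(3333))
train_rows, test_rows = train_test_split_rows(rows)
print(len(train_rows), len(test_rows))
```

Fixing the seed keeps the split reproducible, so repeated runs evaluate the model against the same held-out customers.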

In [None]:
to_split_data = churn.drop(['Cust_id'], axis=1)
train_data, test_data = np.split(to_split_data.sample(frac=1, random_state=1729), [int(0.9 * len(to_split_data))])
train_data.to_csv('train.csv', header=True, index=False)

In [None]:
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)
display(train_data)

SageMaker training ingests data from S3, so we'll need to upload the training file into our bucket.

In [None]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')

---
## Train

Moving on to training, first we'll need to subscribe to the AWS Marketplace AutoGluon product, so that it's accessible from our AWS account.

1. Follow [this URL](https://aws.amazon.com/marketplace/pp/Amazon-Web-Services-AutoGluon-Tabular/prodview-n4zf5pmjt7ism) to the AutoGluon-Tabular product page.
2. Select the orange "Continue to Subscribe" button.
3. Run the cell below to acquire the AWS resource ID (ARN) for your subscribed algorithm.

In [None]:
AUTOGLUON_PRODUCT = "autogluon-tabular-v2-a11d018f6028f8192e60553704ee3d97"
def get_algorithm_arn(region, algo_name):
    acct_mapping = {
        "ap-northeast-1" : "977537786026",
        "ap-northeast-2" : "745090734665",
        "ap-southeast-1" : "192199979996",
        "ap-southeast-2" : "666831318237",
        "us-east-1"      : "865070037744",
        "eu-central-1"   : "446921602837",
        "ap-south-1"     : "077584701553",
        "sa-east-1"      : "270155090741",
        "ca-central-1"   : "470592106596",
        "eu-west-1"      : "985815980388",
        "eu-west-2"      : "856760150666",
        "eu-west-3"      : "843114510376",
        "eu-north-1"     : "136758871317",
        "us-west-1"      : "382657785993",
        "us-east-2"      : "057799348421",
        "us-west-2"      : "594846645681"
    }
        
    return "arn:aws:sagemaker:{}:{}:algorithm/{}".format(region, acct_mapping[region], algo_name)
    
algorithm_arn = get_algorithm_arn(boto3.Session().region_name, AUTOGLUON_PRODUCT)
print("The Tabular AutoGluon ARN in your region is {}.".format(algorithm_arn))

Next, we configure our algorithm for training. Here are some of the key user inputs. 

1. **Hyperparameters**: AutoML algorithms like AutoGluon are designed to automate hyperparameter tuning using hyperparameter search algorithms like Bayesian optimization. Thus, setting hyperparameters is optional, although you can override the defaults. We'll use the default configuration in this lab, so we only need to identify the name of the target label column. The other settings are commented out and serve as examples.

2. **Infrastructure**: We're using SageMaker's remote training service, so we need to specify the infrastructure to allocate. Since we're using a Marketplace product, we need to be aware of the subset of supported instances. 

3. **Data**: lastly, we need to identify the location of our training data.

In [None]:
hyperparameters = {
    #"hyperparameters": {
    #    "NN":{"num_epochs": "1"}
    #},
    #"auto_stack": "True",
    "label": "Churn"
}

compatible_training_instance_type='ml.m5.4xlarge' 
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')

Now, we create an Estimator object that represents our training job: security settings, infrastructure, data, and algorithms.

Executing the fit() method results in an API call to the SageMaker service to build out your training cluster and execute the job.

In [None]:
autogluon = AlgorithmEstimator(algorithm_arn=algorithm_arn, 
                                  role=role, 
                                  train_instance_count=1, 
                                  train_instance_type=compatible_training_instance_type, 
                                  sagemaker_session=sess, 
                                  base_job_name='autogluon',
                                  hyperparameters=hyperparameters,
                                  train_volume_size=100) 

autogluon.fit({'training': s3_input_train})

## Deploy

The training job is complete. We now have a trained model that has been automatically optimized for us by AutoGluon. The model artifacts now reside in S3. We can deploy and serve this model for real-time inference by using SageMaker Hosting Services.

We simply configure and run the deploy() command to automate the process of creating a fully-managed model endpoint.

In [None]:
compatible_inference_instance_type='ml.m5.4xlarge'
predictor = autogluon.deploy(initial_instance_count=1, 
                        instance_type=compatible_inference_instance_type, 
                        content_type='text/csv', 
                        serializer=csv_serializer, 
                        deserializer=StringDeserializer())

## Visualize and Test

Once your model endpoint is up and running, we can make secure API calls to our model to obtain real-time predictions. 

Let's spot check our model with the test dataset that we previously generated. The "predictor" object represents our remote model endpoint. We can call predict() and send our features over to obtain predictions. 

Finally, we can evaluate our predictions against our ground truth labels by using a confusion matrix visualization.

In [None]:
label = 'Churn'
y_test = test_data[label].reset_index(drop=True)
X_test = test_data.drop(columns=[label])

y_pred = predictor.predict(X_test.to_csv(index=False)).splitlines()

# confusion_matrix orders classes by sorted label value, so False comes before True
cm = confusion_matrix(y_test, pd.DataFrame(y_pred))
disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=["False","True"])

disp = disp.plot(values_format='')
plt.show()
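
A confusion matrix can be boiled down to a few numbers that describe the model's mistakes. The sketch below counts the four cells by hand and derives precision and recall for the churn class. The label lists are made up for illustration, and treating `"True."` as the positive (churn) label is an assumption about how the labels are encoded.

```python
def confusion_counts(actual, predicted, positive):
    """Count true/false positives and negatives for one positive class."""
    tp = fp = fn = tn = 0
    for a, p in zip(actual, predicted):
        if p == positive and a == positive:
            tp += 1
        elif p == positive:
            fp += 1  # false alarm: predicted churn, customer stayed
        elif a == positive:
            fn += 1  # missed churner: predicted stay, customer churned
        else:
            tn += 1
    return tp, fp, fn, tn


def precision_recall(tp, fp, fn):
    """Precision and recall, returning 0.0 when a denominator is zero."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Made-up ground truth and predictions.
actual = ["True.", "True.", "True.", "False.", "False.", "False.", "False.", "False."]
predicted = ["True.", "True.", "False.", "True.", "False.", "False.", "False.", "False."]

tp, fp, fn, tn = confusion_counts(actual, predicted, positive="True.")
precision, recall = precision_recall(tp, fp, fn)
print(tp, fp, fn, tn)
print(precision, recall)
```

Recall tells us what fraction of actual churners the model caught, while precision tells us how many of the flagged customers really churned. Which one matters more depends on the relative cost of a missed churner versus an unnecessary retention offer.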

## Integrate SageMaker Hosted Endpoint with Snowflake

Our hosted model looks good. Let's integrate this model with Snowflake so that it can be used for predictive analytics.

We'll do this by using Snowflake's [External Functions](https://docs.snowflake.com/en/sql-reference/external-functions-introduction.html), which allows us to use SQL to query real-time predictions from our ML models.
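
External Functions exchange batches of rows as JSON: the request body has a `data` array in which each row starts with its row number, and the response must return one `[row_number, value]` pair per row. The model endpoint in this notebook speaks CSV, so some translation layer sits in between. The sketch below shows what that translation might look like. Where it runs (for example, a small function behind an API gateway) and the function names are assumptions, and the endpoint response is simulated with a string.

```python
import csv
import io
import json


def snowflake_rows_to_csv(request_body):
    """Split a Snowflake request into row numbers and a CSV payload."""
    rows = json.loads(request_body)["data"]
    row_numbers = [row[0] for row in rows]
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        writer.writerow(row[1:])  # drop the row number, keep the features
    return row_numbers, buffer.getvalue()


def predictions_to_snowflake(row_numbers, endpoint_response):
    """Pair each prediction line with its row number in Snowflake's format."""
    predictions = endpoint_response.splitlines()
    if len(predictions) != len(row_numbers):
        raise ValueError("expected one prediction per input row")
    return json.dumps({"data": [[n, p] for n, p in zip(row_numbers, predictions)]})


# Made-up request with two rows and a simulated endpoint response.
request_body = json.dumps({"data": [[0, "OH", 107, "no"], [1, "NJ", 137, "yes"]]})
row_numbers, csv_payload = snowflake_rows_to_csv(request_body)
response_body = predictions_to_snowflake(row_numbers, "False.\nTrue.\n")
print(csv_payload)
print(response_body)
```

Keeping the row numbers alongside the predictions is what lets Snowflake match each result back to the row that produced it.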

----

## [Optional] Batch Inference

Sometimes it is desirable to pre-calculate and batch our predictions instead of using External Functions. This requires more heavy lifting, as you're responsible for building and maintaining scoring pipelines. However, it's easier to scale an offline batching process efficiently.

We're going to use a SageMaker feature called Batch Transform to facilitate this. In a real-world deployment, you will probably leverage Batch Transform alongside an ETL tool with a workflow manager to orchestrate pipeline stages.

SageMaker Batch Transform is designed to run asynchronously and ingest input data from S3. This differs from SageMaker's real-time inference endpoints, which receive input data from synchronous HTTP requests.

For large scale deployments, the dataset will be retrieved from Snowflake using SQL and an External Stage to S3.

You might use Batch Transform in place of a Hosted Endpoint for the following reasons:

 - Batch Transform is better optimized for throughput in comparison with real-time inference endpoints. Thus, Batch Transform is ideal for processing large volumes of data for analytics.
 - Offline asynchronous processing is acceptable for most analytics use cases.
 - Batch Transform is more cost efficient when real-time inference isn't necessary. You only need to pay for resources used during batch processing. There is no need to pay for ongoing resources like a hosted endpoint for real-time inference.
 
You might use Hosted Endpoints with External Functions for reasons such as:

- You don't want to bother with building and managing a bunch of scoring pipelines.
- You have use cases that benefit from having fresher predictions.


-----
Let's configure the data location for our Batch Transform job.

In [None]:
batch_input = churn.iloc[:,:-1]
batch_input.to_csv('batch.csv', header=False, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'batch/in/batch.csv')).upload_file('batch.csv')

s3uri_batch_input ='s3://{}/{}/batch/in'.format(bucket, prefix)
print('Batch Transform input S3 uri: {}'.format(s3uri_batch_input))

s3uri_batch_output= 's3://{}/{}/batch/out'.format(bucket, prefix)
print('Batch Transform output S3 uri: {}'.format(s3uri_batch_output))

Now we create a Transformer object to describe our batch job. The transform() call results in an API call to the SageMaker service to create and run the batch process.

In [None]:
from sagemaker.transformer import Transformer
BATCH_INSTANCE_TYPE = 'ml.c5.xlarge'

transformer = autogluon.transformer(instance_count=1,
                                         strategy='SingleRecord',
                                         assemble_with='Line',
                                         instance_type= BATCH_INSTANCE_TYPE,
                                         accept = 'text/csv',
                                         output_path=s3uri_batch_output)
    
transformer.transform(s3uri_batch_input,
                      split_type= 'Line',
                      content_type= 'text/csv',   
                      input_filter = "$[1:]",
                      join_source = "Input",
                      output_filter = "$[0,-1]")
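
The three data-processing settings in the transform() call decide which columns reach the model and which end up in the output file. The sketch below emulates them on a single CSV record in plain Python so the effect is easy to see. The record, the stand-in model, and the function names are made up, and the naive comma split assumes no quoted commas in the data.

```python
def emulate_batch_record(input_line, predict):
    """Emulate input_filter, join_source and output_filter for one CSV record."""
    columns = input_line.split(",")
    model_input = columns[1:]              # input_filter "$[1:]": drop the id column
    prediction = predict(",".join(model_input))
    joined = columns + [prediction]        # join_source "Input": input columns + prediction
    kept = [joined[0], joined[-1]]         # output_filter "$[0,-1]": keep id and prediction
    return ",".join(kept)


def fake_model(csv_features):
    """Stand-in for the hosted model; always predicts no churn."""
    return "False."


line = "1001,OH,107,415,no,yes,26"
result = emulate_batch_record(line, fake_model)
print(result)
```

The customer id never reaches the model, yet it is carried through to the output so each score can be tied back to its customer.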

Batch transform jobs run asynchronously, and are non-blocking by default. Run the command below to block until the batch job completes.

In [None]:
transformer.wait()

There are many ways to compare the performance of a machine learning model, but let's start by simply comparing actual to predicted values.  In this case, we're simply predicting whether the customer churned (`1`) or not (`0`), which produces a simple confusion matrix.

In [None]:
batched_churn_scores = pd.read_csv(s3uri_batch_output+'/batch.csv.out', usecols=[0,1], names=['id','scores'])
#batched_churn_scores['scores'] = (batched_churn_scores['scores'] == "False.").astype(int)
display(batched_churn_scores)

In [None]:
batched_churn_scores = pd.read_csv(s3uri_batch_output+'/batch.csv.out', usecols=[0,1], names=['id','scores'])
batched_churn_scores['scores'] = (batched_churn_scores['scores'] == "True.").astype(int)
gt_df = pd.DataFrame(churn['Churn']).reset_index(drop=True)

results_df= pd.concat([gt_df,batched_churn_scores],axis=1)
pd.crosstab(index=results_df['Churn'], columns=np.round(results_df['scores']), rownames=['actual'], colnames=['predictions'])

## Upload Churn Score to Snowflake

To give multiple business users and dashboards simple access to the churn scores, we will upload them to Snowflake by using a Snowflake internal stage.

In [None]:
results_df.to_csv('results.csv', header=False, index=False)
cs.execute("PUT file://results.csv @ml_results")