# Customer Churn Analysis

### Problem Statement: 
#### We are tasked to build an end to end machine learning pipeline using snowpark for customer churn prediction in a telecom company to identify users who are at high risk of churning
#### To accomplish this, we need to build a model that can learn how to identify such users, demonstrating with Snowflake/Snowpark to build a Classifier to help us with this task.

### In this notebook we will -
1. Load the raw parquet dataset
2. EDA - clean and transform to create a dataset for model training
3. Model Training and Deployment

### Prerequisites
1. Familiarity with basic Python and SQL
2. Familiarity with training ML models
3. Familiarity with data science notebooks
4. Snowflake Account

### What we'll Learn
1. How to import/export data between Client and Snowflake
2. How to conduct data cleaning and transformation using Snowpark
3. How to train a model with Snowpark ML model
4. How to visualize the predicted results from the model using packages like seaborn, matplotlib, plotly, streamlit
5. How to convert the python code into an interactive streamlit app and make predictions on production data

In [None]:
# Snowpark API libraries
from snowflake.snowpark import *
from snowflake.snowpark.types import *
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark import version
from snowflake.snowpark.functions import udf

# Snowpark ML libraries
from snowflake.ml.modeling.preprocessing import OneHotEncoder
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff

# Python libraries
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns
import joblib
import json
print(version.VERSION)

import warnings
warnings.filterwarnings("ignore")

###  Establishing a connection to the Snowflake database using Snowpark

In [None]:
# connect to Snowflake
with open("creds.json", "r") as f:
    snowflake_conn_prop = json.load(f)  
session = Session.builder.configs(snowflake_conn_prop).create()

### Let's configure our Snowpark Session and initialize - 
#### DATABASE, WAREHOUSE, and SCHEMA that we will use for the remainder of the notebook.

In [None]:

#session.close()
#session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create or replace warehouse {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())


### Infer file schema & Load Data into snowflake

In [None]:
folderpath = '/<path-to-the-folder>/customer-churn-prediction/data'

In [None]:
#convert csv file to parquet file at the same location
df = pd.read_csv(folderpath + 'telco_data_set.csv', low_memory=False)
df.to_parquet(folderpath + 'telco_data_set.parquet', engine="fastparquet")

In [None]:
filename = "telco_data_set.parquet"
stagename = "RAWTELCODATA"
tablename = "RAW_TELCO_PARQUET_DATA"

In [None]:
session.sql("CREATE OR REPLACE FILE FORMAT MY_PARQUET_FORMAT TYPE = PARQUET;").collect()

session.sql(f"CREATE OR REPLACE \
            TABLE {tablename} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  ").collect()

### For incremental load 

In [None]:
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(tablename,FORCE= True)

In [None]:
df_data = session.table(tablename)
#dfR.toPandas()
# Drop the column with the empty header
df_data = df_data.drop('Unnamed: 0')
# Overwrite the original table with the updated dataframe
df_data.write.mode("overwrite").save_as_table(tablename)
df_data.show(5)

### Add a New Column 'CUSTOMERID' to the Snowpark Dataframe

In [None]:
# Assuming you have an existing Snowpark DataFrame 'df'
# Define the number of rows in the existing DataFrame
num_rows = df_data.count()
print(num_rows)

In [None]:
df1 = df_data
cols_df1 = df1.columns
cols_df1

In [None]:
# Add the new column to the existing DataFrame
df1 = df1.withColumn("CUSTOMERID", F.expr("CAST(FLOOR(ABS(RANDOM()) * 1000000 + 1) AS INT)"))

# Ensure 6 digits by using LPAD
df1 = df1.withColumn("CUSTOMERID", F.expr("LPAD(CAST(CUSTOMERID AS VARCHAR), 6, '0')"))

In [None]:
rawtelcotable = "RAW_TELCO_ID_TABLE"

# Overwrite the original table with the updated dataframe
df1.write.mode("overwrite").save_as_table(rawtelcotable)
# Retrieve the updated table and display its contents
df_data_ID = session.table(rawtelcotable)

### Categorical and Numerical Columns in the dataset

In [None]:
cat_cols = ['GENDER','SENIORCITIZEN','PARTNER','DEPENDENTS','PHONESERVICE','MULTIPLELINES',
            'INTERNETSERVICE','ONLINESECURITY','ONLINEBACKUP','DEVICEPROTECTION','TECHSUPPORT','STREAMINGTV','STREAMINGMOVIES',
            'CONTRACT','PAPERLESSBILLING','PAYMENTMETHOD','TENUREMONTHSBIN','MONTHLYCHARGESBIN','TOTALCHARGESBIN']
num_cols = ["MONTHLYCHARGES", "TOTALCHARGES"]

### Number of Records in the dataset

In [None]:
df_data_ID.count()

### Duplicates Removal

In [None]:
duplicates_df_data_ID = df_data_ID.group_by('CUSTOMERID').agg(F.count(('CUSTOMERID'))).filter(F.col('COUNT(CUSTOMERID)') > 1)
print('Number Duplicates:', duplicates_df_data_ID.count())

### Use the **drop_duplicates** to remove duplicated rows

In [None]:
df_data_ID = df_data_ID.drop_duplicates('CUSTOMERID')
df_data_ID.count()

### Simple Statistics
Obtaining simple statistics per column - why are some statistics missing? \
Can you already identify problems in our data? \
For example count always return the number of non null records.

In [None]:
df_data_ID.describe().show(5)

### Missing Value Imputation
The describe output show that if we have missing values in the dataset. \
We will use the fillna method to replace missing values in any columns, if they are many we can create a new category for it.

In [None]:
df_data_ID = df_data_ID.fillna(value='0.0', subset=['TOTALCHARGES'])
df_data_ID.describe().show()

### Finding constant variables
How many distinct values do we have per column?  
**Hint:** Constant values are probably irrelevant  
**Hint:** Variables with many different values can be problematic

In [None]:
unique_values = []
for column in df_data_ID.columns:
    unique_values.append([column, df_data_ID.select(column).distinct().count()])
pd.DataFrame(unique_values, columns=['COLUMN_NAME','NUM_UNIQUE_VALUES'])

### Feature Engineering

### Encoding: Prepare the data for model training by encoding categorical columns.

In [None]:
OHE = OneHotEncoder(
    input_cols=cat_cols,
    output_cols=cat_cols,
    drop_input_cols=True,
    drop="first",
    handle_unknown="ignore",
)

load_data_ohe = OHE.fit(df_data_ID).transform(df_data_ID)
load_data_ohe.show(5)

## Remove any spaces or '()' characters from the column names and convert to UPPER case

In [None]:
new_columns = [load_data_ohe[col].alias(col.replace(' (automatic)', '')) for col in load_data_ohe.columns]
load_data_ohe = load_data_ohe.select(*new_columns)

In [None]:
# Convert all columns to upper case using alias and replace spaces with underscores
new_columns = [load_data_ohe[col].alias(col.replace(' ', '_').upper()) for col in load_data_ohe.columns]
load_data_ohe = load_data_ohe.select(*new_columns)

# Show the updated DataFrame
load_data_ohe.show()

### Assign to another dataframe to create the final dataset

In [None]:
df_data_ID = load_data_ohe

### Validating columns in each DataFrame

In [None]:
# Check the number of columns in each DataFrame
l_dfR_ID = len(df_data_ID.columns)
print(l_dfR_ID)
# Check the number of columns in each DataFrame
r_dfR_ID = df_data_ID.count()
print(r_dfR_ID)

### The Snowpark API provides programming language constructs for building SQL statements. \
### It's a new developer experience which enables us to build code in :-

<b><li>  Language of our choice </li></b>
<b><li> Tool of our choice and </li></b>
<b><li> Lazy execution to prevent multiple network hops to server </li></b>


In [None]:
df_data_ID.show()

In [None]:
df_data_ID.columns

In [None]:
dfDemographics = df_data_ID.select(col("CUSTOMERID"),
                              translate(col("GENDER_MALE"),lit("NULL"),lit("Male")).alias("GENDER"),
                              col("SENIORCITIZEN_YES").alias("SENIORCITIZEN_YES"),
                              col("PARTNER_YES"),
                              col("DEPENDENTS_YES")          
                             )


dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
dfDemographics.show()


## We can run transformation on data using similar dataframe API constructs, for example -

In [None]:
session.sql("DROP TABLE IF EXISTS dfServices;").collect()

dfServices = df_data_ID.select(col("CUSTOMERID"),
                       col("PHONESERVICE_YES").name("PHONESERVICE_YES"),
                       col("MULTIPLELINES_NO_PHONE_SERVICE").name("MULTIPLELINES_NO_PHONE_SERVICE"),
                       col("MULTIPLELINES_YES").name("MULTIPLELINES_YES"),
                       col("INTERNETSERVICE_FIBER_OPTIC").name("INTERNETSERVICE_FIBER_OPTIC"),
                       col("INTERNETSERVICE_NO").name("INTERNETSERVICE_NO"),
                       col("ONLINESECURITY_NO_INTERNET_SERVICE").name("ONLINESECURITY_NO_INTERNET_SERVICE"),
                       col("ONLINESECURITY_YES").name("ONLINESECURITY_YES"),
                       col("ONLINEBACKUP_NO_INTERNET_SERVICE").name("ONLINEBACKUP_NO_INTERNET_SERVICE"),
                       col("ONLINEBACKUP_YES").name("ONLINEBACKUP_YES"),
                       col("DEVICEPROTECTION_NO_INTERNET_SERVICE").name("DEVICEPROTECTION_NO_INTERNET_SERVICE"),
                       col("DEVICEPROTECTION_YES").name("DEVICEPROTECTION_YES"),
                       col("TECHSUPPORT_NO_INTERNET_SERVICE").name("TECHSUPPORT_NO_INTERNET_SERVICE"),
                       col("TECHSUPPORT_YES").name("TECHSUPPORT_YES"),
                       col("STREAMINGTV_NO_INTERNET_SERVICE").name("STREAMINGTV_NO_INTERNET_SERVICE"),
                       col("STREAMINGTV_YES").name("STREAMINGTV_YES"),
                       col("STREAMINGMOVIES_NO_INTERNET_SERVICE").name("STREAMINGMOVIES_NO_INTERNET_SERVICE"),
                       col("STREAMINGMOVIES_YES").name("STREAMINGMOVIES_YES"),
                       col("CONTRACT_ONE_YEAR").name("CONTRACT_ONE_YEAR"),
                       col("CONTRACT_TWO_YEAR").name("CONTRACT_TWO_YEAR"),
                       col("PAPERLESSBILLING_YES").name("PAPERLESSBILLING_YES"),
                       col("PAYMENTMETHOD_CREDIT_CARD").name("PAYMENTMETHOD_CREDIT_CARD"),
                       col("PAYMENTMETHOD_ELECTRONIC_CHECK").name("PAYMENTMETHOD_ELECTRONIC_CHECK"),
                       col("PAYMENTMETHOD_MAILED_CHECK").name("PAYMENTMETHOD_MAILED_CHECK"),
                       col("MONTHLYCHARGESBIN_LOW").name("MONTHLYCHARGESBIN_LOW"),
                       col("MONTHLYCHARGESBIN_MEDIUM").name("MONTHLYCHARGESBIN_MEDIUM"),
                       col("TOTALCHARGESBIN_LOW").name("TOTALCHARGESBIN_LOW"),
                       col("TOTALCHARGESBIN_MEDIUM").name("TOTALCHARGESBIN_MEDIUM"),
                       col("TENUREMONTHS").name("TENUREMONTHS"),
                       col("MONTHLYCHARGES").name("MONTHLYCHARGES"),
                       col("TOTALCHARGES").name("TOTALCHARGES"),
                       col("CHURNVALUE").name("CHURNVALUE") 
)       

dfServices.write.mode('overwrite').saveAsTable('TRAIN_DATASET_BIN')
dfServices.show(5)

## Create the TABLE in SNOWFLAKE - TRAINING DATASET

In [None]:
session.sql("DROP TABLE IF EXISTS TRAIN_CHURN_DATASET_BIN;").collect()

dfServices.write.mode("overwrite").save_as_table("TRAIN_CHURN_DATASET_BIN")

In [None]:
dfServices.show(10)

## Off to ~02 notebook for exploratory data analysis

In [None]:
session.close()