# Customer Churn Analysis

#### We are tasked to build an end to end machine learning pipeline using snowpark for customer churn prediction in a telecom company.

In this notebook we will load, clean and transform the raw parquet dataset

Update the config.py file before moving on to the next cell



In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import *
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff
from sklearn import linear_model
import matplotlib.pyplot as plt

%matplotlib inline
import datetime as dt
import numpy as np
import pandas as pd
import seaborn as sns
import json
import uuid

from snowflake.snowpark import version
print(version.VERSION)

import warnings
warnings.filterwarnings("ignore")

(1, 11, 1)


In [2]:
# connect to Snowflake
with open("secrets.json", "r") as f:
    snowflake_conn_prop = json.load(f)  
session = Session.builder.configs(snowflake_conn_prop).create()

Let's configure our Snowpark Session and initialize the database, warehouse, and schema that we will use for the remainder of the quickstart.

In [3]:

#session.close()
#session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create or replace warehouse {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())


[Row(CURRENT_WAREHOUSE()='SNOWPARK_WH', CURRENT_DATABASE()='CHURN_DB', CURRENT_SCHEMA()='CHURN_SCHEMA')]


## Infer file schema & Load Data into snowflake

In [4]:
#convert csv file to parquet file at the same location
df = pd.read_csv('telco_data_set.csv', low_memory=False)
df.to_parquet('telco_data_set.parquet', engine="fastparquet")

In [5]:
#filename = "raw_telco_data.parquet"
filename = "telco_data_set.parquet"
stagename = "rawtelcodata"
rawtable = "RAW_TELCO_PARQUET_DATA"

In [6]:
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
session.file.put(filename,stagename)

[PutResult(source='telco_data_set.parquet', target='telco_data_set.parquet', source_size=4012879, target_size=4012880, source_compression='PARQUET', target_compression='PARQUET', status='UPLOADED', message='')]

In [7]:
session.sql("CREATE OR REPLACE FILE FORMAT MY_PARQUET_FORMAT TYPE = PARQUET;").collect()

session.sql(f"CREATE OR REPLACE \
            TABLE {rawtable} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  ").collect()

[Row(status='Table RAW_TELCO_PARQUET_DATA successfully created.')]

## For incremental load 

you might want to just delete all rows and load new data to this table

In [8]:
dfClear = session.table(rawtable).delete()

In [9]:
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable,FORCE= True)

[Row(file='rawtelcodata/telco_data_set.parquet', status='LOADED', rows_parsed=100000, rows_loaded=100000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

In [10]:
dfR = session.table(rawtable)
#dfR.toPandas()
# Drop the column with the empty header
dfR = dfR.drop('Unnamed: 0')
# Overwrite the original table with the updated dataframe
dfR.write.mode("overwrite").save_as_table(rawtable)
dfR.show(2)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"GENDER"  |"PARTNER"  |"DEPENDENTS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"ONLINESECURITY"  |"SENIORCITIZEN"  |"MONTHLYCHARGES"  |"STREAMINGMOVIES"  |"PAYMENTMETHOD"   |"TENUREMONTHS"  |"PAPERLESSBILLING"  |"TECHSUPPORT"  |"INTERNETSERVICE"  |"STREAMINGTV"  |"CONTRACT"      |"ONLINEBACKUP"  |"DEVICEPROTECTION"  |"TOTALCHARGES"  |"TENUREMONTHSBIN"  |"MONTHLYCHARGESBIN"  |"TOTALCHARGESBIN"  |"CHURNVALUE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
import random
import string

pd_df = dfR.to_pandas()

# Define the number of unique IDs you want to generate
num_ids = pd_df.count()
print(num_ids)
# Generate and add unique IDs to the DataFrame
# Generate a list of random IDs
#random_ids = [str(uuid.uuid4().hex) for _ in range(num_ids)]
# Create a Pandas DataFrame
#pd_df = pd.DataFrame({'ID': random_ids})
# Create a DataFrame from the list of unique IDs
#id_df = session.createDataFrame(pd_df)
#id_df.show(10)

# Generate a list of random IDs
#random_ids = [str(uuid.uuid4().hex) for _ in range(num_ids)]
#print(random_ids[1])


GENDER               100000
PARTNER              100000
DEPENDENTS           100000
PHONESERVICE         100000
MULTIPLELINES        100000
ONLINESECURITY       100000
SENIORCITIZEN        100000
MONTHLYCHARGES       100000
STREAMINGMOVIES      100000
PAYMENTMETHOD        100000
TENUREMONTHS         100000
PAPERLESSBILLING     100000
TECHSUPPORT          100000
INTERNETSERVICE      100000
STREAMINGTV          100000
CONTRACT             100000
ONLINEBACKUP         100000
DEVICEPROTECTION     100000
TOTALCHARGES         100000
TENUREMONTHSBIN      100000
MONTHLYCHARGESBIN    100000
TOTALCHARGESBIN      100000
CHURNVALUE           100000
dtype: int64


In [12]:
# Assuming you have an existing Snowpark DataFrame 'df'
# Define the number of rows in the existing DataFrame
num_rows = dfR.count()

# Generate random IDs
random_ids = [''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) for _ in range(num_rows)]

# Add the 'ID' column with random values to the Pandas DataFrame
pd_df['CUSTOMERID'] = random_ids

# Convert Pandas DataFrame back to Snowpark DataFrame
df_with_id = session.createDataFrame(pd_df)

# Display the DataFrame with the added 'ID' column
#df_with_id.show(2)

In [13]:
df_with_id.show(2)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"GENDER"  |"PARTNER"  |"DEPENDENTS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"ONLINESECURITY"  |"SENIORCITIZEN"  |"MONTHLYCHARGES"  |"STREAMINGMOVIES"  |"PAYMENTMETHOD"   |"TENUREMONTHS"  |"PAPERLESSBILLING"  |"TECHSUPPORT"  |"INTERNETSERVICE"  |"STREAMINGTV"  |"CONTRACT"      |"ONLINEBACKUP"  |"DEVICEPROTECTION"  |"TOTALCHARGES"  |"TENUREMONTHSBIN"  |"MONTHLYCHARGESBIN"  |"TOTALCHARGESBIN"  |"CHURNVALUE"  |"CUSTOMERID"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
rawtelcotable = "RAW_TELCO_ID_TABLE"

# Overwrite the original table with the updated dataframe
df_with_id.write.mode("overwrite").save_as_table(rawtelcotable)
# Retrieve the updated table and display its contents
dfR_ID = session.table(rawtelcotable)

In [15]:
dfR_ID.show(2)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"GENDER"  |"PARTNER"  |"DEPENDENTS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"ONLINESECURITY"  |"SENIORCITIZEN"  |"MONTHLYCHARGES"  |"STREAMINGMOVIES"  |"PAYMENTMETHOD"   |"TENUREMONTHS"  |"PAPERLESSBILLING"  |"TECHSUPPORT"  |"INTERNETSERVICE"  |"STREAMINGTV"  |"CONTRACT"      |"ONLINEBACKUP"  |"DEVICEPROTECTION"  |"TOTALCHARGES"  |"TENUREMONTHSBIN"  |"MONTHLYCHARGESBIN"  |"TOTALCHARGESBIN"  |"CHURNVALUE"  |"CUSTOMERID"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------

In [16]:
# Check the number of columns in each DataFrame
l_dfR_ID = len(dfR_ID.columns)
print(l_dfR_ID)
# Check the number of columns in each DataFrame
r_dfR_ID = dfR_ID.count()
print(r_dfR_ID)

24
100000


# Snowpark Transformations

The Snowpark API provides programming language constructs for building SQL statements. It's a new developer experience which enables us to build code in :-

<b><li>  Language of our choice </li></b>
<b><li> Tool of our choice and </li></b>
<b><li> Lazy execution to prevent multiple network hops to server </li></b>

Once the customer data is available in the RAW schema, we can use snowpark to create dimensions and fact tables. We will use the RAW_PARQUET table to create following tables -
    
<li> DEMOGRAPHICS </li>
<li> LOCATION </li>
<li> STATUS </li>
<li> SERVICES </li>

We will also transform and clean the data using Snowpark dataframe API

In [17]:
#dfR = session.table(rawtable)

In [18]:
#dfR.show(5)

In [19]:
dfDemographics = dfR_ID.select(col("CUSTOMERID"),
                             #col("COUNT").alias("COUNT"),
                             translate(col("GENDER"),lit("NULL"),lit("Male")).alias("GENDER"),
                             col("SENIORCITIZEN").alias("SENIORCITIZEN"),
                             col("PARTNER"),
                             col("DEPENDENTS")          
                            )


dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
dfDemographics.show()


------------------------------------------------------------------------
|"CUSTOMERID"  |"GENDER"  |"SENIORCITIZEN"  |"PARTNER"  |"DEPENDENTS"  |
------------------------------------------------------------------------
|WwrUeyguhu    |Female    |No               |No         |Yes           |
|DNuFFvxO8O    |Female    |No               |No         |Yes           |
|xeiCbPqYpb    |Male      |Yes              |No         |Yes           |
|NJBFBr8oKJ    |Male      |No               |No         |Yes           |
|xu9UDKawli    |Female    |No               |No         |Yes           |
|6DTmlnI2x1    |Male      |No               |No         |Yes           |
|2YUHLqFUcr    |Male      |No               |No         |No            |
|07IwBdHiFH    |Male      |Yes              |No         |No            |
|mX13Rfc1Vl    |Male      |No               |No         |No            |
|w2PkmlZbto    |Male      |Yes              |Yes        |No            |
---------------------------------------------------

In [20]:
#dfLocation = dfR.select( col("COUNTRY").name("COUNTRY"),
                         #col("STATE").name("STATE"),
                         #col("CITY").name("CITY"),
                         #translate(col("ZIP CODE"),lit("NULL"),lit(0)).name("ZIPCODE"),
                         #col("LAT LONG").name("LATLONG"),
                         #col("LATITUDE").name("LATITUDE"),
                         #col("LONGITUDE").name("LONGITUDE")       
                        #)

#dfLocation.write.mode('overwrite').saveAsTable('LOCATION')
#dfLocation.show()


#### you can run transformation on data using similar dataframe API constructs, for example -

In [21]:
dfServices = dfR_ID.select(col("CUSTOMERID"),
                       col("TENUREMONTHS").name("TENUREMONTHS"),
                       iff(is_null(col("PHONESERVICE")),lit('N'),col("PHONESERVICE")).name("PHONESERVICE"),
                       iff(is_null(col("MULTIPLELINES")),lit("No"),col("MULTIPLELINES")).name("MULTIPLELINES"),
                       iff(is_null(col("INTERNETSERVICE")),lit("No"),col("INTERNETSERVICE")).name("INTERNETSERVICE"),
                       iff(is_null(col("ONLINESECURITY")),lit("No"),col("ONLINESECURITY")).name("ONLINESECURITY"),
                       iff(is_null(col("ONLINEBACKUP")),lit("No"),col("ONLINEBACKUP")).name("ONLINEBACKUP"),
                       iff(is_null(col("DEVICEPROTECTION")),lit("No"),col("DEVICEPROTECTION")).name("DEVICEPROTECTION"),
                       iff(is_null(col("TECHSUPPORT")),lit('N'),col("TECHSUPPORT")).name("TECHSUPPORT"),
                       iff(is_null(col("STREAMINGTV")),lit("No"),col("STREAMINGTV")).name("STREAMINGTV"),
                       iff(is_null(col("STREAMINGMOVIES")),lit("No"),col("STREAMINGMOVIES")).name("STREAMINGMOVIES"),
                       iff(is_null(col("CONTRACT")),lit("Month-to-month"),col("CONTRACT")).name("CONTRACT"),
                       iff(is_null(col("PAPERLESSBILLING")),lit('Y'),col("PAPERLESSBILLING")).name("PAPERLESSBILLING"),
                       iff(is_null(col("PAYMENTMETHOD")),lit("Mailed check"),col("PAYMENTMETHOD")).name("PAYMENTMETHOD"),
                       col("MONTHLYCHARGES").name("MONTHLYCHARGES"),
                       col("TOTALCHARGES").name("TOTALCHARGES"),
                       col("TENUREMONTHSBIN").name("TENUREMONTHSBIN"),
                       col("MONTHLYCHARGESBIN").name("MONTHLYCHARGESBIN"),
                       col("TOTALCHARGESBIN").name("TOTALCHARGESBIN"),
                       col("CHURNVALUE").name("CHURNVALUE")        

                      )

dfServices.write.mode('overwrite').saveAsTable('SERVICES')
dfServices.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"TENUREMONTHS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"INTERNETSERVICE"  |"ONLINESECURITY"     |"ONLINEBACKUP"       |"DEVICEPROTECTION"   |"TECHSUPPORT"        |"STREAMINGTV"        |"STREAMINGMOVIES"    |"CONTRACT"      |"PAPERLESSBILLING"  |"PAYMENTMETHOD"   |"MONTHLYCHARGES"  |"TOTALCHARGES"  |"TENUREMONTHSBIN"  |"MONTHLYCHARGESBIN"  |"TOTALCHARGESBIN"  |"CHURNVALUE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
#dfStatus = dfR.select(col("CUSTOMERID"),
                    #iff(is_null(col("CHURN LABEL")),lit('N'),col("CHURN LABEL")).name("CHURNLABEL"),
                    #col("CHURN VALUE").name("CHURNVALUE"),
                    #col("CHURN SCORE").name("CHURNSCORE"),
                    #col("CLTV").name("CLTV"),
                    #iff(is_null(col("CHURN REASON")),lit("do not know"),col("CHURN REASON")).name("CHURNREASON")          
                    #)

#dfStatus.write.mode('overwrite').saveAsTable('STATUS')
#dfStatus.show()


# Lets check the data using an example query

This shows one of many uses of snowpark. You can build and query dataframes lazily.

In [23]:
# Lets run a query for quick sanity check
# This Query will show us the total revenue by city and contract term

#dfLoc = session.table("LOCATION")
#dfServ = session.table("SERVICES")

#dfJoin = dfLoc.join(dfServ,dfLoc.col("CUSTOMERID") == dfServ.col("CUSTOMERID"))

#dfResult = dfJoin.select(col("CITY"),
                         #col("CONTRACT"),
                         #col("TOTALCHARGES")).groupBy(col("CITY"),col("CONTRACT")).sum(col("TOTALCHARGES"))

#dfResult.show()

### Let's create a view for data science team to begin data analysis

To do so, join up the `DEMOGRAPHICS` and `SERVICES` tables based on `CUSTOMERID`

In [24]:
dfD = session.table('DEMOGRAPHICS')
dfS = session.table('SERVICES')
dfJ = dfD.join(dfS, using_columns='CUSTOMERID', join_type = 'left')
dfJ.select(col('GENDER'),
              col('SENIORCITIZEN'),
              col('PARTNER'),
              col('DEPENDENTS'),
              col('MULTIPLELINES'),
              col('INTERNETSERVICE'),
              col('ONLINESECURITY'),
              col('ONLINEBACKUP'),
              col('DEVICEPROTECTION'),
              col('TECHSUPPORT'),
              col('STREAMINGTV'),
              col('STREAMINGMOVIES'),
              col('CONTRACT'),
              col('PAPERLESSBILLING'),
              col('PAYMENTMETHOD'),
              col('TENUREMONTHS'),
              col('MONTHLYCHARGES'),
              col('TOTALCHARGES'),
              col('TENUREMONTHSBIN'),
              col('MONTHLYCHARGESBIN'),
              col('TOTALCHARGESBIN'),
              col('CHURNVALUE'))
dfJ.create_or_replace_view('TRAIN_DATASET_BIN')

[Row(status='View TRAIN_DATASET_BIN successfully created.')]

In [25]:
%%time

raw = session.table('TRAIN_DATASET_BIN').sample(n = 20)
data = raw.toPandas()

CPU times: user 15.5 ms, sys: 3.95 ms, total: 19.4 ms
Wall time: 314 ms


# Off to ~02 notebook for exploratory data analysis

In [26]:
pd.pandas.set_option('display.max_columns', None)
data.head()

Unnamed: 0,CUSTOMERID,GENDER,SENIORCITIZEN,PARTNER,DEPENDENTS,TENUREMONTHS,PHONESERVICE,MULTIPLELINES,INTERNETSERVICE,ONLINESECURITY,ONLINEBACKUP,DEVICEPROTECTION,TECHSUPPORT,STREAMINGTV,STREAMINGMOVIES,CONTRACT,PAPERLESSBILLING,PAYMENTMETHOD,MONTHLYCHARGES,TOTALCHARGES,TENUREMONTHSBIN,MONTHLYCHARGESBIN,TOTALCHARGESBIN,CHURNVALUE
0,t34zhyWFNv,Female,No,Yes,No,22,Yes,Yes,DSL,Yes,No,No,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,79.2,1742.75,Medium,Medium,Low,1
1,8wdPc6SyD8,Female,No,No,No,11,Yes,No,DSL,Yes,No,No,No,No,No,Month-to-month,No,Credit card (automatic),48.55,501.0,Low,Low,High,1
2,6h6ar7ZzBl,Female,Yes,Yes,No,56,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Bank transfer (automatic),100.65,5688.05,High,High,Low,1
3,Am1NwIOiFc,Female,No,No,No,1,Yes,Yes,Fiber optic,No,No,No,No,No,No,Month-to-month,No,Electronic check,76.0,76.0,Medium,Medium,Low,1
4,gKq74yLXiX,Female,No,Yes,No,71,Yes,Yes,DSL,Yes,Yes,Yes,Yes,Yes,Yes,Two year,No,Bank transfer (automatic),87.95,6365.35,Low,Medium,Low,0


In [27]:
session.close()