# Customer Churn Analysis

We are tasked to build an end to end machine learning pipeline using snowpark for customer churn prediction in a telecom company.
In this notebook we will load, clean and transform the raw parquet dataset

In [2]:
pip install seaborn


Collecting seaborn
  Downloading seaborn-0.12.2-py3-none-any.whl (293 kB)
[K     |████████████████████████████████| 293 kB 4.6 MB/s eta 0:00:01
Installing collected packages: seaborn
Successfully installed seaborn-0.12.2
Note: you may need to restart the kernel to use updated packages.


In [1]:
# Import required libraries
from snowflake.snowpark.session import Session
#Returns a unique system identifier for the Snowflake session corresponding to the present connection
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

import sklearn

from sklearn import linear_model


import matplotlib.pyplot as plt

%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

#Snowflake connection info is saved in config.py
from snowflake_conn_prop_Copy1 import snowflake_conn_prop


# lets import some tranformations functions
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff


In [2]:
from snowflake.snowpark import version
print(version.VERSION)

(1, 2, 0)


Let's configure our Snowpark Session and initialize the database, warehouse, and schema that we will be using

In [3]:
#session.close()
#session = Session.builder.configs(snowflake_conn_prop).create()
session = Session.builder.configs(snowflake_conn_prop).create()


In [4]:
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create or replace warehouse {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

[Row(CURRENT_WAREHOUSE()='SP_QS_WH', CURRENT_DATABASE()='SNOWPARK_QUICKSTART', CURRENT_SCHEMA()='TELCO')]


# Infer file schema & Load Data into snowflake

In [5]:
filename = "raw_telco_data.parquet"
stagename = "rawdata"
rawtable = "RAW_PARQUET_DATA"

In [6]:
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
session.file.put(filename,stagename)

[PutResult(source='raw_telco_data.parquet', target='raw_telco_data.parquet', source_size=3037540, target_size=3037552, source_compression='PARQUET', target_compression='PARQUET', status='UPLOADED', message='')]

In [7]:
session.sql("CREATE OR REPLACE FILE FORMAT MY_PARQUET_FORMAT TYPE = PARQUET;").collect()

session.sql(f"CREATE OR REPLACE \
            TABLE {rawtable} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  ").collect()

[Row(status='Table RAW_PARQUET_DATA successfully created.')]

# For incremental load

you might want to just delete all rows and load new data to this table

In [8]:
dfClear = session.table(rawtable).delete()

In [9]:
dfRaw = session.read.option("compression","snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable,FORCE= True)

[Row(file='rawdata/raw_telco_data.parquet', status='LOADED', rows_parsed=100000, rows_loaded=100000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

In [10]:
dfR = session.table(rawtable).sample(n=5)
dfR.toPandas()

Unnamed: 0,CUSTOMERID,GENDER,PARTNER,DEPENDENTS,COUNTRY,CITY,STATE,ZIP CODE,LONGITUDE,LATITUDE,...,STREAMING TV,CONTRACT,CHURN SCORE,ONLINE BACKUP,DEVICE PROTECTION,TOTAL CHARGES,CLTV,CHURN VALUE,CHURN LABEL,CHURN REASON
0,4318-jsWlq,Female,True,False,United States,San Pedro,California,90731,-118.284363,33.736387,...,Yes,One year,0,Yes,No,4594.65,3517,0.0,False,do not know
1,3891-OotYM,Female,False,False,United States,Pleasant Grove,California,95668,-121.498102,38.833554,...,Yes,Month-to-month,1,Yes,Yes,442.85,2268,1.0,True,Competitor offered more data
2,2581-21SEu,Female,False,False,United States,Redwood City,California,94063,-122.196318,37.499411,...,Yes,Two year,0,No,Yes,3921.1,6251,0.0,False,do not know
3,4314-DhnS2,Male,False,False,United States,Cool,California,95614,-120.973865,38.880622,...,Yes,Month-to-month,1,No,Yes,197.7,3790,1.0,True,Service dissatisfaction
4,4635-xuiMI,Male,True,False,United States,Los Angeles,California,90044,-118.292061,33.952714,...,Yes,Two year,0,Yes,Yes,8375.05,6274,0.0,False,do not know


# Snowpark Transformations

The Snowpark API provides programming language constructs for building SQL statements. It's a new developer experience which enables us to build code in :-


- **Language of our choice**
- **Tool of our choice and**
- **Lazy execution to prevent multiple network hops to server**

Once the customer data is available in the RAW schema, we can use snowpark to create dimensions and fact tables. We will use the RAW_PARQUET table to create following tables -

- DEMOGRAPHICS
- LOCATION
- STATUS
- SERVICES

We will also transform and clean the data using Snowpark dataframe API

In [11]:
dfR = session.table(rawtable)

In [12]:
dfDemographics = dfR.select(col("CUSTOMERID"),
                             col("COUNT").alias("COUNT"),
                             translate(col("GENDER"),lit("NULL"),lit("Male")).alias("GENDER"),
                             col("SENIOR CITIZEN").alias("SENIORCITIZEN"),
                             col("PARTNER"),
                             col("DEPENDENTS")          
                            )


dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
dfDemographics.show()

----------------------------------------------------------------------------------
|"CUSTOMERID"  |"COUNT"  |"GENDER"  |"SENIORCITIZEN"  |"PARTNER"  |"DEPENDENTS"  |
----------------------------------------------------------------------------------
|7090-ZyCMx    |1        |Female    |False            |False      |True          |
|1364-wJXMS    |1        |Female    |False            |False      |True          |
|6564-sLgIC    |1        |Male      |True             |False      |True          |
|7853-2xheR    |1        |Male      |False            |False      |True          |
|8457-E9FuW    |1        |Female    |False            |False      |True          |
|5718-ykxBT    |1        |Male      |False            |False      |True          |
|7092-gCJX5    |1        |Male      |False            |False      |False         |
|8249-GOs7s    |1        |Male      |True             |False      |False         |
|9445-kPPEc    |1        |Male      |False            |False      |False         |
|158

In [13]:
dfLocation = dfR.select(col("CUSTOMERID"),
                         col("COUNTRY").name("COUNTRY"),
                         col("STATE").name("STATE"),
                         col("CITY").name("CITY"),
                         translate(col("ZIP CODE"),lit("NULL"),lit(0)).name("ZIPCODE"),
                         col("LAT LONG").name("LATLONG"),
                         col("LATITUDE").name("LATITUDE"),
                         col("LONGITUDE").name("LONGITUDE")       
                        )

dfLocation.write.mode('overwrite').saveAsTable('LOCATION')
dfLocation.show()

-------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"COUNTRY"      |"STATE"     |"CITY"           |"ZIPCODE"  |"LATLONG"               |"LATITUDE"  |"LONGITUDE"  |
-------------------------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |United States  |California  |Los Angeles      |90005      |34.059281, -118.30742   |34.059281   |-118.307420  |
|1364-wJXMS    |United States  |California  |Los Angeles      |90006      |34.048013, -118.293953  |34.048013   |-118.293953  |
|6564-sLgIC    |United States  |California  |Los Angeles      |90065      |34.108833, -118.229715  |34.108833   |-118.229715  |
|7853-2xheR    |United States  |California  |La Habra         |90631      |33.940619, -117.9513    |33.940619   |-117.951300  |
|8457-E9FuW    |United States  |California  |Glendale         |91206      |34.162515, -118.203869  |34.1

you can run transformation on data using similar dataframe API constructs, for example -


In [14]:
dfServices = dfR.select(col("CUSTOMERID"),
                       col("TENURE MONTHS").name("TENUREMONTHS"),
                       iff(is_null(col("PHONE SERVICE")),lit('N'),col("PHONE SERVICE")).name("PHONESERVICE"),
                       iff(is_null(col("MULTIPLE LINES")),lit("No"),col("MULTIPLE LINES")).name("MULTIPLELINES"),
                       iff(is_null(col("INTERNET SERVICE")),lit("No"),col("INTERNET SERVICE")).name("INTERNETSERVICE"),
                       iff(is_null(col("ONLINE SECURITY")),lit("No"),col("ONLINE SECURITY")).name("ONLINESECURITY"),
                       iff(is_null(col("ONLINE BACKUP")),lit("No"),col("ONLINE BACKUP")).name("ONLINEBACKUP"),
                       iff(is_null(col("DEVICE PROTECTION")),lit("No"),col("DEVICE PROTECTION")).name("DEVICEPROTECTION"),
                       iff(is_null(col("TECH SUPPORT")),lit('N'),col("TECH SUPPORT")).name("TECHSUPPORT"),
                       iff(is_null(col("STREAMING TV")),lit("No"),col("STREAMING TV")).name("STREAMINGTV"),
                       iff(is_null(col("STREAMING MOVIES")),lit("No"),col("STREAMING MOVIES")).name("STREAMINGMOVIES"),
                       iff(is_null(col("CONTRACT")),lit("Month-to-month"),col("CONTRACT")).name("CONTRACT"),
                       iff(is_null(col("PAPERLESS BILLING")),lit('Y'),col("PAPERLESS BILLING")).name("PAPERLESSBILLING"),
                       iff(is_null(col("PAYMENT METHOD")),lit("Mailed check"),col("PAYMENT METHOD")).name("PAYMENTMETHOD"),
                       col("MONTHLY CHARGES").name("MONTHLYCHARGES"),
                       col("TOTAL CHARGES").name("TOTALCHARGES"),
                       col("CHURN VALUE").name("CHURNVALUE")        

                      )

dfServices.write.mode('overwrite').saveAsTable('SERVICES')
dfServices.show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"TENUREMONTHS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"INTERNETSERVICE"  |"ONLINESECURITY"     |"ONLINEBACKUP"       |"DEVICEPROTECTION"   |"TECHSUPPORT"        |"STREAMINGTV"        |"STREAMINGMOVIES"    |"CONTRACT"      |"PAPERLESSBILLING"  |"PAYMENTMETHOD"   |"MONTHLYCHARGES"  |"TOTALCHARGES"  |"CHURNVALUE"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|7090-ZyCMx  

In [15]:
dfStatus = dfR.select(col("CUSTOMERID"),
                    iff(is_null(col("CHURN LABEL")),lit('N'),col("CHURN LABEL")).name("CHURNLABEL"),
                    col("CHURN VALUE").name("CHURNVALUE"),
                    col("CHURN SCORE").name("CHURNSCORE"),
                    col("CLTV").name("CLTV"),
                    iff(is_null(col("CHURN REASON")),lit("do not know"),col("CHURN REASON")).name("CHURNREASON")          
                    )

dfStatus.write.mode('overwrite').saveAsTable('STATUS')
dfStatus.show()

-----------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"CHURNLABEL"  |"CHURNVALUE"  |"CHURNSCORE"  |"CLTV"  |"CHURNREASON"                             |
-----------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |true          |1.0           |1             |2701    |Moved                                     |
|1364-wJXMS    |true          |1.0           |1             |5372    |Moved                                     |
|6564-sLgIC    |true          |1.0           |1             |3179    |Competitor made better offer              |
|7853-2xheR    |true          |1.0           |1             |4415    |Product dissatisfaction                   |
|8457-E9FuW    |true          |1.0           |1             |5142    |Price too high                            |
|5718-ykxBT    |true          |1.0           |1             |2484    |Poor expertise of 

# Lets check the data using an example query

This shows one of many uses of snowpark. You can build and query dataframes lazily.

In [16]:
# Lets run a query for quick sanity check
# This Query will show us the total revenue by city and contract term

dfLoc = session.table("LOCATION")
dfServ = session.table("SERVICES")

dfJoin = dfLoc.join(dfServ,dfLoc.col("CUSTOMERID") == dfServ.col("CUSTOMERID"))

dfResult = dfJoin.select(col("CITY"),
                         col("CONTRACT"),
                         col("TOTALCHARGES")).groupBy(col("CITY"),col("CONTRACT")).sum(col("TOTALCHARGES"))

dfResult.show()

----------------------------------------------------------
|"CITY"           |"CONTRACT"      |"SUM(TOTALCHARGES)"  |
----------------------------------------------------------
|Los Angeles      |Month-to-month  |3931004.7            |
|La Habra         |Month-to-month  |6828.35              |
|Glendale         |Month-to-month  |460483.05            |
|Burbank          |Month-to-month  |378354.4             |
|Ontario          |Two year        |57487.6              |
|Alpine           |Month-to-month  |69186.04999999999    |
|Borrego Springs  |Month-to-month  |94737.0              |
|Oceanside        |Month-to-month  |49559.5              |
|Niland           |Month-to-month  |24946.0              |
|San Bernardino   |Month-to-month  |253583.3             |
----------------------------------------------------------



### Let's create a view for data science team to begin data analysis

To do so, join up the **DEMOGRAPHICS** and **SERVICES** tables based on **CUSTOMERID**

In [17]:
dfD = session.table('DEMOGRAPHICS')
dfS = session.table('SERVICES')
dfJ = dfD.join(dfS, using_columns='CUSTOMERID', join_type = 'left')
dfJ.select(col('GENDER'),
              col('SENIORCITIZEN'),
              col('PARTNER'),
              col('DEPENDENTS'),
              col('MULTIPLELINES'),
              col('INTERNETSERVICE'),
              col('ONLINESECURITY'),
              col('ONLINEBACKUP'),
              col('DEVICEPROTECTION'),
              col('TECHSUPPORT'),
              col('STREAMINGTV'),
              col('STREAMINGMOVIES'),
              col('CONTRACT'),
              col('PAPERLESSBILLING'),
              col('PAYMENTMETHOD'),
              col('TENUREMONTHS'),
              col('MONTHLYCHARGES'),
              col('TOTALCHARGES'),
              col('CHURNVALUE'))
dfJ.create_or_replace_view('TRAIN_DATASET')

[Row(status='View TRAIN_DATASET successfully created.')]

In [18]:
%%time

raw = session.table('TRAIN_DATASET').sample(n = 20)
data = raw.toPandas()

CPU times: user 23.8 ms, sys: 7.28 ms, total: 31.1 ms
Wall time: 429 ms


# Off to ~02 notebook for exploratory data analysis

In [19]:
pd.pandas.set_option('display.max_columns', None)
data.head()

Unnamed: 0,CUSTOMERID,COUNT,GENDER,SENIORCITIZEN,PARTNER,DEPENDENTS,TENUREMONTHS,PHONESERVICE,MULTIPLELINES,INTERNETSERVICE,ONLINESECURITY,ONLINEBACKUP,DEVICEPROTECTION,TECHSUPPORT,STREAMINGTV,STREAMINGMOVIES,CONTRACT,PAPERLESSBILLING,PAYMENTMETHOD,MONTHLYCHARGES,TOTALCHARGES,CHURNVALUE
0,5519-4QhR4,1,Male,True,True,True,71,Yes,No,No,No internet service,No internet service,No internet service,No internet service,No internet service,No internet service,Two year,False,Credit card (automatic),19.8,1388.45,0.0
1,6348-Ta3Yx,1,Male,False,True,True,5,Yes,Yes,No,No internet service,No internet service,No internet service,No internet service,No internet service,No internet service,Month-to-month,True,Bank transfer (automatic),25.9,135.0,1.0
2,9338-uZxDn,1,Male,False,True,True,61,Yes,Yes,DSL,No,No,Yes,Yes,Yes,Yes,Two year,True,Bank transfer (automatic),81.0,4976.15,0.0
3,7215-nm9rD,1,Male,True,True,False,17,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,True,Credit card (automatic),100.45,1622.45,1.0
4,2980-QXc1Q,1,Male,True,True,False,27,Yes,Yes,DSL,No,Yes,Yes,Yes,Yes,Yes,Month-to-month,True,Bank transfer (automatic),84.8,2309.55,0.0


In [20]:
session.close()