# Load Data Using Snowpark Python Client API

load, clean, transform the raw parquet dataset

In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

from sklearn import linear_model

import matplotlib.pyplot as plt

%matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns
from config import snowflake_conn_prop
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff



In [3]:
session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists {}".format(snowflake_conn_prop["database"])).collect()
session.sql("use database {}".format(snowflake_conn_prop["database"])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop["schema"])).collect()
session.sql("use schema {}".format(snowflake_conn_prop["schema"])).collect()
session.sql("create or replace warehouse {} with \
             WAREHOUSE_SIZE=XSMALL AUTO_SUSPEND=120 AUTO_RESUME = TRUE".format(snowflake_conn_prop["warehouse"])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop["warehouse"])).collect()
print(session.sql("select current_warehouse(), current_database(), current_schema()").collect())

[Row(CURRENT_WAREHOUSE()='TEST_WH', CURRENT_DATABASE()='SNOWPARK_QUICKSTART', CURRENT_SCHEMA()='TELCO')]


## Infer files schema

In [7]:
filename = "raw_telco_data.parquet"
stagename = "raw_telco_data"
rawtable = "RAW_PARQUET_DATA"

In [9]:
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
session.file.put(filename,stagename)

[PutResult(source='raw_telco_data.parquet', target='raw_telco_data.parquet', source_size=3037540, target_size=3037552, source_compression='PARQUET', target_compression='PARQUET', status='UPLOADED', message='')]

In [10]:
session.sql("create or replace file format my_parquet_format type = parquet;").collect()
session.sql(f"create or replace table {rawtable} using template (\
                select array_agg(object_construct(*))\
                from table( infer_schema( location => '@{stagename}/{filename}',\
                                          file_format => 'MY_PARQUET_FORMAT' )\
                          )\
              ); ").collect()

[Row(status='Table RAW_PARQUET_DATA successfully created.')]

## Load data into snowflake

In [11]:
dfClear = session.table(rawtable).delete()

In [12]:
dfRaw = session.read.option("compression", "snappy").parquet(f"@{stagename}/{filename}")
dfRaw.copy_into_table(rawtable, FORCE=True)

[Row(file='raw_telco_data/raw_telco_data.parquet', status='LOADED', rows_parsed=100000, rows_loaded=100000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

In [13]:
dfR = session.table(rawtable).sample(n=5)
dfR.toPandas()

Unnamed: 0,COUNTRY,CITY,PHONE SERVICE,MULTIPLE LINES,LATITUDE,ONLINE SECURITY,SENIOR CITIZEN,MONTHLY CHARGES,STREAMING MOVIES,PAYMENT METHOD,...,CHURN SCORE,GENDER,LONGITUDE,ONLINE BACKUP,TOTAL CHARGES,CLTV,CHURN REASON,DEVICE PROTECTION,STATE,ZIP CODE
0,United States,Somerset,Yes,Yes,38.606703,Yes,False,94.55,No,Credit card (automatic),...,0,Male,-120.586659,Yes,6078.75,4603,do not know,Yes,California,95684
1,United States,Palo Alto,Yes,Yes,37.416159,No,True,83.25,No,Electronic check,...,0,Female,-122.131337,No,308.05,5251,do not know,No,California,94306
2,United States,Downey,Yes,No,33.940884,Yes,False,109.9,Yes,Electronic check,...,0,Female,-118.128628,Yes,7624.2,5234,do not know,Yes,California,90241
3,United States,Sacramento,Yes,Yes,38.590035,No,False,91.05,Yes,Bank transfer (automatic),...,1,Female,-121.412455,No,2954.5,3590,Competitor offered more data,Yes,California,95825
4,United States,Burbank,Yes,No,34.169706,Yes,False,100.2,No,Credit card (automatic),...,0,Male,-118.323548,Yes,7209.0,5502,do not know,Yes,California,91506


## Transformation

In [14]:
dfR = session.table(rawtable)

In [16]:
dfDemographics = dfR.select( col("CUSTOMERID"),
                             col("COUNT").alias("COUNT"),
                             translate(col("GENDER"),lit("NULL"),lit("Male")).alias("GENDER"),
                             col("SENIOR CITIZEN").alias("SENIORCITIZEN"),
                             col("PARTNER"),
                             col("DEPENDENTS")          
                            )
dfDemographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
dfDemographics.show()

----------------------------------------------------------------------------------
|"CUSTOMERID"  |"COUNT"  |"GENDER"  |"SENIORCITIZEN"  |"PARTNER"  |"DEPENDENTS"  |
----------------------------------------------------------------------------------
|7090-ZyCMx    |1        |Female    |False            |False      |True          |
|1364-wJXMS    |1        |Female    |False            |False      |True          |
|6564-sLgIC    |1        |Male      |True             |False      |True          |
|7853-2xheR    |1        |Male      |False            |False      |True          |
|8457-E9FuW    |1        |Female    |False            |False      |True          |
|5718-ykxBT    |1        |Male      |False            |False      |True          |
|7092-gCJX5    |1        |Male      |False            |False      |False         |
|8249-GOs7s    |1        |Male      |True             |False      |False         |
|9445-kPPEc    |1        |Male      |False            |False      |False         |
|158

In [17]:
dfLocation = dfR.select( col("CUSTOMERID"),
                         col("COUNTRY").name("COUNTRY"),
                         col("STATE").name("STATE"),
                         col("CITY").name("CITY"),
                         translate(col("ZIP CODE"),lit("NULL"),lit(0)).name("ZIPCODE"),
                         col("LAT LONG").name("LATLONG"),
                         col("LATITUDE").name("LATITUDE"),
                         col("LONGITUDE").name("LONGITUDE")       
                        )
dfLocation.write.mode('overwrite').saveAsTable('LOCATION')
dfLocation.show()

-------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"COUNTRY"      |"STATE"     |"CITY"           |"ZIPCODE"  |"LATLONG"               |"LATITUDE"  |"LONGITUDE"  |
-------------------------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |United States  |California  |Los Angeles      |90005      |34.059281, -118.30742   |34.059281   |-118.307420  |
|1364-wJXMS    |United States  |California  |Los Angeles      |90006      |34.048013, -118.293953  |34.048013   |-118.293953  |
|6564-sLgIC    |United States  |California  |Los Angeles      |90065      |34.108833, -118.229715  |34.108833   |-118.229715  |
|7853-2xheR    |United States  |California  |La Habra         |90631      |33.940619, -117.9513    |33.940619   |-117.951300  |
|8457-E9FuW    |United States  |California  |Glendale         |91206      |34.162515, -118.203869  |34.1

In [18]:
dfServics = dfR.select(col("CUSTOMERID"),
                       col("TENURE MONTHS").name("TENUREMONTHS"),
                       iff(is_null(col("PHONE SERVICE")),lit('N'),col("PHONE SERVICE")).name("PHONESERVICE"),
                       iff(is_null(col("MULTIPLE LINES")),lit("No"),col("MULTIPLE LINES")).name("MULTIPLELINES"),
                       iff(is_null(col("INTERNET SERVICE")),lit("No"),col("INTERNET SERVICE")).name("INTERNETSERVICE"),
                       iff(is_null(col("ONLINE SECURITY")),lit("No"),col("ONLINE SECURITY")).name("ONLINESECURITY"),
                       iff(is_null(col("ONLINE BACKUP")),lit("No"),col("ONLINE BACKUP")).name("ONLINEBACKUP"),
                       iff(is_null(col("DEVICE PROTECTION")),lit("No"),col("DEVICE PROTECTION")).name("DEVICEPROTECTION"),
                       iff(is_null(col("TECH SUPPORT")),lit('N'),col("TECH SUPPORT")).name("TECHSUPPORT"),
                       iff(is_null(col("STREAMING TV")),lit("No"),col("STREAMING TV")).name("STREAMINGTV"),
                       iff(is_null(col("STREAMING MOVIES")),lit("No"),col("STREAMING MOVIES")).name("STREAMINGMOVIES"),
                       iff(is_null(col("CONTRACT")),lit("Month-to-month"),col("CONTRACT")).name("CONTRACT"),
                       iff(is_null(col("PAPERLESS BILLING")),lit('Y'),col("PAPERLESS BILLING")).name("PAPERLESSBILLING"),
                       iff(is_null(col("PAYMENT METHOD")),lit("Mailed check"),col("PAYMENT METHOD")).name("PAYMENTMETHOD"),
                       col("MONTHLY CHARGES").name("MONTHLYCHARGES"),
                       col("TOTAL CHARGES").name("TOTALCHARGES"),
                       col("CHURN VALUE").name("CHURNVALUE")
                      )
dfServics.write.mode('overwrite').saveAsTable('SERVICES')
dfServics.show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"TENUREMONTHS"  |"PHONESERVICE"  |"MULTIPLELINES"  |"INTERNETSERVICE"  |"ONLINESECURITY"     |"ONLINEBACKUP"       |"DEVICEPROTECTION"   |"TECHSUPPORT"        |"STREAMINGTV"        |"STREAMINGMOVIES"    |"CONTRACT"      |"PAPERLESSBILLING"  |"PAYMENTMETHOD"   |"MONTHLYCHARGES"  |"TOTALCHARGES"  |"CHURNVALUE"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|7090-ZyCMx  

In [19]:
dfStatus = dfR.select(col("CUSTOMERID"),
                    iff(is_null(col("CHURN LABEL")),lit('N'),col("CHURN LABEL")).name("CHURNLABEL"),
                    col("CHURN VALUE").name("CHURNVALUE"),
                    col("CHURN SCORE").name("CHURNSCORE"),
                    col("CLTV").name("CLTV"),
                    iff(is_null(col("CHURN REASON")),lit("do not know"),col("CHURN REASON")).name("CHURNREASON")          
                    )

dfStatus.write.mode('overwrite').saveAsTable('STATUS')
dfStatus.show()

-----------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"CHURNLABEL"  |"CHURNVALUE"  |"CHURNSCORE"  |"CLTV"  |"CHURNREASON"                             |
-----------------------------------------------------------------------------------------------------------------
|7090-ZyCMx    |true          |1.0           |1             |2701    |Moved                                     |
|1364-wJXMS    |true          |1.0           |1             |5372    |Moved                                     |
|6564-sLgIC    |true          |1.0           |1             |3179    |Competitor made better offer              |
|7853-2xheR    |true          |1.0           |1             |4415    |Product dissatisfaction                   |
|8457-E9FuW    |true          |1.0           |1             |5142    |Price too high                            |
|5718-ykxBT    |true          |1.0           |1             |2484    |Poor expertise of 

In [20]:
dfLoc = session.table("LOCATION")
dfServ = session.table("SERVICES")
dfJoin = dfLoc.join(dfServ, dfLoc.col("CUSTOMERID") == dfServ.col("CUSTOMERID"))
dfResult = dfJoin.select(col("CITY"),col("CONTRACT"), col("TOTALCHARGES")
                            ).groupBy(col("CITY"), col("CONTRACT")
                            ).sum(col("TOTALCHARGES"))
dfResult.show()                                                                        

----------------------------------------------------------
|"CITY"           |"CONTRACT"      |"SUM(TOTALCHARGES)"  |
----------------------------------------------------------
|Los Angeles      |Month-to-month  |3931004.7            |
|La Habra         |Month-to-month  |6828.35              |
|Glendale         |Month-to-month  |460483.05            |
|Burbank          |Month-to-month  |378354.4             |
|Ontario          |Two year        |57487.6              |
|Alpine           |Month-to-month  |69186.04999999999    |
|Borrego Springs  |Month-to-month  |94737.0              |
|Oceanside        |Month-to-month  |49559.5              |
|Niland           |Month-to-month  |24946.0              |
|San Bernardino   |Month-to-month  |253583.3             |
----------------------------------------------------------



In [21]:
dfD = session.table("DEMOGRAPHICS")
dfS = session.table("SERVICES")
dfJ = dfD.join(dfS, using_columns='CUSTOMERID', join_type='left')
dfJ.select( col('GENDER'),
            col('SENIORCITIZEN'),
            col('PARTNER'),
            col('DEPENDENTS'),
            col('MULTIPLELINES'),
            col('INTERNETSERVICE'),
            col('ONLINESECURITY'),
            col('ONLINEBACKUP'),
            col('DEVICEPROTECTION'),
            col('TECHSUPPORT'),
            col('STREAMINGTV'),
            col('STREAMINGMOVIES'),
            col('CONTRACT'),
            col('PAPERLESSBILLING'),
            col('PAYMENTMETHOD'),
            col('TENUREMONTHS'),
            col('MONTHLYCHARGES'),
            col('TOTALCHARGES'),
            col('CHURNVALUE')
            )
dfJ.create_or_replace_view("TRAIN_DATASET")

[Row(status='View TRAIN_DATASET successfully created.')]

In [22]:
%%time
raw = session.table("TRAIN_DATASET").sample(n = 20)
data = raw.toPandas()

CPU times: user 19.4 ms, sys: 3.5 ms, total: 22.9 ms
Wall time: 1.25 s


In [23]:
pd.pandas.set_option('display.max_columns', None)
data.head()

Unnamed: 0,CUSTOMERID,COUNT,GENDER,SENIORCITIZEN,PARTNER,DEPENDENTS,TENUREMONTHS,PHONESERVICE,MULTIPLELINES,INTERNETSERVICE,ONLINESECURITY,ONLINEBACKUP,DEVICEPROTECTION,TECHSUPPORT,STREAMINGTV,STREAMINGMOVIES,CONTRACT,PAPERLESSBILLING,PAYMENTMETHOD,MONTHLYCHARGES,TOTALCHARGES,CHURNVALUE
0,3841-hQh5W,1,Female,False,True,False,3,Yes,Yes,Fiber optic,No,No,No,No,Yes,Yes,Month-to-month,False,Electronic check,94.9,273.2,0.0
1,4058-ouP19,1,Female,False,True,False,3,Yes,Yes,Fiber optic,No,No,No,No,No,No,Month-to-month,True,Credit card (automatic),74.1,228.0,1.0
2,1004-gR3Hj,1,Female,False,True,False,16,Yes,Yes,Fiber optic,No,No,No,No,Yes,No,Month-to-month,True,Electronic check,84.95,1378.25,1.0
3,1290-JJejm,1,Male,False,False,False,72,Yes,Yes,Fiber optic,No,Yes,Yes,Yes,Yes,Yes,Two year,False,Bank transfer (automatic),109.2,7878.3,0.0
4,1961-GGRzm,1,Male,False,False,False,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,False,Bank transfer (automatic),42.3,1840.75,0.0


In [24]:
session.close()