In [None]:
# snowflake imports
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import udf, col, lit, is_null, iff, initcap

# other imports
import os
import sys
import json
import pandas as pd
import datetime as dt
from IPython.display import display, Markdown
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.ensemble import RandomForestClassifier
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import balanced_accuracy_score

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

# Load connection settings from the local credentials file. The context
# manager closes the handle even when the JSON fails to parse.
with open('creds.json') as f:
    creds = json.load(f)


# Key-pair authentication: read the PEM private key named in the credentials.
# cryptography expects password=None for an unencrypted key, so a missing or
# empty passphrase is mapped to None instead of failing on the lookup or
# passing empty bytes.
passphrase = creds.get("private_key_passphrase")
with open(creds["private_key_file"], "rb") as key:
    p_key = serialization.load_pem_private_key(
        key.read(),
        password=passphrase.encode() if passphrase else None,
        backend=default_backend()
    )

# Re-serialize the key as unencrypted PKCS#8 DER bytes; this is the value
# passed to the Snowflake connection as "private_key".
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

# Connection parameters for key-pair authentication, taken from the credentials file.
snowflake_conn_prop = dict(
    account=creds["account"],
    user=creds["username"],
    private_key=pkb,
)

# Open the Snowpark session used by the rest of the notebook.
session_builder = Session.builder.configs(snowflake_conn_prop)
session = session_builder.create()

# Target role, database, schema, and warehouse all come from the credentials file.
rolename = creds["rolename"]
dbname = creds["dbname"]
schemaname = creds["schemaname"]
warehouse = creds["warehouse"]

# Switch to the role, create the database, schema, and warehouse if they are
# missing, then select the warehouse and schema. Statements run in this order.
setup_statements = [
    f"USE ROLE {rolename}",
    f"CREATE DATABASE IF NOT EXISTS {dbname}",
    f"CREATE SCHEMA IF NOT EXISTS {dbname}.{schemaname}",
    f"CREATE WAREHOUSE  IF NOT EXISTS {warehouse} \
                WAREHOUSE_SIZE = 'SMALL' \
                AUTO_SUSPEND = 300 \
                AUTO_RESUME = TRUE",
    f"USE WAREHOUSE {warehouse}",
    f"USE SCHEMA {dbname}.{schemaname}",
]
for statement in setup_statements:
    session.sql(statement).collect()

In [None]:
# Names of the local data file, the internal stage, and the landing table.
filename = "raw_telco_data.parquet"
stagename = "rawdata"
rawtable = "RAW_PARQUET_DATA"

# Named Parquet file format, referenced by INFER_SCHEMA below.
session.sql("CREATE FILE FORMAT IF NOT EXISTS MY_PARQUET_FORMAT TYPE=PARQUET").collect()

# (Re)create the internal stage.
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()

# Upload the local Parquet file into the stage.
datafilepath = creds["datafilepath"]
session.file.put(f'{datafilepath}/{filename}', stagename)

# Create the landing table from the schema inferred from the staged file.
create_table_sql = f"CREATE OR REPLACE \
            TABLE {rawtable} USING TEMPLATE ( \
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) \
                FROM \
                    TABLE( INFER_SCHEMA( \
                    LOCATION => '@{stagename}/{filename}', \
                    FILE_FORMAT => 'MY_PARQUET_FORMAT' \
                    ) \
                ) \
            );  "
session.sql(create_table_sql).collect()

# Delete any rows from the landing table before loading.
dfClear = session.table(rawtable).delete()

# Read the staged Parquet file and copy it into the landing table.
staged_file = f"@{stagename}/{filename}"
dfRaw = session.read.option("compression", "snappy").parquet(staged_file)
dfRaw.copy_into_table(rawtable, FORCE=True)

In [None]:
# Pull a 50-row sample of the landing table into pandas for a quick look.
df_raw = session.table(rawtable).sample(n=50)
df_raw.to_pandas()

In [None]:
# Expose the landing table as a temporary view to use for transformation.
# `rawtable` (set in the load cell) replaces the repeated literal so both
# cells always point at the same table.
session.table(rawtable).create_or_replace_temp_view('RAW_STAGE')
df_raw_stage = session.table("RAW_STAGE")

# Strip spaces from column names. Names without a space are skipped so the
# loop only issues renames that actually change something.
for c in df_raw_stage.columns:
    if ' ' in c:
        df_raw_stage = df_raw_stage.rename(c, c.replace(' ', ''))

# Persist the renamed columns to the warehouse.
# NOTE(review): this table reuses the name of the temp view created above, and
# df_raw_stage itself reads from that name — confirm the overwrite resolves as
# intended.
df_raw_stage.write.mode('overwrite').saveAsTable('RAW_STAGE')

In [None]:
# Location attributes per customer; COUNTRY is passed through INITCAP.
location_columns = [
    col("CUSTOMERID"),
    col("STATE").alias("STATE"),
    col("CITY").alias("CITY"),
    col("ZIPCODE"),
    col("LATLONG"),
    col("LATITUDE").alias("LATITUDE"),
    col("LONGITUDE").alias("LONGITUDE"),
    initcap(col("COUNTRY")).alias("COUNTRY"),
]
df_location = df_raw_stage.select(location_columns)

# Save as the LOCATION table, then preview 50 sampled rows.
df_location.write.mode('overwrite').saveAsTable('LOCATION')
df_location.sample(n=50).to_pandas()

In [None]:
# Demographic attributes per customer. A GENDER value of "M" is rewritten to
# 'Male'; every other value is kept as stored.
gender_column = iff(
    col("GENDER") == "M",
    lit('Male'),
    col("GENDER"),
).alias("GENDER")

df_demographics = df_raw_stage.select(
    col("CUSTOMERID"),
    gender_column,
    col("SENIORCITIZEN"),
    col("PARTNER"),
    col("DEPENDENTS"),
)

# Save as the DEMOGRAPHICS table, then preview 50 sampled rows.
df_demographics.write.mode('overwrite').saveAsTable('DEMOGRAPHICS')
df_demographics.sample(n=50).to_pandas()

In [None]:
# Churn status per customer. A NULL CHURNREASON is replaced with "do not know".
churn_reason = iff(
    is_null(col("CHURNREASON")),
    lit("do not know"),
    col("CHURNREASON"),
).alias("CHURNREASON")

df_status = df_raw_stage.select(
    col("CUSTOMERID"),
    col("CHURNLABEL"),
    col("CHURNVALUE"),
    col("CHURNSCORE"),
    churn_reason,
)

# Save as the STATUS table, then preview 50 sampled rows.
df_status.write.mode('overwrite').saveAsTable('STATUS')
df_status.sample(n=50).to_pandas()

In [None]:
def _null_to_default(column_name, default):
    """Return the named column with NULLs replaced by `default`, keeping its name."""
    source = col(column_name)
    return iff(is_null(source), lit(default), source).alias(column_name)


# Columns copied into SERVICES unchanged, in output order.
passthrough_names = [
    "MONTHLYCHARGES",
    "TOTALCHARGES",
    "CHURNVALUE",
    "TENUREMONTHS",
    "PHONESERVICE",
    "MULTIPLELINES",
    "INTERNETSERVICE",
    "ONLINESECURITY",
    "ONLINEBACKUP",
    "DEVICEPROTECTION",
    "STREAMINGTV",
    "STREAMINGMOVIES",
    "PAYMENTMETHOD",
]

# Service attributes per customer; three columns get a default where NULL.
df_services = df_raw_stage.select(
    [
        col("CUSTOMERID"),
        _null_to_default("TECHSUPPORT", 'No'),
        _null_to_default("CONTRACT", "Month-to-month"),
        _null_to_default("PAPERLESSBILLING", 'True'),
    ]
    + [col(passthrough) for passthrough in passthrough_names]
)

# Save as the SERVICES table, then preview 50 sampled rows.
df_services.write.mode('overwrite').saveAsTable('SERVICES')
df_services.sample(n=50).to_pandas()

In [None]:
# Reload the two tables written earlier in the notebook.
dfDemo = session.table("DEMOGRAPHICS")
dfServ = session.table("SERVICES")

# Join on the customer key, then select the demographics CUSTOMERID under its
# plain name followed by every column of the join result.
same_customer = dfDemo.col("CUSTOMERID") == dfServ.col("CUSTOMERID")
dfJoin = dfDemo.join(dfServ, same_customer).select(
    dfDemo.CUSTOMERID.alias('CUSTOMERID'),
    '*',
)
dfJoin.sample(n=50).to_pandas()

In [None]:
# Drop the join's leftover key columns: every column whose name contains
# "_CUSTOMERID". The plain CUSTOMERID column does not match and is kept.
cols_to_drop = [name for name in dfJoin.columns if "_CUSTOMERID" in name]
dfJoin = dfJoin.drop(cols_to_drop)

# Write the combined dataset to the warehouse.
dfJoin.write.mode('overwrite').saveAsTable('TELCO_DATASET')

# Preview a 50-row sample, as the other cells do, instead of pulling the whole
# table into local memory.
dfJoin.sample(n=50).to_pandas()