In [2]:
import json
import pandas as pd

# Import Libraries
from snowflake.snowpark.session import Session
from snowflake.snowpark.version import VERSION
import snowflake.snowpark.functions as F


In [3]:
# Create Snowflake Session object
# The credentials file is read inside a context manager so its handle is closed
# as soon as the JSON has been parsed (a bare open() would leave it dangling).
with open('connection.json') as connection_file:
    connection_parameters = json.load(connection_file)
conn = Session.builder.configs(connection_parameters).create()
conn.sql_simplifier_enabled = True

# One-row query describing the context this session runs in.
snowflake_environment = conn.sql('select current_user(), current_role(), current_database(), current_schema(), current_version(), current_warehouse()').collect()
snowpark_version = VERSION

# Current Environment Details
# Positions follow the select list above:
# 0=user, 1=role, 2=database, 3=schema, 4=version, 5=warehouse
environment_row = snowflake_environment[0]
print('User                        : {}'.format(environment_row[0]))
print('Role                        : {}'.format(environment_row[1]))
print('Database                    : {}'.format(environment_row[2]))
print('Schema                      : {}'.format(environment_row[3]))
print('Warehouse                   : {}'.format(environment_row[5]))
print('Snowflake version           : {}'.format(environment_row[4]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : MOHAMMED
Role                        : SE_ROLE
Database                    : None
Schema                      : None
Warehouse                   : SE_DEMO_WH
Snowflake version           : 7.35.1
Snowpark for Python version : 1.8.0


In [7]:
# Create the schema used for churn prediction in the GDC_DEMO database.
# IF NOT EXISTS makes the cell safe to re-run: a plain CREATE SCHEMA raises
# once the schema is already there.
conn.sql('CREATE SCHEMA IF NOT EXISTS GDC_DEMO.CHURN_PREDICTION;').collect()
# To inspect the available schemas: conn.sql("show schemas;").collect()

[Row(status='Schema CHURN_PREDICTION successfully created.')]

In [9]:
# Clone the customer dimension from the SPOTFLIX database into the churn-prediction schema.
# IF NOT EXISTS keeps the cell re-runnable: without it the statement raises
# when the target table has already been created by an earlier run.
conn.sql('''CREATE TABLE IF NOT EXISTS GDC_DEMO.CHURN_PREDICTION.dim_media_customers CLONE SPOTFLIX.PUBLIC."dim_media_customers";''').collect()

[Row(status='Table DIM_MEDIA_CUSTOMERS successfully created.')]

In [4]:
# Viewing-events fact table in SPOTFLIX.PUBLIC (quoted, lower-case table name),
# followed by a preview of its rows.
events_table_name = 'SPOTFLIX.PUBLIC."fact_media_show_events"'
fact_media_show_events_df = conn.table(events_table_name)
fact_media_show_events_df.show()

-----------------------------------------------------------------------------------------------------------------------
|"record_id"  |"customer_id"  |"show_id"  |"episode_id"  |"viewdate"  |"viewtime"  |"viewruntime"  |"viewruntimepct"  |
-----------------------------------------------------------------------------------------------------------------------
|1462273      |11989          |74         |459           |2019-03-23  |17:12:35    |0              |-1                |
|1462274      |11989          |3          |7             |2018-08-24  |00:00:00    |88             |98                |
|1462275      |11989          |3          |8             |2018-08-26  |22:40:05    |60             |100               |
|1462276      |11989          |3          |9             |2018-08-28  |17:34:54    |59             |99                |
|1462277      |11989          |3          |10            |2018-08-30  |08:54:16    |58             |98                |
|1462278      |11989          |3        

In [5]:
# Customer dimension from the churn-prediction schema; the trailing expression
# displays its column definitions (name, type, nullability).
customers_table_name = 'GDC_DEMO.CHURN_PREDICTION.dim_media_customers'
dim_media_cust_df = conn.table(customers_table_name)
dim_media_cust_df.schema.fields

[StructField('"customer_id"', LongType(), nullable=True),
 StructField('"customer_name"', StringType(16777216), nullable=True),
 StructField('"gender"', StringType(16777216), nullable=True),
 StructField('"age"', LongType(), nullable=True),
 StructField('"age_group"', StringType(16777216), nullable=True),
 StructField('"street_address"', StringType(16777216), nullable=True),
 StructField('"city"', StringType(16777216), nullable=True),
 StructField('"state"', StringType(16777216), nullable=True),
 StructField('"zipcode"', StringType(16777216), nullable=True),
 StructField('"county"', StringType(16777216), nullable=True),
 StructField('"latitude"', DoubleType(), nullable=True),
 StructField('"longitude"', DoubleType(), nullable=True),
 StructField('"region"', StringType(16777216), nullable=True),
 StructField('"opendate"', DateType(), nullable=True),
 StructField('"closedate"', DateType(), nullable=True)]

In [6]:
# Print a preview of the customer dimension rows.
dim_media_cust_df.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"customer_id"  |"customer_name"   |"gender"  |"age"  |"age_group"  |"street_address"        |"city"           |"state"  |"zipcode"  |"county"            |"latitude"  |"longitude"   |"region"   |"opendate"  |"closedate"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1              |Alma Bassani      |Female    |18     |18-24        |329 4th Street          |Manhattan Beach  |CA       |90266      |Los Angeles County  |33.8804079  |-118.4063552  |West       |2018-03-09  |NULL         |
|2              |Sherry Blanck     |Female    |25     |25-34        |1170 NEWBURY LN E       |Mobile        

In [7]:
# Convert the customer dimension to a pandas DataFrame and display it.
# NOTE(review): the recorded output runs up to index 49998, so the whole table
# (including customer names and street addresses) is fetched just to render a
# truncated preview — consider limiting the rows before converting.
dim_media_cust_df.to_pandas()

Unnamed: 0,customer_id,customer_name,gender,age,age_group,street_address,city,state,zipcode,county,latitude,longitude,region,opendate,closedate
0,1,Alma Bassani,Female,18,18-24,329 4th Street,Manhattan Beach,CA,90266,Los Angeles County,33.880408,-118.406355,West,2018-03-09,
1,2,Sherry Blanck,Female,25,25-34,1170 NEWBURY LN E,Mobile,AL,36695,Mobile County,30.666030,-88.217780,South,2017-10-18,
2,3,Frank Leehy,Male,81,65+,3070 S Hadley Rd,Ortonville,MI,48462,Oakland County,42.806329,-83.415179,Midwest,2018-03-03,2019-04-23
3,4,Warren Nease,Male,73,65+,1119 14TH St North,Birmingham,AL,35204,Jefferson County,33.522924,-86.821937,South,2018-03-25,2018-07-30
4,5,Miguel Cavill,Male,37,35-44,7526 MUSTANG CORRAL DR,Humble,TX,77338,Harris County,30.012319,-95.310301,Southwest,2017-07-17,2019-04-24
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49995,49996,Linda Casstevens,Female,28,25-34,18623 ROLLING SHORES CT,Humble,TX,77346,Harris County,29.988060,-95.143241,Southwest,2019-07-26,
49996,49997,Morris Tonnesen,Male,18,18-24,3009 FORT STANWIX RD,Henderson,NV,89052,Clark County,35.958662,-115.119954,West,2019-07-26,
49997,49998,Ana Servan,Female,19,18-24,23003 VIDA,Galveston,TX,77554,Galveston County,29.134890,-95.061040,Southwest,2019-07-26,
49998,49999,Charles Dayao,Male,41,35-44,37459 CHARTER OAKS BLVD,Clinton Township,MI,48036,Macomb County,42.574024,-82.916782,Midwest,2019-07-26,


In [None]:
# Replace column "A": rows where "A" equals 'NaN' or is NULL receive the
# average of "A" taken over the whole frame (with 'NaN' values mapped to NULL
# inside the average); every other row keeps its value.
# NOTE(review): `df` is not defined in any visible cell and none of the tables
# shown has a column "A" — presumably a scratch snippet; confirm or remove.
df.with_column("A", F.iff((F.col("A") == F.lit('NaN')) | (F.col("A").is_null()),
                           F.avg(F.iff(F.col("A") == F.lit('NaN'), F.lit(None), F.col("A") )).over(), F.col("A"))).show()

In [23]:
# Add a churn flag: 0 when "closedate" is NULL, 1 otherwise.
closedate_missing = F.col('"closedate"').is_null()
churn_flag = F.when(closedate_missing, 0).otherwise(1)
dim_media_cust_df1 = dim_media_cust_df.with_column('churn', churn_flag)
dim_media_cust_df1.show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"customer_id"  |"customer_name"   |"gender"  |"age"  |"age_group"  |"street_address"        |"city"           |"state"  |"zipcode"  |"county"            |"latitude"  |"longitude"   |"region"   |"opendate"  |"closedate"  |"CHURN"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1              |Alma Bassani      |Female    |18     |18-24        |329 4th Street          |Manhattan Beach  |CA       |90266      |Los Angeles County  |33.8804079  |-118.4063552  |West       |2018-03-09  |NULL         |0        |
|2              |Sherry Blanck     |Female    |25     |25-34        

In [None]:
# Add a churn flag: 0 when "closedate" is NULL, 1 otherwise.
# NOTE(review): this cell repeats the churn-flag cell that was executed as
# In [23]; one of the two can probably be deleted.
dim_media_cust_df1 = dim_media_cust_df.with_column('churn',
                    F.when((F.col('"closedate"').is_null()) , 0 ).otherwise(1))
dim_media_cust_df1.show()

In [None]:
# Per-customer viewing frequency: the number of rows each customer has in the
# show-events fact table. (The previous draft of this cell had unbalanced
# parentheses and could not be parsed.)
# NOTE(review): per-customer granularity is assumed from the column name —
# confirm that a per-(customer, viewdate) count was not intended instead.
viewing_frequency_df = (
    fact_media_show_events_df
    .group_by('"customer_id"')
    .agg(F.count(F.lit(1)).alias('viewing_frequency'))
)

# Left join so customers without any events are kept; their missing count is
# reported as 0 rather than NULL.
dim_media_cust_freq_df = (
    dim_media_cust_df
    .join(viewing_frequency_df, '"customer_id"', 'left')
    .with_column(
        'viewing_frequency',
        F.coalesce(F.col('viewing_frequency'), F.lit(0)),
    )
)
dim_media_cust_freq_df.show()

In [27]:
# Number of viewing events per (customer, view date), computed on customers
# joined to their events via "customer_id".
events_per_customer_day = (
    dim_media_cust_df
    .join(fact_media_show_events_df, '"customer_id"')
    .group_by('"customer_id"', '"viewdate"')
    .count()
)
events_per_customer_day.show()

----------------------------------------
|"customer_id"  |"viewdate"  |"COUNT"  |
----------------------------------------
|11989          |2018-08-08  |4        |
|11989          |2018-08-19  |1        |
|11989          |2019-04-08  |2        |
|11989          |2018-10-11  |1        |
|11989          |2018-11-08  |1        |
|11990          |2018-06-22  |1        |
|11989          |2018-08-05  |1        |
|11989          |2018-09-03  |1        |
|11989          |2018-09-24  |2        |
|11989          |2019-02-24  |1        |
----------------------------------------



In [3]:
# DataFrame over the TEST_RESULTS table in GDC_DEMO.PUBLIC.
# NOTE(review): the execution counter restarts at this cell, so presumably the
# kernel was restarted — `conn` must come from re-running the session cell.
df_table = conn.table("GDC_DEMO.PUBLIC.TEST_RESULTS")

In [4]:
# Print a preview of the TEST_RESULTS rows.
df_table.show()

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"Transaction_Date"  |"Sales_Quantity"  |"Sales_Dollar_Amount"  |"Tender_Type"  |"day_of_year"  |"day_of_month"  |"day_of_week"  |"month"  |"quarter"  |"year"  |"Forecasted(yhat)"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2017-10-01          |5814              |24008.46               |2              |274            |1               |6              |10       |4          |2017    |5825.2060546875     |
|2017-10-02          |5387              |22378.37               |3              |275            |2               |0              |10       |4          |2017    |5400.099609375      |
|2017-10-03          |5966              |25199.03               |4              |276 

In [5]:
# Display the column definitions (name, type, nullability) of TEST_RESULTS.
df_table.schema.fields

[StructField('"Transaction_Date"', DateType(), nullable=True),
 StructField('"Sales_Quantity"', LongType(), nullable=True),
 StructField('"Sales_Dollar_Amount"', DoubleType(), nullable=True),
 StructField('"Tender_Type"', LongType(), nullable=True),
 StructField('"day_of_year"', LongType(), nullable=True),
 StructField('"day_of_month"', LongType(), nullable=True),
 StructField('"day_of_week"', LongType(), nullable=True),
 StructField('"month"', LongType(), nullable=True),
 StructField('"quarter"', LongType(), nullable=True),
 StructField('"year"', LongType(), nullable=True),
 StructField('"Forecasted(yhat)"', DoubleType(), nullable=True)]

In [None]:
# Copy the SPOTFLIX customer dimension into GDC_DEMO.PUBLIC.
# CREATE TABLE IF NOT EXISTS leaves an already existing copy untouched.
# Plain string literal: the statement contains no placeholders to interpolate.
copy_table_query = """
       CREATE TABLE IF NOT EXISTS GDC_DEMO.PUBLIC.dim_media_customers AS
SELECT * FROM SPOTFLIX.PUBLIC."dim_media_customers";
    """
copy_statement = conn.sql(copy_table_query)
copy_statement.collect()

In [None]:
# DataFrame over the customer table copied into GDC_DEMO.PUBLIC, followed by
# a preview of its rows.
gdc_customers_table_name = "GDC_DEMO.PUBLIC.dim_media_customers"
gdc_dim_media_customers = conn.table(gdc_customers_table_name)
gdc_dim_media_customers.show()

In [None]:
# Print summary statistics for the copied customer table.
gdc_dim_media_customers.describe().show()


In [None]:
# Bare expression: Jupyter displays the object's repr.
# NOTE(review): unlike .show(), this presumably prints no rows — confirm the
# cell is still needed or remove it.
gdc_dim_media_customers

In [None]:
# Adding Churn Column
#
# churn is 1 when "closedate" is populated and 0 otherwise.

churn_column_query = """
    ALTER TABLE GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS
ADD churn INT DEFAULT 0; -- Default value is 0
    """

# ALTER TABLE ... ADD raises when the column already exists, which made this
# cell fail on every re-run. The DEFAULT clause rules out ADD COLUMN IF NOT
# EXISTS (per the Snowflake ALTER TABLE docs), so the check is done here instead.
# Names are compared unquoted and upper-cased because the unquoted identifier
# `churn` is stored as CHURN.
existing_columns = {
    column_name.strip('"').upper()
    for column_name in conn.table('GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS').columns
}
if 'CHURN' not in existing_columns:
    conn.sql(churn_column_query).collect()

# Recompute the flag for every row; safe to run repeatedly.
churn1_column_query = """
    UPDATE GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS
SET churn = CASE WHEN "closedate" IS NOT NULL THEN 1 ELSE 0 END;
    """
conn.sql(churn1_column_query).collect()


In [None]:
# Adding Viewing_frequency Column
#
# viewing_frequency = number of rows a customer has in the show-events fact table.

# IF NOT EXISTS keeps the cell re-runnable: a plain ADD raises once the column
# has been created by an earlier run.
viewing_frequency_column_query = """
    ALTER TABLE GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS
ADD COLUMN IF NOT EXISTS viewing_frequency INT ;
    """
conn.sql(viewing_frequency_column_query).collect()

# Correlated COUNT(*) per customer; customers without events get 0.
viewing_frequency1_column_query = """
    UPDATE GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS AS c
SET viewing_frequency = (
    SELECT COUNT(*)
    FROM SPOTFLIX.PUBLIC."fact_media_show_events" AS e
    WHERE e."customer_id" = c."customer_id"
);
    """
conn.sql(viewing_frequency1_column_query).collect()

In [None]:
# Adding Viewing_duration Column
#
# viewing_duration = sum of "viewruntime" over a customer's rows in the
# show-events fact table.

# IF NOT EXISTS keeps the cell re-runnable: a plain ADD raises once the column
# has been created by an earlier run.
viewing_duration_column_query = """
    ALTER TABLE GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS
ADD COLUMN IF NOT EXISTS viewing_duration INT ;
    """
conn.sql(viewing_duration_column_query).collect()

# SUM over zero rows is NULL, so customers without events would end up with a
# NULL duration while their viewing_frequency (a COUNT) is 0. COALESCE keeps
# the two derived columns consistent by storing 0 in that case.
viewing_duration1_column_query = """
    UPDATE GDC_DEMO.PUBLIC.DIM_MEDIA_CUSTOMERS AS c
SET viewing_duration = (
    SELECT COALESCE(SUM(e."viewruntime"), 0)
    FROM SPOTFLIX.PUBLIC."fact_media_show_events" AS e
    WHERE e."customer_id" = c."customer_id"
);
    """
conn.sql(viewing_duration1_column_query).collect()