## Data Engineering
We begin where all ML use cases do: data engineering. In this section of the demo, we use Snowpark's Python client-side DataFrame API to build an **ELT pipeline**. We extract the data from the source system (S3), load it into Snowflake, and add transformations to clean the data before analysis.




In [38]:
import snowflake.snowpark as snp
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

import pandas as pd
from datetime import datetime
import requests
from zipfile import ZipFile
from io import BytesIO
import os

### 1. Load credentials and connect to Snowflake


We use a simple JSON file to store our credentials. This should **never** be done in production; it is for demo purposes only.

In [40]:
# Open a Snowpark session from the local demo credentials file. The helper
# returns the session plus a mapping of connection settings (`state_dict`).
from include.snowpark_connection import snowpark_connect

CREDENTIALS_PATH = './connection.json'
session, state_dict = snowpark_connect(CREDENTIALS_PATH)

### 2. Create stages for loading data into Snowflake

In [41]:
# Quick sanity check of the session context. Other handy context probes:
#   session.get_current_schema()
#   session.get_current_database()
#   session.sql('select current_user').show()
# The warehouse name is the cell's displayed value.
session.get_current_warehouse()

'"A5_WH"'

In [42]:

# Named file formats referenced by the data stages below:
#   jsonformat - JSON files with the outer array stripped (strip_outer_array = true)
#   csvformat  - CSV files with the first (header) line skipped
session.sql("CREATE or REPLACE file format jsonformat type = 'JSON' strip_outer_array = true").collect()
session.sql("CREATE or REPLACE file format csvformat skip_header = 1 type = 'CSV'").collect()

# True  -> external stages pointing at the public quickstart S3 bucket.
# False -> empty internal stages; the files must be uploaded (PUT) before the COPYs run.
prestaged = True

# stage name -> (file format, S3 location).
# The stage names must match the @stage references in the COPY INTO commands
# that load CLICK_DATA_JSON, CAMPAIGN_SPEND and MONTHLY_REVENUE.
data_stages = {
    "json_data_stage": (
        "jsonformat",
        "s3://sfquickstarts/Summit 2022 Keynote Demo/click_data/",
    ),
    "campaign_data_stage": (
        "csvformat",
        "s3://sfquickstarts/Summit 2022 Keynote Demo/campaign_spend/",
    ),
    "monthly_revenue_data_stage": (
        "csvformat",
        "s3://sfquickstarts/Summit 2022 Keynote Demo/monthly_revenue/",
    ),
}

for stage_name, (format_name, s3_url) in data_stages.items():
    if prestaged:
        # The URL is a string literal, so it is single-quoted; in Snowflake SQL
        # double quotes delimit identifiers.
        stage_sql = (
            f"CREATE OR REPLACE STAGE {stage_name} "
            f"file_format = {format_name} url = '{s3_url}'"
        )
    else:
        # Attach the same file format to the internal stage. The later COPY
        # commands specify none, so they must parse the uploaded files the
        # same way as in the external-stage path.
        stage_sql = f"CREATE STAGE IF NOT EXISTS {stage_name} file_format = {format_name}"
    session.sql(stage_sql).collect()

In [43]:
# Create (or reset) three internal stages. Judging by their names they hold
# stored-procedure code, model artifacts and UDF code; nothing in this cell
# uploads to them.
for artifact_stage in ("dash_sprocs", "dash_models", "dash_udfs"):
    session.sql(f"CREATE OR REPLACE STAGE {artifact_stage}").collect()

[Row(status='Stage area DASH_UDFS successfully created.')]

In [20]:
# Point the session at the warehouse named in the connection settings.
target_warehouse = state_dict["warehouse"]
session.use_warehouse(target_warehouse)

In [43]:
# NOTE(review): commented-out schema definitions for a bike-trip dataset
# (tripduration, bike_id, station fields). No visible cell of this notebook
# references them, and they do not match the click/campaign/revenue data
# loaded here. They look like leftovers and are a candidate for removal.
# #Upper case fields are common to both schemas.
# #Schema from 2013 to 2021
# load_schema1 = T.StructType([T.StructField("tripduration", T.StringType()),
#                              T.StructField("STARTTIME", T.StringType()), 
#                              T.StructField("STOPTIME", T.StringType()), 
#                              T.StructField("START_STATION_ID", T.StringType()),
#                              T.StructField("START_STATION_NAME", T.StringType()), 
#                              T.StructField("START_STATION_LATITUDE", T.StringType()),
#                              T.StructField("START_STATION_LONGITUDE", T.StringType()),
#                              T.StructField("END_STATION_ID", T.StringType()),
#                              T.StructField("END_STATION_NAME", T.StringType()), 
#                              T.StructField("END_STATION_LATITUDE", T.StringType()),
#                              T.StructField("END_STATION_LONGITUDE", T.StringType()),
#                              T.StructField("bike_id", T.StringType()),
#                              T.StructField("USERTYPE", T.StringType()), 
#                              T.StructField("birth_year", T.StringType()),
#                              T.StructField("gender", T.StringType())])

# #starting in February 2021 the schema changed
# load_schema2 = T.StructType([T.StructField("ride_id", T.StringType()), 
#                              T.StructField("rideable_type", T.StringType()), 
#                              T.StructField("STARTTIME", T.StringType()), 
#                              T.StructField("STOPTIME", T.StringType()), 
#                              T.StructField("START_STATION_NAME", T.StringType()), 
#                              T.StructField("START_STATION_ID", T.StringType()),
#                              T.StructField("END_STATION_NAME", T.StringType()), 
#                              T.StructField("END_STATION_ID", T.StringType()),
#                              T.StructField("START_STATION_LATITUDE", T.StringType()),
#                              T.StructField("START_STATION_LONGITUDE", T.StringType()),
#                              T.StructField("END_STATION_LATITUDE", T.StringType()),
#                              T.StructField("END_STATION_LONGITUDE", T.StringType()),
#                              T.StructField("USERTYPE", T.StringType())])

# trips_table_schema = T.StructType([T.StructField("STARTTIME", T.StringType()), 
#                              T.StructField("STOPTIME", T.StringType()), 
#                              T.StructField("START_STATION_NAME", T.StringType()), 
#                              T.StructField("START_STATION_ID", T.StringType()),
#                              T.StructField("END_STATION_NAME", T.StringType()), 
#                              T.StructField("END_STATION_ID", T.StringType()),
#                              T.StructField("START_STATION_LATITUDE", T.StringType()),
#                              T.StructField("START_STATION_LONGITUDE", T.StringType()),
#                              T.StructField("END_STATION_LATITUDE", T.StringType()),
#                              T.StructField("END_STATION_LONGITUDE", T.StringType()),
#                              T.StructField("USERTYPE", T.StringType())])

Create an empty landing table for the raw JSON click data.

In [44]:
# Raw landing table: a single VARIANT column receives each loaded click record.
session.sql(
    "CREATE or REPLACE TABLE CLICK_DATA_JSON ("
    "  click_data VARIANT)"
).collect()

[Row(status='Table CLICK_DATA_JSON successfully created.')]

Load the click data.

In [45]:

# Bulk-load the files in json_data_stage into the landing table. With
# on_error = 'skip_file', a file that hits an error is skipped rather than
# failing the whole COPY.
copy_clauses = [
    "COPY into CLICK_DATA_JSON",
    "from @json_data_stage",
    "on_error = 'skip_file'",
]
session.sql("  ".join(copy_clauses)).collect()

[Row(file='s3://sfquickstarts/Summit 2022 Keynote Demo/click_data/data_11_3_0.json', status='LOADED', rows_parsed=90000, rows_loaded=90000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None),
 Row(file='s3://sfquickstarts/Summit 2022 Keynote Demo/click_data/data_11_5_0.json', status='LOADED', rows_parsed=90000, rows_loaded=90000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None),
 Row(file='s3://sfquickstarts/Summit 2022 Keynote Demo/click_data/data_13_7_0.json', status='LOADED', rows_parsed=80000, rows_loaded=80000, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None),
 Row(file='s3://sfquickstarts/Summit 2022 Keynote Demo/click_data/data_1_2_0.json', status='LOADED', rows_parsed=90000, rows_loaded=90000, error_limit=1, errors_seen=0, first_error=None, fir

In [46]:
# Flatten the raw VARIANT records into a typed CLICK_DATA table. Each JSON key
# read from the `click_data` column is the lowercase form of its target
# column name, so one list drives both the column DDL and the SELECT paths.
click_columns = [
    ("AD_ID", "varchar(30)"),
    ("CHANNEL", "varchar(30)"),
    ("CLICK", "integer"),
    ("COST", "FLOAT"),
    ("IPADDRESS", "varchar(30)"),
    ("MACADDRESS", "varchar(30)"),
    ("TIMESTAMP", "integer"),
]
column_ddl = ",".join(f"  {col_name} {col_type}" for col_name, col_type in click_columns)
json_paths = ",".join(f"CLICK_DATA:{col_name.lower()}" for col_name, _ in click_columns)
session.sql(
    f"CREATE or REPLACE TABLE CLICK_DATA ({column_ddl}  ) "
    f"AS SELECT {json_paths} from CLICK_DATA_JSON"
).collect()

[Row(status='Table CLICK_DATA successfully created.')]

In [47]:
# Empty target table for the campaign spend CSV data.
# NOTE(review): TOTAL_COST is NUMBER(38,0) (scale 0), so it cannot hold
# fractional cost values. Confirm that the source costs are whole numbers.
session.sql(
    "CREATE or REPLACE TABLE CAMPAIGN_SPEND ("
    "  CAMPAIGN VARCHAR(60), "
    "  CHANNEL VARCHAR(60),"
    "  DATE DATE,  TOTAL_CLICKS NUMBER(38,0),"
    "  TOTAL_COST NUMBER(38,0),"
    "  ADS_SERVED NUMBER(38,0))"
).collect()

[Row(status='Table CAMPAIGN_SPEND successfully created.')]

In [48]:
# Empty target table for the monthly revenue CSV data.
revenue_columns = ("YEAR NUMBER(38,0)", "MONTH NUMBER(38,0)", "REVENUE FLOAT")
session.sql(
    "CREATE or REPLACE TABLE MONTHLY_REVENUE ("
    + ",".join(f"  {column_def}" for column_def in revenue_columns)
    + ")"
).collect()

[Row(status='Table MONTHLY_REVENUE successfully created.')]

Load the campaign spend data.

In [49]:
# Load the staged campaign spend file(s). No FILE_FORMAT is given here, so the
# file format attached to the stage is used.
target_table, source_stage = "CAMPAIGN_SPEND", "campaign_data_stage"
session.sql(f"COPY into {target_table}  from @{source_stage}").collect()

[Row(file='s3://sfquickstarts/Summit 2022 Keynote Demo/campaign_spend/campaign_spend.csv', status='LOADED', rows_parsed=293120, rows_loaded=293120, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

Load the monthly revenue data.

In [50]:
# Load the staged monthly revenue file(s). No FILE_FORMAT is given here, so the
# file format attached to the stage is used.
target_table, source_stage = "MONTHLY_REVENUE", "monthly_revenue_data_stage"
session.sql(f"COPY into {target_table}  from @{source_stage}").collect()

[Row(file='s3://sfquickstarts/Summit 2022 Keynote Demo/monthly_revenue/monthly_revenue.csv', status='LOADED', rows_parsed=121, rows_loaded=121, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]

In [51]:
# Create a small hand-written table of per-month channel budget allocations
# and the corresponding ROI, then seed it with six rows.
budget_columns = [
    ("MONTH", "varchar(30)"),
    ("SEARCHENGINE", "integer"),
    ("SOCIALMEDIA", "integer"),
    ("VIDEO", "integer"),
    ("EMAIL", "integer"),
    ("ROI", "float"),
]
session.sql(
    "CREATE or REPLACE TABLE BUDGET_ALLOCATIONS_AND_ROI ("
    + ",".join(f"  {col_name} {col_type}" for col_name, col_type in budget_columns)
    + ")"
).collect()

# Row literals are kept as text so the numbers reach SQL exactly as written
# (e.g. 13.90). NOTE(review): the June row repeats January's values exactly;
# confirm that this is intentional.
budget_rows = [
    "('January',35,50,35,85,8.22)",
    "('February',75,50,35,85,13.90)",
    "('March',15,50,35,15,7.34)",
    "('April',25,80,40,90,13.23)",
    "('May',95,95,10,95,6.246)",
    "('June',35,50,35,85,8.22)",
]
insert_columns = ", ".join(col_name for col_name, _ in budget_columns)
session.sql(
    f"INSERT INTO BUDGET_ALLOCATIONS_AND_ROI ({insert_columns})"
    + "VALUES"
    + ",".join(budget_rows)
).collect()

[Row(number of rows inserted=6)]

In [52]:
# Release the Snowpark session opened at the start of the notebook; any cell
# run after this needs a new session.
session.close()