# Project Overview

Perform data analysis and data preparation tasks to train a linear regression model that predicts the future ROI (return on investment) of variable ad spend budgets across multiple channels, including search, video, social media, and email, using Snowpark for Python, Snowpark ML, and Streamlit. By the end of the session, you will have deployed an interactive web application that visualizes the ROI of different advertising budget allocations.

## Data Engineering: Data Analysis and Data Preparation

In this notebook, we focus on data engineering in Snowflake using Snowpark for Python. We will:

- Establish a secure connection to Snowflake
- Load data from Snowflake tables into Snowpark DataFrames
- Perform exploratory data analysis on Snowpark DataFrames
- Pivot and join data from multiple tables using Snowpark DataFrames
- Demonstrate how to automate data preparation using Snowflake Tasks

## Import libraries

In [1]:
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION

# Misc
import json
import os
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

## Establish Secure Connection to Snowflake

Using the Snowpark Python API, you can quickly establish a secure connection between Snowflake and this notebook.

In [2]:
# Create Snowflake Session object
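path = os.environ['CAS_CREDENTIALS']  # directory containing connection.json; raises KeyError if unset
with open(os.path.join(path, 'connection.json')) as f:  # portable path, file is closed after reading
    connection_parameters = json.load(f)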
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : PSTYLS
Role                        : "ACCOUNTADMIN"
Database                    : "DASH_DB"
Schema                      : "DASH_SCHEMA"
Warehouse                   : "DASH_L"
Snowflake version           : 7.25.0
Snowpark for Python version : 1.4.0
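The connection cell reads its parameters from a `connection.json` file. A slightly more defensive loader is sketched below. It assumes the file holds keys such as `account`, `user`, `password`, `role`, `warehouse`, `database`, and `schema` (common `Session.builder.configs()` options); the exact required-key list is an assumption, since accounts using other authenticators may need different keys. `load_connection_parameters` and `REQUIRED_KEYS` are hypothetical names.

```python
import json
import os
import tempfile

# Assumed set of keys; adjust for your authentication method.
REQUIRED_KEYS = ("account", "user", "password", "role", "warehouse", "database", "schema")


def load_connection_parameters(directory: str, filename: str = "connection.json") -> dict:
    """Read Snowflake connection parameters from a JSON file in `directory`.

    os.path.join keeps the path portable across operating systems, and the
    context manager guarantees the file handle is closed.
    """
    path = os.path.join(directory, filename)
    with open(path, encoding="utf-8") as fh:
        params = json.load(fh)
    # Fail early with a clear message instead of a later connection error.
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise KeyError(f"{filename} is missing keys: {missing}")
    return params
```

The returned dictionary can then be passed to `Session.builder.configs(...)` exactly as in the cell above.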


In [3]:
type(session)

snowflake.snowpark.session.Session

## Load Aggregated Campaign Spend Data from Snowflake table into Snowpark DataFrame

Let's first load the campaign spend data. This table contains ad click data that has been aggregated to show daily spend across digital ad channels including search engines, social media, email and video.

Note: Other ways to load data into a Snowpark DataFrame include:
- `session.sql("select col1, col2... from tableName")`
- `session.read.options({"field_delimiter": ",", "skip_header": 1}).schema(user_schema).csv("@mystage/testCSV.csv")`
- `session.read.parquet("@stageName/path/to/file")`
- `session.create_dataframe([1,2,3], schema=["col1"])`

In [4]:
snow_df_spend = session.table('campaign_spend')

display(type(snow_df_spend), snow_df_spend, snow_df_spend.queries)

snowflake.snowpark.table.Table

<snowflake.snowpark.table.Table at 0x17425599e50>

{'queries': ['SELECT  *  FROM (campaign_spend)'], 'post_actions': []}

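Snowpark DataFrames are evaluated lazily: building a DataFrame only generates SQL, and actions such as show(), collect(), and count() send that SQL to Snowflake for execution on the server.

Note: The query history records the query ID and the SQL text of each query executed on the server, which is helpful for debugging. Because show() fetches 10 rows by default, the recorded query ends in LIMIT 10.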
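The lazy-evaluation behavior described above can be illustrated with a toy class. `ToyLazyFrame` is hypothetical and is not how Snowpark is implemented; it only shows the pattern: transformations return new objects carrying SQL text, and nothing executes until an action is called, at which point the query is also recorded in a history list (loosely analogous to `session.query_history()`).

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ToyLazyFrame:
    """Toy stand-in (not Snowpark) for a lazily evaluated DataFrame."""
    sql: str
    run_query: Callable[[str], list]          # hypothetical executor, e.g. a DB call
    history: List[str] = field(default_factory=list)

    def limit(self, n: int) -> "ToyLazyFrame":
        # Transformation: wrap the SQL in a new frame; nothing is executed here.
        return ToyLazyFrame(f"SELECT * FROM ({self.sql}) LIMIT {n}", self.run_query, self.history)

    def collect(self) -> list:
        # Action: record the SQL text, then actually run it.
        self.history.append(self.sql)
        return self.run_query(self.sql)
```

Calling `limit()` any number of times costs nothing; only `collect()` reaches the executor, which is why Snowpark can combine a chain of transformations into one query.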

In [7]:
with session.query_history() as history:
    snow_df_spend.show()
history.queries

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

[QueryRecord(query_id='01aded32-0c04-cefa-0003-6a9a0001700e', sql_text='SELECT  *  FROM campaign_spend LIMIT 10')]

In [8]:
snow_df_spend.show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

## Execute simple SQL queries using Snowpark

In [9]:
myQuery = """
SELECT * 
FROM campaign_spend 
LIMIT 10
"""
session.sql(myQuery).show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

In [10]:
totalRows = """
SELECT count(*) 
FROM campaign_spend 
"""
session.sql(totalRows).show()

--------------
|"COUNT(*)"  |
--------------
|293120      |
--------------



In [11]:
snow_df_spend.count()

293120

## Total Spend per Year and Month For All Channels

Let's transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions.

TIP: For a full list of functions, refer to the [documentation](https://docs.snowflake.com/developer-guide/snowpark/reference/python/latest/index).
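The group_by()/agg() step described above is a grouped sum keyed on (year, month, channel). The pure-Python sketch below mirrors that logic on a few hypothetical sample rows shaped like `campaign_spend`; it is only an illustration of the computation, not Snowpark code, and the sort at the end plays the role of `.sort('YEAR', 'MONTH', 'CHANNEL')`.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows: (DATE, CHANNEL, TOTAL_COST)
rows = [
    (date(2012, 5, 3), "email", 100),
    (date(2012, 5, 17), "email", 250),
    (date(2012, 5, 9), "video", 400),
    (date(2012, 6, 1), "email", 75),
]


def total_cost_per_month_channel(rows):
    """Sum TOTAL_COST per (year, month, channel), sorted by those keys."""
    totals = defaultdict(int)
    for day, channel, cost in rows:
        totals[(day.year, day.month, channel)] += cost
    return sorted((y, m, ch, total) for (y, m, ch), total in totals.items())
```

Here `total_cost_per_month_channel(rows)` yields one row per month and channel, with the two May email rows combined into a single total.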

In [13]:
snow_df_spend_per_channel = (
    snow_df_spend
    .group_by(year('DATE'), 
              month('DATE'), 
              'CHANNEL')
    .agg(sum('TOTAL_COST').as_('TOTAL_COST'))
    .with_column_renamed('"YEAR(DATE)"', "YEAR")
    .with_column_renamed('"MONTH(DATE)"',"MONTH")
    .sort('YEAR','MONTH', 'CHANNEL')
)

display(type(snow_df_spend_per_channel), 
        snow_df_spend_per_channel.queries,
        snow_df_spend_per_channel.show())

---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |email          |517208        |
|2012    |5        |search_engine  |516431        |
|2012    |5        |social_media   |517618        |
|2012    |5        |video          |516729        |
|2012    |6        |email          |501947        |
|2012    |6        |search_engine  |506497        |
|2012    |6        |social_media   |504679        |
|2012    |6        |video          |501098        |
|2012    |7        |email          |518405        |
|2012    |7        |search_engine  |522780        |
---------------------------------------------------



snowflake.snowpark.dataframe.DataFrame

{'queries': ['SELECT  *  FROM ( SELECT "YEAR(DATE)" AS "YEAR", "MONTH(DATE)" AS "MONTH", "CHANNEL", "TOTAL_COST" FROM ( SELECT year("DATE") AS "YEAR(DATE)", month("DATE") AS "MONTH(DATE)", "CHANNEL", sum("TOTAL_COST") AS "TOTAL_COST" FROM ( SELECT  *  FROM campaign_spend) GROUP BY year("DATE"), month("DATE"), "CHANNEL")) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST, "CHANNEL" ASC NULLS FIRST'],
 'post_actions': []}

None

In [14]:
myQuery = """
SELECT 
    year(DATE) as YEAR,
    month(DATE) as MONTH,
    CHANNEL,
    sum(TOTAL_COST) as TOTAL_COST
FROM campaign_spend 
GROUP BY YEAR, MONTH, CHANNEL
ORDER BY YEAR,MONTH,CHANNEL
"""
session.sql(myQuery).show()

---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |email          |517208        |
|2012    |5        |search_engine  |516431        |
|2012    |5        |social_media   |517618        |
|2012    |5        |video          |516729        |
|2012    |6        |email          |501947        |
|2012    |6        |search_engine  |506497        |
|2012    |6        |social_media   |504679        |
|2012    |6        |video          |501098        |
|2012    |7        |email          |518405        |
|2012    |7        |search_engine  |522780        |
---------------------------------------------------



## Pivot on Channel: Total Spend Across All Channels

Let's further transform the campaign spend data so that **each row represents one year/month, with a separate total-cost column for each channel**, using the pivot() and sum() Snowpark DataFrame functions. This transformation lets us join with the revenue table so that the input features and the target variable are in a single table for model training.
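To make the reshaping concrete, the sketch below performs the same pivot in plain Python on a few (YEAR, MONTH, CHANNEL, TOTAL_COST) rows taken from the aggregated output in this notebook. `pivot_channels` is a hypothetical helper, not part of Snowpark; like a SQL pivot, it only creates columns for the listed channel values and leaves a channel empty (None, i.e. NULL) when a month has no rows for it.

```python
from typing import Dict, List, Optional, Tuple


def pivot_channels(rows: List[Tuple[int, int, str, int]],
                   channels: List[str]) -> List[Dict[str, Optional[int]]]:
    """One output row per (YEAR, MONTH) with one summed TOTAL_COST column per channel."""
    table: Dict[Tuple[int, int], Dict[str, Optional[int]]] = {}
    for year, month, channel, cost in rows:
        if channel not in channels:
            continue  # values not listed in the pivot produce no column
        row = table.setdefault(
            (year, month),
            {"YEAR": year, "MONTH": month, **{c: None for c in channels}},
        )
        row[channel] = (row[channel] or 0) + cost
    # Order by YEAR, MONTH as the .sort('YEAR','MONTH') call does
    return [table[key] for key in sorted(table)]
```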

In [18]:
display(snow_df_spend.show(),                  ## Original table
        snow_df_spend_per_channel.show())      ## Aggregated query on original table

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

None

None

In [21]:
snow_df_spend_per_month = (
    snow_df_spend_per_channel
    .pivot('CHANNEL',['search_engine','social_media','video','email'])
    .sum('TOTAL_COST')
    .sort('YEAR','MONTH')
)

snow_df_spend_per_month.show()

-----------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"'search_engine'"  |"'social_media'"  |"'video'"  |"'email'"  |
-----------------------------------------------------------------------------------
|2012    |5        |516431             |517618            |516729     |517208     |
|2012    |6        |506497             |504679            |501098     |501947     |
|2012    |7        |522780             |521395            |522762     |518405     |
|2012    |8        |519959             |520537            |520685     |521584     |
|2012    |9        |507211             |507404            |511364     |507363     |
|2012    |10       |518942             |520863            |522768     |519950     |
|2012    |11       |505715             |505221            |505292     |503748     |
|2012    |12       |520148             |520711            |521427     |520724     |
|2013    |1        |522151             |518635            |520583     |52116

In [22]:
## Clean up the column names
snow_df_spend_per_month = (
    snow_df_spend_per_month
    .select(
        col('YEAR'),
        col('MONTH'),
        col("'search_engine'").as_("SEARCH_ENGINE"),
        col("'social_media'").as_("SOCIAL_MEDIA"),
        col("'video'").as_("VIDEO"),
        col("'email'").as_("EMAIL")
    )
)

snow_df_spend_per_month.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |10       |518942           |520863          |522768   |519950   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2012    |12       |520148           |520711          |521427   |520724   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |469856   |469784   |
---------------------------------------------------------------------------
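The pivot names each new column after the pivoted value wrapped in single quotes (for example `'video'`), which is why the cell above renames them one by one. A hypothetical helper can derive those rename pairs from the channel list instead of hardcoding them; it assumes the quoted-name convention seen in the show() output and that the clean alias is simply the upper-cased channel value.

```python
from typing import List, Tuple


def pivot_rename_pairs(channels: List[str]) -> List[Tuple[str, str]]:
    """Return (pivoted_column_name, clean_alias) pairs, e.g. ("'video'", "VIDEO")."""
    return [(f"'{channel}'", channel.upper()) for channel in channels]
```

In the notebook, these pairs could feed a select such as `df.select(col("YEAR"), col("MONTH"), *[col(src).as_(alias) for src, alias in pivot_rename_pairs(channels)])`, so a new channel would not require editing the column list by hand.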

## Save Transformed Data into Snowflake Table

Let's save the transformed data into a Snowflake table named SPEND_PER_MONTH.

In [23]:
session.sql('SHOW TABLES').show()

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"created_on"                      |"name"                      |"database_name"  |"schema_name"  |"kind"  |"comment"  |"cluster_by"  |"rows"  |"bytes"  |"owner"       |"retention_time"  |"automatic_clustering"  |"change_tracking"  |"is_external"  |"owner_role_type"  |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-07-25 05:16:05.081000-07:00  |BUDGET_ALLOCATIONS_AND_ROI  |DASH_DB          |DASH_SCHEMA    |TABLE   |           |              |6       |2560     |ACCOUNTADMIN  |1                 |OF

In [29]:
(snow_df_spend_per_month
 .write
 .mode('overwrite')
 .save_as_table('SPEND_PER_MONTH')
)
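With `mode('overwrite')`, `save_as_table` replaces the existing table, whereas `mode('append')` adds rows to it. Overwrite is what makes the scheduled pipeline later in this notebook safe to rerun: each run rebuilds SPEND_PER_MONTH instead of duplicating rows. The sketch below mimics the two modes using SQLite as a local stand-in (an assumption for illustration; it is not Snowflake, and other Snowpark save modes are not modeled).

```python
import sqlite3
from typing import Iterable, Tuple


def save_as_table(conn: sqlite3.Connection, name: str,
                  rows: Iterable[Tuple[int, int, int]], mode: str) -> None:
    """Toy analogue of write.mode(mode).save_as_table(name) for a (YEAR, MONTH, EMAIL) table.

    `name` is interpolated directly, so it must be a trusted identifier.
    """
    if mode not in ("overwrite", "append"):
        raise ValueError(f"unsupported mode: {mode}")
    if mode == "overwrite":
        conn.execute(f"DROP TABLE IF EXISTS {name}")  # replace the whole table
    conn.execute(f"CREATE TABLE IF NOT EXISTS {name} (YEAR INTEGER, MONTH INTEGER, EMAIL INTEGER)")
    conn.executemany(f"INSERT INTO {name} VALUES (?, ?, ?)", rows)
    conn.commit()
```

Running the overwrite twice leaves the same number of rows; running append doubles them.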

In [30]:
session.sql('SHOW TABLES').show()

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"created_on"                      |"name"                      |"database_name"  |"schema_name"  |"kind"  |"comment"  |"cluster_by"  |"rows"  |"bytes"  |"owner"       |"retention_time"  |"automatic_clustering"  |"change_tracking"  |"is_external"  |"owner_role_type"  |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-07-25 05:16:05.081000-07:00  |BUDGET_ALLOCATIONS_AND_ROI  |DASH_DB          |DASH_SCHEMA    |TABLE   |           |              |6       |2560     |ACCOUNTADMIN  |1                 |OF

## Data Pipelines

You can also operationalize the data transformations in the form of automated data pipelines running in Snowflake.

### Root/parent Task
This task automates loading the campaign spend data and performing the transformations shown above.

#### First, get the query working

In [155]:
## Reference the source table
campaign_spend_tbl = session.table('campaign_spend')

## Specify the group by columns
group_cols = [year("DATE"), month("DATE"), "CHANNEL"]

## Get the distinct values of the CHANNEL column as a list
distinct_values = [
    row["CHANNEL"]
    for row in campaign_spend_tbl.select("CHANNEL").distinct().collect()
]

##
## Build the transformed DataFrame (lazy until it is written)
##
spend_per_month = (
    campaign_spend_tbl
## Group by year, month and channel and aggregate total_cost
    .group_by(group_cols)
    .agg(sum("TOTAL_COST").as_("TOTAL_COST"))
    .with_column_renamed('"YEAR(DATE)"', "YEAR")
    .with_column_renamed('"MONTH(DATE)"', "MONTH")
## Pivot the result table: one TOTAL_COST column per channel
    .pivot("CHANNEL", distinct_values)
    .sum("TOTAL_COST")
## Rename the pivoted columns ('search_engine' -> SEARCH_ENGINE, ...)
    .select(
        col("YEAR"),
        col("MONTH"),
        col("'search_engine'").as_("SEARCH_ENGINE"),
        col("'social_media'").as_("SOCIAL_MEDIA"),
        col("'video'").as_("VIDEO"),
        col("'email'").as_("EMAIL")
    )
)

## Save the result, replacing the table if it already exists
spend_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

#### Add the query to a function and register it as a stored procedure

In [140]:
def campaign_spend_data_pipeline(session: Session) -> str:
    '''
    DATA TRANSFORMATION FOR SNOWFLAKE
    1. Use the campaign_spend table
    2. Aggregate total_cost by Year/Month/Channel
    3. Pivot the aggregated table by each channel
    4. Save the final pivoted table 
    '''
    
    ##
    ## LOAD THE CAMPAIGN_SPEND TABLE
    ##
    campaign_spend_tbl = session.table('campaign_spend')
    
    ##
    ## CREATE NEW SQL TABLE
    ##
    
    ## Specify the group by columns
    group_cols = [year("DATE"), month("DATE"), "CHANNEL"]

    ## Get the distinct values of the CHANNEL column as a list
    distinct_values = [
        row["CHANNEL"]
        for row in campaign_spend_tbl.select("CHANNEL").distinct().collect()
    ]

    ## Build the transformed DataFrame
    spend_per_month = (
        campaign_spend_tbl
    ## Group by year, month and channel and aggregate total_cost
        .group_by(group_cols)
        .agg(sum("TOTAL_COST").as_("TOTAL_COST"))
        .with_column_renamed('"YEAR(DATE)"', "YEAR")
        .with_column_renamed('"MONTH(DATE)"', "MONTH")
    ## Pivot the result table: one TOTAL_COST column per channel
        .pivot("CHANNEL", distinct_values)
        .sum("TOTAL_COST")
    ## Rename the pivoted columns
        .select(
            col("YEAR"),
            col("MONTH"),
            col("'search_engine'").as_("SEARCH_ENGINE"),
            col("'social_media'").as_("SOCIAL_MEDIA"),
            col("'video'").as_("VIDEO"),
            col("'email'").as_("EMAIL")
        )
    )

    ## Save the final pivoted table, replacing any previous version
    spend_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

    ## The declared return type is str, so return a short status message
    return "SPEND_PER_MONTH refreshed"
    

##
## Register the data pipeline function as a permanent stored proc so a task can run it
##
session.sproc.register(
    func = campaign_spend_data_pipeline,
    name = "campaign_spend_data_pipeline",
    packages = ['snowflake-snowpark-python','pandas'],
    is_permanent = True,
    stage_location = "@dash_sprocs",
    replace = True
)

##
## CREATE A TASK THAT CALLS THE STORED PROC EVERY 5 MINUTES
##
campaign_spend_data_pipeline_task = """
    CREATE OR REPLACE TASK campaign_spend_data_pipeline_task
        WAREHOUSE = 'DASH_L'
        SCHEDULE = '5 MINUTE'
    AS
        CALL campaign_spend_data_pipeline()
"""

session.sql(campaign_spend_data_pipeline_task).collect()

[Row(status='Task CAMPAIGN_SPEND_DATA_PIPELINE_TASK successfully created.')]

In [172]:
session.sql("SHOW TABLES").show()

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"created_on"                      |"name"                      |"database_name"  |"schema_name"  |"kind"  |"comment"  |"cluster_by"  |"rows"  |"bytes"  |"owner"       |"retention_time"  |"automatic_clustering"  |"change_tracking"  |"is_external"  |"owner_role_type"  |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-07-25 05:16:05.081000-07:00  |BUDGET_ALLOCATIONS_AND_ROI  |DASH_DB          |DASH_SCHEMA    |TABLE   |           |              |6       |2560     |ACCOUNTADMIN  |1                 |OF

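#### Resume and suspend the task

New tasks are created in a suspended state; resume the task to start its schedule, and suspend it to stop the scheduled runs.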

In [151]:
session.sql("alter task campaign_spend_data_pipeline_task resume").collect()
# session.sql("alter task campaign_spend_data_pipeline_task suspend").collect()

[Row(status='Statement executed successfully.')]

### Child/dependent Task
This task automates loading monthly revenue data, performing various transformations, and joining it with transformed campaign spend data.
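Unlike the root task, which runs on a SCHEDULE, a child task is declared with an AFTER clause naming its predecessor and runs when that task finishes. The hypothetical helper below builds either form of the CREATE OR REPLACE TASK statement; the child task name `monthly_revenue_data_pipeline_task` and the procedure `monthly_revenue_data_pipeline()` used in the usage note are assumptions, not objects created in this notebook. Requiring exactly one of `schedule`/`after` is a simplification of this sketch.

```python
from typing import Optional


def create_task_sql(name: str, warehouse: str, body: str,
                    schedule: Optional[str] = None,
                    after: Optional[str] = None) -> str:
    """Build a CREATE OR REPLACE TASK statement for a root (SCHEDULE) or child (AFTER) task."""
    if (schedule is None) == (after is None):
        raise ValueError("pass exactly one of schedule or after")
    trigger = f"SCHEDULE = '{schedule}'" if schedule is not None else f"AFTER {after}"
    return (
        f"CREATE OR REPLACE TASK {name}\n"
        f"    WAREHOUSE = {warehouse}\n"
        f"    {trigger}\n"
        f"AS\n"
        f"    {body}"
    )
```

For example, `session.sql(create_task_sql("monthly_revenue_data_pipeline_task", "DASH_L", "CALL monthly_revenue_data_pipeline()", after="campaign_spend_data_pipeline_task")).collect()` would create the child task. Like the root task, it starts suspended and must be resumed; Snowflake also expects the root task to be suspended while child tasks are added, so check the CREATE TASK documentation before wiring a task graph.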

#### First, get the query working

In [145]:
snow_df_revenue = session.table('monthly_revenue')
display(type(snow_df_revenue), snow_df_revenue.show(5))

---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
---------------------------------



snowflake.snowpark.table.Table

None

In [175]:
## Reference the table
snow_df_revenue = session.table('monthly_revenue')

## Specify the group by columns
groups_cols = ["YEAR","MONTH"]

## Specify join table
campaign_spend_per_month = session.table('SPEND_PER_MONTH')

revenue_by_month_tbl = (
    snow_df_revenue
    .group_by(groups_cols)
    .agg(sum("REVENUE"))
    .rename("SUM(REVENUE)","REVENUE")
)

revenue_by_month_tbl.show()
campaign_spend_per_month.show()

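## Join revenue with the pivoted spend data on YEAR and MONTH
## (lazy: nothing runs until an action such as show() is called)
(revenue_by_month_tbl
 .join(campaign_spend_per_month,["YEAR","MONTH"])
)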

---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
|2012    |10       |3334028.46  |
|2013    |2        |2995042.21  |
|2013    |3        |3310662.6   |
|2013    |5        |3314107.1   |
|2013    |6        |3209691.3   |
---------------------------------

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |6        |506497           |504679          |501098   |501947   |
|2017    |12       |523302           |520194          |521552   |524296   |
|2017    |11       |505149           |502322          |504099   |501490   |
|2018    |3        |519889           |518499          |521048   |52

<snowflake.snowpark.dataframe.DataFrame at 0x1742c624220>
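Snowpark's `join` performs an inner join by default, so only months present in both tables survive, pairing each month's channel spend (the features) with its revenue (the target). The plain-Python sketch below illustrates that keyed inner join using values copied from the outputs above; `join_on_year_month` is a hypothetical helper, not part of Snowpark. In the samples shown, (2012, 6) appears in both tables, while (2012, 5) appears only in the revenue sample and (2017, 12) only in the spend sample.

```python
from typing import Dict, List, Tuple

Key = Tuple[int, int]  # (YEAR, MONTH)


def join_on_year_month(revenue: Dict[Key, float],
                       spend: Dict[Key, Dict[str, int]]) -> List[dict]:
    """Inner join keyed on (YEAR, MONTH); months missing from either side are dropped."""
    joined = []
    for key in sorted(revenue.keys() & spend.keys()):
        year, month = key
        joined.append({"YEAR": year, "MONTH": month, **spend[key], "REVENUE": revenue[key]})
    return joined
```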