# Project Overview

Perform data analysis and data preparation tasks to train a Linear Regression model to predict future ROI (Return On Investment) of variable ad spend budgets across multiple channels including search, video, social media, and email using Snowpark for Python and Snowflake ML.


## Data Engineering -- Data Analysis and Data Preparation

- Load data from Snowflake tables into Snowpark DataFrames
- Perform Exploratory Data Analysis on Snowpark DataFrames
- Pivot and Join data from multiple tables using Snowpark DataFrames
- Demostrate how to automate data preparation using Snowflake Tasks

## Import Libraries

In [None]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION
from snowflake.core import Root
from snowflake.core.task import Task, StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core import CreateMode

# Misc
from datetime import timedelta
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

session = get_active_session()

# Get current warehouse to use later
result = session.sql('SELECT CURRENT_WAREHOUSE()')
current_wh = result.to_pandas()["CURRENT_WAREHOUSE()"][0]

## Load Aggregated Campaign Spend Data from Snowflake table into Snowpark DataFrame

Let's first load the campaign spend data. This table contains ad click data that has been aggregated to show daily spend across digital ad channels including search engines, social media, email and video.

Note: Some other ways to load data in a Snowpark DataFrame

- session.sql("select col1, col2... from tableName")
- session.read.options({"field_delimiter": ",", "skip_header": 1}).schema(user_schema).csv("@mystage/testCSV.csv")
- session.read.parquet("@stageName/path/to/file")
- session.create_dataframe([1,2,3], schema=["col1"])

TIP: Learn more about [Snowpark DataFrames](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/dataframe).

In [None]:
snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

Actions like show(), collect(), count() send the DataFrame SQL for execution on the server

Note: History object provides the query ID which can be helpful for debugging as well as the SQL query executed on the server.

In [None]:
with session.query_history() as history:
    snow_df_spend.show(20)
history.queries

## Total Spend per Year and Month For All Channels

Let's transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions.

TIP: For a full list of functions, refer to the [documentation](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/functions).

In [None]:
# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

snow_df_spend_per_channel.show(10)

## Pivot on Channel: Total Spend Across All Channels

Let's further transform the campaign spend data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions. This transformation will enable us to join with the revenue table such that we will have our input features and target variable in a single table for model training.

TIP: For a full list of functions, refer to the [documentation](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/functions).

In [None]:
snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()

## Save Transformed Data into Snowflake Table

Let's save the transformed data into a Snowflake table SPEND_PER_MONTH.

In [None]:
snow_df_spend_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

## Automation: Run Campaign Spend Data Transformations As a Snowflake Task

Note: Optionally you can run all these transformations as an automated task by deploying the code to Snowflake as a Snowpark Stored Procedure and executing it as a Snowflake Task.

TIP: Learn more about [Stored Procedures](https://docs.snowflake.com/en/sql-reference/stored-procedures-python) and [Snowflake Tasks](https://docs.snowflake.com/en/sql-reference/sql/create-task).

In [None]:
def campaign_spend_data_pipeline(session: Session) -> str:
  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend_t = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Save transformed data
  snow_df_spend_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

In [None]:
# Register data pipeline function as a task
root = Root(session)
my_task = Task(name='campaign_spend_data_pipeline_task'
               , definition=StoredProcedureCall(
                   campaign_spend_data_pipeline, stage_location='@dash_sprocs'
               )
               , warehouse=f'''{current_wh}'''
               , schedule=timedelta(minutes=3))

tasks = root.databases[session.get_current_database()].schemas[session.get_current_schema()].tasks
task_res = tasks.create(my_task,mode=CreateMode.or_replace)

# By default a Task is suspended and we need to resume it if we want it run based on the schema. Note that we can still execute a task by calling the execute method.
task_res.execute()

## Total Revenue per Year And Month

Now let's load revenue table and transform the data into revenue per year/month using group_by() and agg() functions.

In [None]:
snow_df_revenue = session.table('monthly_revenue')
snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
snow_df_revenue_per_month.show()

## Join Total Spend and Total Revenue per Year and Month Across All Channels

Next let's join this revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training.

In [None]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])
snow_df_spend_and_revenue_per_month.show()

## Examine Snowpark DataFrame Query and Execution Plan

Snowpark makes is really convenient to look at the DataFrame query and execution plan using explain() Snowpark DataFrame function.

In [None]:
snow_df_spend_and_revenue_per_month.explain()

## Save Transformed Data into Snowflake Table

Let's save the transformed data into a Snowflake table SPEND_AND_REVENUE_PER_MONTH.

In [None]:
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

## Automation: Run Monthly Revenue Data Transformations As a Snowflake Task (DAG)

Note: Optionally you can run all these transformations as an automated task by deploying the code to Snowflake as a Snowpark Stored Procedure and executing it as a Snowflake Task. By using a DAG we can run it AFTER campaign_spend_data_pipeline_task.

TIP: Learn more about [Stored Procedures](https://docs.snowflake.com/en/sql-reference/stored-procedures-python) and [Snowflake Tasks](https://docs.snowflake.com/en/sql-reference/sql/create-task).

In [None]:
def monthly_revenue_data_pipeline(session: Session) -> str:
  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_spend_per_month_t = session.table('spend_per_month')
  snow_df_revenue_t = session.table('monthly_revenue')
  snow_df_revenue_per_month_t = snow_df_revenue_t.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month_t = snow_df_spend_per_month_t.join(snow_df_revenue_per_month_t, ["YEAR","MONTH"])

  # SAVE in a new table for the next task
  snow_df_spend_and_revenue_per_month_t.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

Note: Since monthly_revenue_data_pipeline is depened on that campaign_spend_data_pipeline is executed first we want to create a DAG to make sure they run in the correct order.

In [None]:
# Delete the previous task
task_res.delete()

with DAG("de_pipeline_dag", schedule=timedelta(minutes=3)) as dag:
    # Create a task that runs our first pipleine
    dag_spend_task = DAGTask(name='campaign_spend_data_pipeline_task'
                        , definition=StoredProcedureCall(
                                    campaign_spend_data_pipeline, stage_location='@dash_sprocs'
                                )
                        ,warehouse=f'''{current_wh}'''
                        )
    # Create a task that runs our second pipleine
    dag_revenue_task = DAGTask(name='monthly_revenue_data_pipeline'
                          , definition=StoredProcedureCall(
                                monthly_revenue_data_pipeline, stage_location='@dash_sprocs'
                            )
                        ,warehouse=f'''{current_wh}'''
                        )
# Shift right and left operators can specify task relationships.
dag_spend_task >> dag_revenue_task  # dag_spend_task is a predecessor of dag_revenue_task

schema = root.databases[session.get_current_database()].schemas[session.get_current_schema()]
dag_op = DAGOperation(schema)

dag_op.deploy(dag,mode=CreateMode.or_replace)

# A DAG is not suspended by default so we will suspend the root task that will suspend the full DAG
root_task = tasks["DE_PIPELINE_DAG"]
root_task.suspend()

## Run DAG

Note that we can manually run DAGs even if they are suspended

In [None]:
dag_op.run(dag)

## Resume DAG

In [None]:
root_task = tasks["DE_PIPELINE_DAG"]
root_task.resume()

## Suspend Tasks

Note: For the sake of this lab, if you resume the above tasks, suspend them to avoid unecessary resource utilization by executing the following commands.

In [None]:
root_task = tasks["DE_PIPELINE_DAG"]
root_task.suspend()

## Machine Learning

- Load features and target from Snowflake table into Snowpark DataFrame
- Prepare features for model training
- Train ML model using Snowpark ML in Snowflake and upload the model to Snowflake stage
- Register ML model and use it for inference from Snowflake Model Registry


## Import Libraries

In [None]:
# Snowpark for Python
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.version import VERSION

# Snowpark ML
from snowflake.ml.modeling.compose import ColumnTransformer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import PolynomialFeatures, StandardScaler
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import Registry
from snowflake.ml.version import VERSION as ml_version

# Misc
#import pandas as pd
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

session = get_active_session()


## Features and Target

At this point we are ready to perform the following actions to save features and target for model training.

- Delete rows with missing values
- Exclude columns we don't need for modeling
- Save features into a Snowflake table called MARKETING_BUDGETS_FEATURES

In [None]:
# Load data
snow_df_spend_and_revenue_per_month = session.table('spend_and_revenue_per_month')

# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

## Model Training using Snowpark ML in Snowflake

Learn more about [Snowpark ML](https://docs.snowflake.com/developer-guide/snowpark-ml/snowpark-ml-modeling).

NOTE: For workloads that require a large amount of memory and compute resources, consider using [Snowpark-Optimized Warehouses](https://docs.snowflake.com/en/developer-guide/snowpark/python/python-snowpark-training-ml#snowpark-optimized-warehouses).

In [None]:
CROSS_VALIDATION_FOLDS = 10
POLYNOMIAL_FEATURES_DEGREE = 2

# Create train and test Snowpark DataDrames
train_df, test_df = session.table("MARKETING_BUDGETS_FEATURES").random_split(weights=[0.8, 0.2], seed=0)

# Preprocess the Numeric columns
# We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
# NOTE: High degrees can cause overfitting.
numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = POLYNOMIAL_FEATURES_DEGREE)),('scaler', StandardScaler())])

# Combine the preprocessed step together using the Column Transformer module
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features)])

# The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression())])
parameteres = {}

# Use GridSearch to find the best fitting model based on number_of_folds folds
model = GridSearchCV(
    estimator=pipeline,
    param_grid=parameteres,
    cv=CROSS_VALIDATION_FOLDS,
    label_cols=["REVENUE"],
    output_cols=["PREDICTED_REVENUE"],
    verbose=2
)

# Fit and Score
model.fit(train_df)
train_r2_score = model.score(train_df)
test_r2_score = model.score(test_df)

# R2 score on train and test datasets
print(f"R2 score on Train : {train_r2_score}")
print(f"R2 score on Test  : {test_r2_score}")

## Log Trained Model to Snowflake Model Registry

The Model Registry allows to store models as objects in a schema in Snowflake. Note that by default the database and schema of the session is used.

Learn more about [Model Registry](https://docs.snowflake.com/developer-guide/snowpark-ml/model-registry/overview).

In [None]:
registry = Registry(session)
MODEL_NAME = "PREDICT_ROI"

In [None]:
# NOTE: If you try to log the model with the same name, you may get "ValueError: (0000) Model PREDICT_ROI version v1 already existed." error. 
# If that's the case, uncomment and run this cell.

# registry.delete_model(MODEL_NAME)

In [None]:
mv = registry.log_model(model,
                        model_name=MODEL_NAME,
                        version_name="v0",
                        metrics={"R2_train": train_r2_score, "R2_test":test_r2_score},
                        comment='Model pipeline to predict revenue',
                        options={"embed_local_ml_library": True,
                                 "relax_version": False}
                    )

In [None]:
registry.show_models()

## Inference

Once the model is logged we can use it for inference on new data.

First we will create a Snowpark DataFrame with some sample data and then call the logged model to get new predictions. Note: we will handle negative values in our Streamlit application.

In [None]:
test_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]], 
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
mv.run(test_df, function_name='predict').show()

## Let's do some clean up now.

In [None]:
# Clean up
registry.delete_model(MODEL_NAME)

In [None]:
# Confirm it was deleted
registry.show_models()