In [None]:
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE WAREHOUSE DASH_S WAREHOUSE_SIZE=SMALL;

USE DASH_DB.DASH_SCHEMA;

In [None]:
CREATE or REPLACE file format csvformat
  skip_header = 1
  type = 'CSV';

CREATE or REPLACE stage campaign_data_stage
  file_format = csvformat
  url = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/campaign_spend/';

CREATE or REPLACE TABLE CAMPAIGN_SPEND (
  CAMPAIGN VARCHAR(60), 
  CHANNEL VARCHAR(60),
  DATE DATE,
  TOTAL_CLICKS NUMBER(38,0),
  TOTAL_COST NUMBER(38,0),
  ADS_SERVED NUMBER(38,0)
);

COPY into CAMPAIGN_SPEND
  from @campaign_data_stage;

In [None]:
CREATE or REPLACE stage monthly_revenue_data_stage
  file_format = csvformat
  url = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/monthly_revenue/';

CREATE or REPLACE TABLE MONTHLY_REVENUE (
  YEAR NUMBER(38,0),
  MONTH NUMBER(38,0),
  REVENUE FLOAT
);

COPY into MONTHLY_REVENUE
  from @monthly_revenue_data_stage;

In [None]:
CREATE or REPLACE TABLE BUDGET_ALLOCATIONS_AND_ROI (
  MONTH varchar(30),
  SEARCHENGINE integer,
  SOCIALMEDIA integer,
  VIDEO integer,
  EMAIL integer,
  ROI float
);

INSERT INTO BUDGET_ALLOCATIONS_AND_ROI (MONTH, SEARCHENGINE, SOCIALMEDIA, VIDEO, EMAIL, ROI)
VALUES
('January',35,50,35,85,8.22),
('February',75,50,35,85,13.90),
('March',15,50,35,15,7.34),
('April',25,80,40,90,13.23),
('May',95,95,10,95,6.246),
('June',35,50,35,85,8.22);

In [None]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION
from snowflake.core import Root
from snowflake.core.task import Task, StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core import CreateMode

# Misc
from datetime import timedelta
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

session = get_active_session()


In [None]:

snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

In [None]:

with session.query_history() as history:
    snow_df_spend.show(20)
history.queries

In [None]:
# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

snow_df_spend_per_channel.show(10)

In [None]:

snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()
     

In [None]:

snow_df_spend_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

In [None]:
def campaign_spend_data_pipeline(session: Session) -> str:
  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend_t = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Save transformed data
  snow_df_spend_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

In [None]:

# Register data pipeline function as a task
root = Root(session)
my_task = Task(name='campaign_spend_data_pipeline_task'
               , definition=StoredProcedureCall(
                   campaign_spend_data_pipeline, stage_location='@dash_sprocs'
               )
               , warehouse='DASH_S'
               , schedule=timedelta(minutes=3))

tasks = root.databases[session.get_current_database()].schemas[session.get_current_schema()].tasks
task_res = tasks.create(my_task,mode=CreateMode.or_replace)

# By default a Task is suspended and we need to resume it if we want it run based on the schema. Note that we can still execute a task by calling the execute method.
task_res.execute()

In [None]:

snow_df_revenue = session.table('monthly_revenue')
snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
snow_df_revenue_per_month.show()
     

In [None]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])
snow_df_spend_and_revenue_per_month.show()
     

In [None]:

snow_df_spend_and_revenue_per_month.explain()

In [None]:

snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

In [None]:
def monthly_revenue_data_pipeline(session: Session) -> str:
  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_spend_per_month_t = session.table('spend_per_month')
  snow_df_revenue_t = session.table('monthly_revenue')
  snow_df_revenue_per_month_t = snow_df_revenue_t.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month_t = snow_df_spend_per_month_t.join(snow_df_revenue_per_month_t, ["YEAR","MONTH"])

  # SAVE in a new table for the next task
  snow_df_spend_and_revenue_per_month_t.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')
     

In [None]:
# Delete the previous task
task_res.delete()

with DAG("de_pipeline_dag", schedule=timedelta(minutes=3)) as dag:
    # Create a task that runs our first pipleine
    dag_spend_task = DAGTask(name='campaign_spend_data_pipeline_task'
                        , definition=StoredProcedureCall(
                                    campaign_spend_data_pipeline, stage_location='@dash_sprocs'
                                )
                        ,warehouse='DASH_S'
                        )
    # Create a task that runs our second pipleine
    dag_revenue_task = DAGTask(name='monthly_revenue_data_pipeline'
                          , definition=StoredProcedureCall(
                                monthly_revenue_data_pipeline, stage_location='@dash_sprocs'
                            )
                        ,warehouse='DASH_S'
                        )
# Shift right and left operators can specify task relationships.
dag_spend_task >> dag_revenue_task  # dag_spend_task is a predecessor of dag_revenue_task

schema = root.databases[session.get_current_database()].schemas[session.get_current_schema()]
dag_op = DAGOperation(schema)

dag_op.deploy(dag,mode=CreateMode.or_replace)

# A DAG is not suspended by default so we will suspend the root task that will suspend the full DAG
root_task = tasks["DE_PIPELINE_DAG"]
root_task.suspend()

In [None]:

# dag_op.run(dag)

In [None]:

# root_task = tasks["DE_PIPELINE_DAG"]
# root_task.resume()
     

In [None]:
root_task = tasks["DE_PIPELINE_DAG"]
root_task.suspend()

In [None]:
# Snowpark for Python
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.version import VERSION

# Snowpark ML
from snowflake.ml.modeling.compose import ColumnTransformer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import PolynomialFeatures, StandardScaler
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import Registry
from snowflake.ml.version import VERSION as ml_version

# Misc
#import pandas as pd
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

session = get_active_session()

In [None]:

# Load data
snow_df_spend_and_revenue_per_month = session.table('spend_and_revenue_per_month')

# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

In [None]:
CROSS_VALIDATION_FOLDS = 10
POLYNOMIAL_FEATURES_DEGREE = 2

# Create train and test Snowpark DataDrames
train_df, test_df = session.table("MARKETING_BUDGETS_FEATURES").random_split(weights=[0.8, 0.2], seed=0)

# Preprocess the Numeric columns
# We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
# NOTE: High degrees can cause overfitting.
numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = POLYNOMIAL_FEATURES_DEGREE)),('scaler', StandardScaler())])

# Combine the preprocessed step together using the Column Transformer module
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features)])

# The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression())])
parameteres = {}

# Use GridSearch to find the best fitting model based on number_of_folds folds
model = GridSearchCV(
    estimator=pipeline,
    param_grid=parameteres,
    cv=CROSS_VALIDATION_FOLDS,
    label_cols=["REVENUE"],
    output_cols=["PREDICTED_REVENUE"],
    verbose=2
)

# Fit and Score
model.fit(train_df)
train_r2_score = model.score(train_df)
test_r2_score = model.score(test_df)

# R2 score on train and test datasets
print(f"R2 score on Train : {train_r2_score}")
print(f"R2 score on Test  : {test_r2_score}")
     

In [None]:
registry = Registry(session)
MODEL_NAME = "PREDICT_ROI"

In [None]:

# NOTE: If you try to log the model with the same name, you may get "ValueError: (0000) Model PREDICT_ROI version v1 already existed." error. 
# If that's the case, uncomment and run this cell.

# registry.delete_model(MODEL_NAME)

In [None]:


mv = registry.log_model(model,
                        model_name=MODEL_NAME,
                        version_name="v1",
                        metrics={"R2_train": train_r2_score, "R2_test":test_r2_score},
                        comment='Model pipeline to predict revenue',
                        options={"embed_local_ml_library": True}
                    )
     

In [None]:

registry.show_models()

In [None]:
test_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]], 
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
mv.run(test_df, function_name='predict').show()

In [None]:
# Snowpark for Python API reference: https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/index.html
# Snowpark for Python Developer Guide: https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html

import calendar 
import altair as alt
import streamlit as st
import pandas as pd
import snowflake.snowpark as snowpark 
from snowflake.snowpark.functions import col

# Function to load last six months' budget allocations and ROI 
def load():
  data = session.table("BUDGET_ALLOCATIONS_AND_ROI").unpivot("Budget", "Channel", ["SearchEngine", "SocialMedia", "Video", "Email"]).filter(col("MONTH") != "July")
  alloc, rois, last_alloc = data.drop("ROI"), data.drop(["CHANNEL", "BUDGET"]).distinct(), data.filter(col("MONTH") == "June")
  return data.to_pandas(), alloc.to_pandas(), rois.to_pandas(), last_alloc.to_pandas()

def predict(budgets):
  pred = session.sql(f"SELECT ABS(PREDICT_ROI!predict({budgets[0]*1000},{budgets[1]*1000},{budgets[2]*1000},{budgets[3]*1000})['PREDICTED_REVENUE']::int) as PREDICTED_ROI").to_pandas()
  pred = pred["PREDICTED_ROI"].values[0] / 100000
  change = round(((pred / rois["ROI"].iloc[-1]) - 1) * 100, 1)
  return pred, change

def chart(chart_data):
  base = alt.Chart(chart_data).encode(alt.X("MONTH", sort=list(calendar.month_name), title=None))
  bars = base.mark_bar().encode(y=alt.Y("BUDGET", title="Budget", scale=alt.Scale(domain=[0, 300])), color=alt.Color("CHANNEL", legend=alt.Legend(orient="top", title=" ")), opacity=alt.condition(alt.datum.MONTH=="July", alt.value(1), alt.value(0.3)))
  lines = base.mark_line(size=3).encode(y=alt.Y("ROI", title="Revenue", scale=alt.Scale(domain=[0, 25])), color=alt.value("#808495"))
  points = base.mark_point(strokeWidth=3).encode(y=alt.Y("ROI"), stroke=alt.value("#808495"), fill=alt.value("white"), size=alt.condition(alt.datum.MONTH=="July", alt.value(300), alt.value(70)))
  chart = alt.layer(bars, lines + points).resolve_scale(y="independent").configure_view(strokeWidth=0).configure_axisY(domain=False).configure_axis(labelColor="#808495", tickColor="#e6eaf1", gridColor="#e6eaf1", domainColor="#e6eaf1", titleFontWeight=600, titlePadding=10, labelPadding=5, labelFontSize=14).configure_range(category=["#FFE08E", "#03C0F2", "#FFAAAB", "#995EFF"])
  st.altair_chart(chart, use_container_width=True)

# Streamlit config
st.header("SkiGear Co Ad Spend Optimizer")
st.subheader("Advertising budgets")

# Call functions to get Snowflake session and load data
session = snowpark.session._get_active_session()
channels = ["Search engine", "Email", "Social media", "Video"]
channels_upper = [channel.replace(" ", "").upper() for channel in channels]
data, alloc, rois, last_alloc = load()
last_alloc = last_alloc.replace(channels_upper, channels)

# Display advertising budget sliders and set their default values
col1, _, col2 = st.columns([4, 1, 4])
budgets = []
for alloc, col in zip(last_alloc.itertuples(), [col1, col1, col2, col2]):
  budgets.append(col.slider(alloc.CHANNEL, 0, 100, alloc.BUDGET, 5))

# Function to call "predict_roi" UDF that uses the pre-trained model for inference
# Note: Both the model training and UDF registration is done in Snowpark_For_Python.ipynb
pred, change = predict(budgets)
st.metric("", f"Predicted revenue ${pred:.2f} million", f"{change:.1f} % vs last month")
july = pd.DataFrame({"MONTH": ["July"]*4, "CHANNEL": channels_upper, "BUDGET": budgets, "ROI": [pred]*4})
chart(pd.concat([data, july]).reset_index(drop=True).replace(channels_upper, channels))

# Setup the ability to save user-entered allocations and predicted value back to Snowflake 
if st.button("❄️ Save to Snowflake"):
  with st.spinner("Making snowflakes..."):
    df = pd.DataFrame({"MONTH": ["July"], "SEARCHENGINE": [budgets[0]], "SOCIALMEDIA": [budgets[1]], "VIDEO": [budgets[2]], "EMAIL": [budgets[3]], "ROI": [pred]})
    session.write_pandas(df, "BUDGET_ALLOCATIONS_AND_ROI")  
    st.success("✅ Successfully wrote budgets & prediction to your Snowflake account!")
    st.snow()

In [None]:
root_task = tasks["DE_PIPELINE_DAG"]
root_task.suspend()