# Project Overview
Perform data analysis and data preparation tasks to train a Linear Regression model to predict future ROI (Return On Investment) of variable ad spend budgets across multiple channels including search, video, social media, and email using Snowpark for Python, Streamlit and scikit-learn. By the end of the session, you will have an interactive web application deployed visualizing the ROI of different allocated advertising spend budgets.

### Machine Learning
In ths Notebook, we will focus on Machine Learning in Snowflake using Snowpark for Python.

***Prerequisite**: Successful completion of [Snowpark_For_Python_DE.ipynb](1-Snowpark_For_Python_DE.ipynb).*

* Establish secure connection to Snowflake
* Load features and target from Snowflake table into Snowpark DataFrame
* Prepare features for model training
* Create a Python Stored Procedure to deploy model training code on Snowflake
* Create Python Scalar and Vectorized User-Defined Functions (UDF) for inference on new data points
  *  *NOTE: The Scalar UDF is called from the Streamlit App. See [Snowpark_Streamlit_Revenue_Prediction.py](3-Snowpark_Streamlit_Revenue_Prediction.py)

### Import Libraries

In [1]:
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType, StructField, DateType, Variant
from snowflake.snowpark.functions import udf, sum, col,array_construct,month,year,call_udf,lit
from snowflake.snowpark.version import VERSION
# Misc
import json
import pandas as pd
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

### Establish Secure Connection to Snowflake

Using the Snowpark API, it’s quick and easy to establish a secure connection between Snowflake and Notebook.

 *Connection options: Username/Password, MFA, OAuth, Okta, SSO*

In [2]:
# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_role(), current_database(), current_schema(), current_version(), current_warehouse()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(snowflake_environment[0][1]))
print('Database                    : {}'.format(snowflake_environment[0][2]))
print('Schema                      : {}'.format(snowflake_environment[0][3]))
print('Warehouse                   : {}'.format(snowflake_environment[0][5]))
print('Snowflake version           : {}'.format(snowflake_environment[0][4]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : ADMIN
Role                        : ACCOUNTADMIN
Database                    : PYLADIES_DB
Schema                      : PYLADIES_SCHEMA
Warehouse                   : PYLADIES_L
Snowflake version           : 7.12.3
Snowpark for Python version : 1.1.0


### Features and Target

At this point we are ready to perform the following actions to save features and target for model training.

* Delete rows with missing values
* Exclude columns we don't need for modeling
* Save features into a Snowflake table called MARKETING_BUDGETS_FEATURES

In [3]:
# Load data
snow_df_spend_and_revenue_per_month = session.table('spend_and_revenue_per_month')

# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

---------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
---------------------------------------------------------------------
|516431           |517618          |516729   |517208   |3264300.11  |
|506497           |504679          |501098   |501947   |3208482.33  |
|522780           |521395          |522762   |518405   |3311966.98  |
|519959           |520537          |520685   |521584   |3311752.81  |
|507211           |507404          |511364   |507363   |3208563.06  |
|518942           |520863          |522768   |519950   |3334028.46  |
|505715           |505221          |505292   |503748   |3185894.64  |
|520148           |520711          |521427   |520724   |3334570.96  |
|522151           |518635          |520583   |521167   |3316455.44  |
|467736           |474679          |469856   |469784   |2995042.21  |
---------------------------------------------------------------------



### Model Training in Snowflake 

#### Python function to train a Linear Regression model using scikit-learn

Let's create a Python function that uses **scikit-learn and other packages which are already included in** [Snowflake Anaconda channel](https://repo.anaconda.com/pkgs/snowflake/) and therefore available on the server-side when executing the Python function as a Stored Procedure running in Snowflake.

This function takes the following as parameters:

* _session_: Snowflake Session object.
* _features_table_: Name of the table that holds the features and target variable.
* _number_of_folds_: Number of cross validation folds used in GridSearchCV.
* _polynomial_features_degress_: PolynomialFeatures as a preprocessing step.
* _train_accuracy_threshold_: Accuracy thresholds for train dataset. This values is used to determine if the model should be saved.
* _test_accuracy_threshold_: Accuracy thresholds for test dataset. This values is used to determine if the model should be saved.
* _save_model_: Boolean that determines if the model should be saved provided the accuracy thresholds are met.

TIP: For large datasets, Snowflake offers [Snowpark-optimized Warehouses](https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized.html).



In [4]:
def train_revenue_prediction_model(
    session: Session, 
    features_table: str, 
    number_of_folds: int, 
    polynomial_features_degrees: int, 
    train_accuracy_threshold: float, 
    test_accuracy_threshold: float, 
    save_model: bool) -> Variant:
    
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import PolynomialFeatures
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split, GridSearchCV

    import os
    from joblib import dump

    # Load features
    df = session.table(features_table).to_pandas()

    # Preprocess the Numeric columns
    # We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
    # NOTE: High degrees can cause overfitting.
    numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = polynomial_features_degrees)),('scaler', StandardScaler())])

    # Combine the preprocessed step together using the Column Transformer module
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features)])

    # The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),('regressor', LinearRegression())])
    parameteres = {}

    X = df.drop('REVENUE', axis = 1)
    y = df['REVENUE']

    # Split dataset into training and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

    # Use GridSearch to find the best fitting model based on number_of_folds folds
    model = GridSearchCV(pipeline, param_grid=parameteres, cv=number_of_folds)

    model.fit(X_train, y_train)
    train_r2_score = model.score(X_train, y_train)
    test_r2_score = model.score(X_test, y_test)

    model_saved = False

    if save_model:
        if train_r2_score >= train_accuracy_threshold and test_r2_score >= test_accuracy_threshold:
            # Upload trained model to a stage
            model_output_dir = '/tmp'
            model_file = os.path.join(model_output_dir, 'model.joblib')
            dump(model, model_file)
            session.file.put(model_file,"@pyladies_models",overwrite=True)
            model_saved = True

    # Return model R2 score on train and test data
    return {"R2 score on Train": train_r2_score,
            "R2 threshold on Train": train_accuracy_threshold,
            "R2 score on Test": test_r2_score,
            "R2 threshold on Test": test_accuracy_threshold,
            "Model saved": model_saved}

#### Test Python function before deploying it as a Stored Procedure on Snowflake

Since we're in test mode, we will set _save_model = False_ so that the model is not saved just yet.

In [5]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = False

train_revenue_prediction_model(
    session,
    "MARKETING_BUDGETS_FEATURES",
    cross_validaton_folds,
    polynomial_features_degrees,
    train_accuracy_threshold,
    test_accuracy_threshold,
    save_model)

{'R2 score on Train': 0.9954552822793987,
 'R2 threshold on Train': 0.85,
 'R2 score on Test': 0.8817971097765377,
 'R2 threshold on Test': 0.85,
 'Model saved': False}

### Create Stored Procedure to deploy model training code on Snowflake

Assuming the testing is complete and we're satisfied with the model, let's **register the model training Python function as a Snowpark Python Stored Procedure** by supplying the packages (_snowflake-snowpark-python,scikit-learn, and joblib_) it will need and use during execution.

TIP: For more information on Snowpark Python Stored Procedures, refer to the [docs](https://docs.snowflake.com/en/sql-reference/stored-procedures-python.html).

In [6]:
session.sproc.register(
    func=train_revenue_prediction_model,
    name="train_revenue_prediction_model",
    packages=['snowflake-snowpark-python','scikit-learn','joblib'],
    is_permanent=True,
    stage_location="@pyladies_sprocs",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x168190a90>

### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

### Execute Stored Procedure to train model and deploy it on Snowflake

Now we're ready to train the model and save it onto a Snowflake stage so let's set _save_model = True_ and run/execute the Stored Procedure using _session.call()_ function.

In [7]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = True

print(session.call('train_revenue_prediction_model',
                    'MARKETING_BUDGETS_FEATURES',
                    cross_validaton_folds,
                    polynomial_features_degrees,
                    train_accuracy_threshold,
                    test_accuracy_threshold,
                    save_model))

{
  "Model saved": true,
  "R2 score on Test": 0.8817971097765288,
  "R2 score on Train": 0.9954552822793986,
  "R2 threshold on Test": 0.85,
  "R2 threshold on Train": 0.85
}


### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

### Create Scalar User-Defined Function (UDF) for inference

Now to deploy this model for inference, let's **create and register a Snowpark Python UDF and add the trained model as a dependency**. Once registered, getting new predictions is as simple as calling the function by passing in data.

*NOTE: Scalar UDFs operate on a single row / set of data points and are great for online inference in real-time. And this UDF is called from the Streamlit App. See [Snowpark_Streamlit_Revenue_Prediction.py](Snowpark_Streamlit_Revenue_Prediction.py)*

TIP: For more information on Snowpark Python User-Defined Functions, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-udfs.html).

In [8]:
session.clear_imports()
session.clear_packages()

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@pyladies_models/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn==1.1.1')

@udf(name='predict_roi',session=session,replace=True,is_permanent=True,stage_location='@pyladies_udfs')
def predict_roi(budget_allocations: list) -> float:
    import sys
    import pandas as pd
    from joblib import load
    import sklearn
    # Snowflake copies the staged file to the UDF’s import directory. 
    # You can retrieve the directory’s location using the snowflake_import_directory system option.
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + 'model.joblib.gz'
    model = load(model_file)
            
    features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    df = pd.DataFrame([budget_allocations], columns=features)
    roi = abs(model.predict(df)[0])
    return roi

### Call Scalar User-Defined Function (UDF) for inference on new data

 Once the UDF is registered, getting new predictions is as simple as calling the _call_udf()_ Snowpark Python function and passing in new datapoints.

Let's create a SnowPark DataFrame with some sample data and call the UDF to get new predictions.

 *NOTE: This UDF is also called from the Streamlit App. See [Snowpark_Streamlit_Revenue_Prediction.py](Snowpark_Streamlit_Revenue_Prediction.py)*

In [9]:
test_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]], 
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("predict_roi", 
    array_construct(col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL"))).as_("PREDICTED_ROI")).show()

-----------------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
-----------------------------------------------------------------------------
|500000           |500000          |500000   |500000   |3179613.166194174   |
|250000           |250000          |200000   |450000   |4072491.441724832   |
|8500             |9500            |2000     |500      |189866.83304576762  |
-----------------------------------------------------------------------------



### Create Vectorized User-Defined Function (UDF) using Batch API for inference

Here we will leverage the Python UDF Batch API to create a **vectorized** UDF which takes a Pandas Dataframe as input. This means that each call to the UDF receives a set/batch of rows compared to a Scalar UDF which gets one row as input. 

First we will create a helper function _load_model()_ that uses **cachetools** to make sure we only load the model once followed by _batch_predict_roi()_ function that does the inference. 

_NOTE: Vectorized UDFs are great for offline inference in batch mode._

Advantages of using the Batch API over Scalar UDFs:

* The potential for better performance if your Python code operates efficiently on batches of rows
* Less transformation logic required if you are calling into libraries that operate on Pandas DataFrames or Pandas arrays

TIP: For more information on Snowpark Python UDF Batch API, refer to the [docs](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-batch.html#getting-started-with-the-batch-api).

In [10]:
session.clear_imports()
session.clear_packages()

import cachetools
from snowflake.snowpark.types import PandasSeries, PandasDataFrame

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@pyladies_models/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn','cachetools')

@cachetools.cached(cache={})
def load_model(filename):
    import joblib
    import sys
    import os

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@udf(name='batch_predict_roi',session=session,replace=True,is_permanent=True,stage_location='@pyladies_udfs')
def batch_predict_roi(budget_allocations_df: PandasDataFrame[int, int, int, int]) -> PandasSeries[float]:
    import sklearn
    budget_allocations_df.columns = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    model = load_model('model.joblib.gz')
    return abs(model.predict(budget_allocations_df))

### Call Vectorized User-Defined Function (UDF) using Batch API for inference on new data

When you use the Batch API:

* You do not need to change how you write queries using Python UDFs. All batching is handled by the UDF framework rather than your own code
* NOTE: As with the non-batch / scalar API, there is no guarantee of which instances of your handler code will see which batches of input

In [11]:
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("batch_predict_roi", 
    col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL")).as_("PREDICTED_ROI")).show()

-----------------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
-----------------------------------------------------------------------------
|500000           |500000          |500000   |500000   |3179613.166194174   |
|250000           |250000          |200000   |450000   |4072491.441724832   |
|8500             |9500            |2000     |500      |189866.83304576762  |
-----------------------------------------------------------------------------



In [12]:
session.close()