In [24]:
import json
import numpy as np
import pandas as pd
import os
import sys

from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import sproc, col
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType, Variant
from snowflake.snowpark.exceptions import SnowparkSQLException

# Log In, Create Session

In [2]:
# Reading Snowflake Connection Details
snowflake_connection_cfg = json.loads(open("/Users/mitaylor/Documents/creds/creds.json").read())

# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()

# Create a fresh & new schema
session.sql("USE DATABASE MT_TEST").collect()
session.sql("CREATE OR REPLACE STAGE FUNCTIONS").collect()
session.sql("CREATE OR REPLACE WAREHOUSE ASYNC_WH WITH WAREHOUSE_SIZE='MEDIUM' WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'").collect()

[Row(status='Warehouse ASYNC_WH successfully created.')]

# Get the Data From the Share, Examine it, and Save it

In [3]:
sdf = session.sql("select * FROM DATA_LAKE_TRADE_DATA_MT.PUBLIC.TRADE")
sdf.write.save_as_table("TEST", mode="overwrite")
sdf.limit(5).to_pandas()

Unnamed: 0,DATE,SYMBOL,EXCHANGE,ACTION,CLOSE,NUM_SHARES,CASH,TRADER,PM
0,1999-01-29,GBR,NYSE,buy,28.75,92.0,-2645.0,Tiffany Frey,Mrs Claire George
1,1999-02-01,GBR,NYSE,buy,29.375,114.0,-3348.75,Tiffany Bailey,Jenna Allen
2,1999-02-01,GBR,NYSE,buy,29.375,90.0,-2643.75,Tiffany Frey,Mrs Claire George
3,1999-02-03,GBR,NYSE,buy,27.5,122.0,-3355.0,Tiffany Bailey,Jenna Allen
4,1999-02-03,GBR,NYSE,buy,27.5,96.0,-2640.0,Tiffany Frey,Mrs Claire George


# Prepare the Data for Machine Learning (using a UDTF)

In [4]:
sdf_filtered = sdf.filter((col("SYMBOL") == 'TGVC') | (col("SYMBOL") == 'GOOG') | (col("SYMBOL") == 'OTRK'))
sdf_filtered.limit(5).to_pandas()

Unnamed: 0,DATE,SYMBOL,EXCHANGE,ACTION,CLOSE,NUM_SHARES,CASH,TRADER,PM
0,2019-08-05,GOOG,NASDAQ,hold,57.62,0.0,0.0,charles,warren
1,2019-08-06,GOOG,NASDAQ,hold,58.5,0.0,0.0,charles,warren
2,2019-08-07,GOOG,NASDAQ,hold,58.7,0.0,0.0,charles,warren
3,2019-08-08,GOOG,NASDAQ,hold,60.24,0.0,0.0,charles,warren
4,2019-08-09,GOOG,NASDAQ,hold,59.4,0.0,0.0,charles,warren


In [5]:
from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType, DateType

class ML_Prep:
    """
    UDTF class to create offset time series data for binary classification

    Yields
    -------
    df_new : DataFrame
        DataFrame with the TM3,TM2,TM1,TM0 data plus the y variable 

    """
    def __init__(self):
        None
        
    def end_partition(self, df):
        df.columns = ['DATE', 'SYMBOL', 'CLOSE']
        dates = df['DATE']
        symbol = df['SYMBOL']
        df = df['CLOSE']

        def series_to_supervised(df, n_in=3, n_out=1, dropnan=True):
            cols = list()
            # input sequence (t-n, ... t-1)
            for i in range(n_in, 0, -1):
                cols.append(df.shift(i))
            for i in range(0, n_out):
                cols.append(df.shift(-i))
            agg = pd.concat(cols, axis=1)
            
            # drop rows with NaN values
            df = pd.DataFrame(agg.values)
            df = df.fillna(df.mean())
            df.columns = ['TM3', 'TM2', 'TM1', 'TM0']
            df['y'] = df['TM0'] - df['TM1']
            df['y'] = [1 if i>0 else 0 for i in list(df['y'])]
            return df
        df_new = series_to_supervised(df)
        df_new['DATE'] = dates
        df_new['SYMBOL'] = symbol
        yield df_new

ML_Prep.end_partition._sf_vectorized_input = pd.DataFrame

ml_prep_udtf = session.udtf.register(
    ML_Prep, # the class
    input_types=[PandasDataFrameType([DateType()] + # DATE
                                     [StringType()] + # SYMBOL
                                     [FloatType()] # CLOSE
                                    )], 
    output_schema=PandasDataFrameType([FloatType(),FloatType(),FloatType(),FloatType(),IntegerType(),DateType(),StringType()],
                                      ["TM3", "TM2", "TM1", "TM0", "Y", "DATE_", "SYMBOL_"]),
    packages=["snowflake-snowpark-python", 'pandas'])  



In [6]:
all_cols = ['DATE', 'SYMBOL', 'CLOSE']
sdf_prepped = sdf_filtered.select(ml_prep_udtf(*all_cols).over(partition_by=['SYMBOL']))
sdf_prepped.limit(5).to_pandas()

Unnamed: 0,TM3,TM2,TM1,TM0,Y,DATE_,SYMBOL_
0,0.521263,0.521232,0.521197,0.48,0,2022-10-05,OTRK
1,0.521263,0.521232,0.48,0.57,1,2022-09-21,OTRK
2,0.521263,0.48,0.57,0.55,0,2022-09-22,OTRK
3,0.48,0.57,0.55,0.49,0,2022-09-23,OTRK
4,0.57,0.55,0.49,0.49,0,2022-09-26,OTRK


# Create Train and Test Set

In [8]:
sdf_goog = sdf_prepped.filter((col("SYMBOL_") == 'GOOG'))
weights = [0.5, 0.5]
sdf_goog_train, sdf_goog_test = sdf_goog.random_split(weights)
sdf_goog_train.write.save_as_table("GOOG_TRAIN", mode="overwrite")
sdf_goog_test.write.save_as_table("GOOG_TEST", mode="overwrite")

# Create and Train an ML Model to Predict Price Direction

In [9]:
# Snowpark ML
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.model_selection import GridSearchCV

# Define the XGBRegressor
regressor = XGBClassifier(
    input_cols=['TM3', 'TM2','TM1'],
    label_cols=['Y'],
    output_cols=['Y_PRED']
)

# Train
regressor.fit(sdf_goog_train)

# Predict
result = regressor.predict(sdf_goog_test)

In [10]:
result.limit(5).to_pandas()

Unnamed: 0,Y,TM1,DATE_,SYMBOL_,TM0,TM2,TM3,Y_PRED
0,0,73.47,2020-09-18,GOOG,73.0,171.954091,171.954557,1
1,0,71.56,2020-09-21,GOOG,71.56,73.0,73.47,1
2,1,71.56,2020-09-22,GOOG,73.27,71.56,73.0,1
3,0,73.27,2020-09-22,GOOG,73.27,71.56,71.56,1
4,0,70.76,2020-09-23,GOOG,70.76,73.27,73.27,1


# Register ML Model (in the Registry)

In [11]:
from snowflake.ml.registry import registry
from snowflake.ml._internal.utils import identifier
session.sql("CREATE OR REPLACE DATABASE MODEL_REGISTRY").collect()
session.sql("CREATE OR REPLACE SCHEMA PUBLIC").collect()
REGISTRY_DATABASE_NAME = "MODEL_REGISTRY"
REGISTRY_SCHEMA_NAME = "PUBLIC"
native_registry = registry.Registry(session=session, database_name=REGISTRY_DATABASE_NAME, schema_name=REGISTRY_SCHEMA_NAME)

In [12]:
MODEL_NAME = "REGRESSOR_XGB"
MODEL_VERSION = "v1"

model = native_registry.log_model(
    model_name=MODEL_NAME,
    version_name=MODEL_VERSION,
    model=regressor,
)

# Run ML Model

In [13]:
model.run(sdf_goog_test, function_name="predict").limit(20).to_pandas()

Unnamed: 0,TM0,Y,DATE_,SYMBOL_,TM3,TM2,TM1,Y_PRED
0,73.0,0,2020-09-18,GOOG,171.954557,171.954091,73.47,1
1,71.56,0,2020-09-21,GOOG,73.47,73.0,71.56,1
2,73.27,1,2020-09-22,GOOG,73.0,71.56,71.56,1
3,73.27,0,2020-09-22,GOOG,71.56,71.56,73.27,1
4,70.76,0,2020-09-23,GOOG,73.27,73.27,70.76,1
5,71.41,1,2020-09-24,GOOG,73.27,70.76,70.76,1
6,71.41,0,2020-09-24,GOOG,70.76,70.76,71.41,0
7,73.23,1,2020-09-28,GOOG,71.41,72.25,72.25,0
8,73.23,0,2020-09-28,GOOG,72.25,72.25,73.23,1
9,73.48,1,2020-09-30,GOOG,73.23,73.47,73.0,1


# Examine via Evidently (ideally with a task)

Basic, just look at HTML
advanced ish, write to a table, look at it in Snowsight.

Note this has slightly quirky Python version requirements so you will want to set this up in it's own venv as it requires Python 3.8 and SnowparkML requires 3.9

In [15]:
session.sql("USE DATABASE MT_TEST").collect()

@sproc(session=session, name='evidently_monitor', stage_location='@FUNCTIONS',  
       packages=['snowflake-snowpark-python', 'pandas', 'evidently'], 
       is_permanent=True, 
       replace=True)
def monitor_model(session: Session, history: str, new_data: str) -> str:
    """
    Creates a report that monitors the model drift etc. using evidently package

    Parameters
    ----------
    history : string
        The initial training table

    new_data : string
        The new date (test in this case)

    Returns
    -------
    completion_confirmation : string
        Simple confirmation of completion of task (note completion is when the table is prepared)

    """
    
    from evidently.report import Report
    from evidently.metrics.base_metric import generate_column_metrics
    from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
    from evidently.test_suite import TestSuite
    from evidently.tests.base_test import generate_column_tests
    from evidently.test_preset import DataStabilityTestPreset, NoTargetPerformanceTestPreset
    from evidently.metrics import ColumnSummaryMetric, ColumnQuantileMetric, ColumnDriftMetric
    from joblib import dump

    report = Report(metrics=[
        ColumnSummaryMetric(column_name='TM1'),
        ColumnQuantileMetric(column_name='TM1', quantile=0.25),
        ColumnDriftMetric(column_name='TM1')
    ])

    reference = session.table(history).to_pandas()
    current = session.table(new_data).to_pandas()
    report.run(reference_data=reference, current_data=current)
    
    report.save_html("/tmp/report.html")
    session.file.put("/tmp/report.html", '@FUNCTIONS', auto_compress=False, overwrite=True)

    return('Data Prepped')

monitor_model("GOOG_TRAIN", "GOOG_TEST")

'Data Prepped'