Import the necessary Python modules.


In [2]:
from datetime import date, timedelta
import pandas as pd
import xgboost as xgb
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

Let's establish a Snowpark session using the connection parameters provided by `SnowflakeLoginOptions()`.

In [3]:
# Build the Snowpark session from the connection parameters returned by
# SnowflakeLoginOptions(). NOTE(review): getOrCreate() presumably reuses an
# already-open session instead of opening a second one — confirm in the
# Snowpark docs.
session = Session.builder.configs(SnowflakeLoginOptions()).getOrCreate()

SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. 


In [4]:
# Point the session at the database and schema used throughout the notebook.
session.use_database("store_traffic")
session.use_schema("TIME_SERIES")

# Echo the active role / warehouse / schema so the execution context is visible
# in the cell output.
print(
    f"Role: {session.get_current_role()} | WH: {session.get_current_warehouse()} | DB.SCHEMA: {session.get_fully_qualified_current_schema()}"
)

Role: "SYSADMIN" | WH: "COMPUTE_WH" | DB.SCHEMA: "STORE_TRAFFIC"."TIME_SERIES"


We will start with an existing table that we will want for our Time Series features. The table contains a `DATE` column which is the day the traffic was recorded. We will extract additional date features by joining it to a calendar table we've defined. 



In [5]:
# Snowpark DataFrames over the daily traffic table and the calendar lookup table
# (both resolved against the session's current database and schema).
store_traffic_info_df = session.table("TRAFFIC")
store_calendar_info_df = session.table("CALENDAR_INFO_2018")

Let's preview these tables. 

In [6]:
# Count the distinct stores first, then preview the traffic table.
unique_store_count = store_traffic_info_df.select("STORE_ID").distinct().count()
print(f"Number of Unique STORE_IDs: {unique_store_count}")
store_traffic_info_df.show()

Number of Unique STORE_IDs: 1000
---------------------------------------
|"DATE"      |"STORE_ID"  |"TRAFFIC"  |
---------------------------------------
|2018-01-01  |1           |86         |
|2018-01-02  |1           |96         |
|2018-01-03  |1           |99         |
|2018-01-04  |1           |88         |
|2018-01-05  |1           |73         |
|2018-01-06  |1           |70         |
|2018-01-07  |1           |74         |
|2018-01-08  |1           |83         |
|2018-01-09  |1           |96         |
|2018-01-10  |1           |98         |
---------------------------------------



In [7]:
# Preview the calendar lookup table that supplies the date features.
store_calendar_info_df.show()

--------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |
--------------------------------------------------------------------------------------------------------
|2018-01-01       |0               |1              |1               |2018             |New Year's Day  |
|2018-01-02       |1               |2              |1               |2018             |NULL            |
|2018-01-03       |2               |3              |1               |2018             |NULL            |
|2018-01-04       |3               |4              |1               |2018             |NULL            |
|2018-01-05       |4               |5              |1               |2018             |NULL            |
|2018-01-06       |5               |6              |1               |2018             |NULL            |
|2018-01-07       |6               |7              |1  

## Feature Engineering

Join the Calendar info table to the traffic table 

In [8]:
# Match every traffic row to the calendar row for the same day.
same_day = store_traffic_info_df["DATE"] == store_calendar_info_df["CALENDAR_DATE"]

# Date features carried forward for modelling, plus the TRAFFIC target.
past_columns = [
    "DATE",
    "STORE_ID",
    "WEEK_DAY_NBR",
    "MTH_DAY_NBR",
    "CALENDAR_MTH",
    "CALENDAR_YEAR",
    "HOLIDAY_NAME",
    "TRAFFIC",
]

# A left join keeps every traffic row even when the calendar has no match.
past_joined = store_traffic_info_df.join(
    store_calendar_info_df, on=same_day, how="left"
)

# HOLIDAY_NAME is NULL on ordinary days; give those rows an explicit label.
past_final = past_joined.select(past_columns).na.fill({"HOLIDAY_NAME": "No Holiday"})
past_final.show()

----------------------------------------------------------------------------------------------------------------------------
|"DATE"      |"STORE_ID"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |"TRAFFIC"  |
----------------------------------------------------------------------------------------------------------------------------
|2018-01-01  |1           |0               |1              |1               |2018             |New Year's Day  |86         |
|2018-01-02  |1           |1               |2              |1               |2018             |No Holiday      |96         |
|2018-01-03  |1           |2               |3              |1               |2018             |No Holiday      |99         |
|2018-01-04  |1           |3               |4              |1               |2018             |No Holiday      |88         |
|2018-01-05  |1           |4               |5              |1               |2018             |No Holiday      |73         |


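Since we will be forecasting 4 weeks ahead, we need the calendar rows for that window (today through today + 28 days) so the future dates have the same date features as the history.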

In [9]:
# Forecast window: from today through 28 days from today, inclusive on both ends.
calendar_date = F.col("CALENDAR_DATE")
window_start = F.current_date()
window_end = F.current_date() + 28

future_cal = (
    session.table("store_traffic.time_series.CALENDAR_INFO_2018")
    .select(
        "CALENDAR_DATE",
        "WEEK_DAY_NBR",
        "MTH_DAY_NBR",
        "CALENDAR_MTH",
        "CALENDAR_YEAR",
        "HOLIDAY_NAME",
    )
    .where((calendar_date >= window_start) & (calendar_date <= window_end))
)

future_cal.show()

--------------------------------------------------------------------------------------------------------
|"CALENDAR_DATE"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |
--------------------------------------------------------------------------------------------------------
|2024-02-27       |1               |27             |2               |2024             |NULL            |
|2024-02-28       |2               |28             |2               |2024             |NULL            |
|2024-02-29       |3               |29             |2               |2024             |NULL            |
|2024-03-01       |4               |1              |3               |2024             |NULL            |
|2024-03-02       |5               |2              |3               |2024             |NULL            |
|2024-03-03       |6               |3              |3               |2024             |NULL            |
|2024-03-04       |0               |4              |3  

In [10]:
# One row per day starting today: session.range(32) yields ID = 0..31, and each
# ID is added to the current date as a day offset.
day_offsets = session.range(32)
df_date = day_offsets.select(
    F.to_date(F.dateadd("DAY", F.col("ID"), F.current_date())).alias("DATE")
)

## Cross join to make sure each store gets a value for the next 4 weeks
# Distinct store ids are cast to string before being paired with every date.
df_store = (
    session.table("store_traffic.time_series.traffic")
    .select(F.col("STORE_ID").cast("string").alias("STORE_ID"))
    .distinct()
)
stores = df_date.cross_join(df_store)

In [11]:
# Ordinary days have a NULL HOLIDAY_NAME; label them explicitly, the same way
# the historical rows were labelled.
future_cal = future_cal.na.fill({"HOLIDAY_NAME": "No Holiday"})

## Join store info and calendar data
# The right join keeps every calendar row of the forecast window.
on_same_date = stores["DATE"] == future_cal["CALENDAR_DATE"]
future_df = stores.join(future_cal, on=on_same_date, how="right").drop(
    "CALENDAR_DATE"  # only needed as the join key
)
future_df.show()

----------------------------------------------------------------------------------------------------------------
|"DATE"      |"STORE_ID"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |
----------------------------------------------------------------------------------------------------------------
|2024-02-27  |1           |1               |27             |2               |2024             |No Holiday      |
|2024-02-28  |1           |2               |28             |2               |2024             |No Holiday      |
|2024-02-29  |1           |3               |29             |2               |2024             |No Holiday      |
|2024-03-01  |1           |4               |1              |3               |2024             |No Holiday      |
|2024-03-02  |1           |5               |2              |3               |2024             |No Holiday      |
|2024-03-03  |1           |6               |3              |3               |2024             |N

Add a Traffic column pre-filled with zero that will be forecasted once our UDTF executes.

In [12]:
# Column order expected for the feature table (same order as past_final).
feature_column_order = [
    "DATE",
    "STORE_ID",
    "WEEK_DAY_NBR",
    "MTH_DAY_NBR",
    "CALENDAR_MTH",
    "CALENDAR_YEAR",
    "HOLIDAY_NAME",
    "TRAFFIC",
]

# TRAFFIC is a zero placeholder for the future rows; the forecast fills it in later.
future_df = future_df.with_column("TRAFFIC", F.lit(0)).select(feature_column_order)
future_df.show()

----------------------------------------------------------------------------------------------------------------------------
|"DATE"      |"STORE_ID"  |"WEEK_DAY_NBR"  |"MTH_DAY_NBR"  |"CALENDAR_MTH"  |"CALENDAR_YEAR"  |"HOLIDAY_NAME"  |"TRAFFIC"  |
----------------------------------------------------------------------------------------------------------------------------
|2024-02-27  |1           |1               |27             |2               |2024             |No Holiday      |0          |
|2024-02-28  |1           |2               |28             |2               |2024             |No Holiday      |0          |
|2024-02-29  |1           |3               |29             |2               |2024             |No Holiday      |0          |
|2024-03-01  |1           |4               |1              |3               |2024             |No Holiday      |0          |
|2024-03-02  |1           |5               |2              |3               |2024             |No Holiday      |0          |


Union the historical and future tables together and write back to a Snowflake table.

In [13]:
# Stack the historical rows and the placeholder future rows into one frame.
# NOTE(review): Snowpark's union() pairs columns by position and drops exact
# duplicate rows; both inputs were selected in the same column order in the
# cells above — confirm that de-duplication is intended here.
unionDF = past_final.union(future_df)

# Persist as a regular (non-temporary) table. save_as_table creates a permanent
# table by default, so the deprecated create_temp_table=False flag is not
# needed; this also matches the save_as_table calls used later in the notebook.
unionDF.write.save_as_table('FEATURES_TRAFFIC', mode='overwrite')

## Creating the User Defined Table Function for multi-node parallelized model training

Create the stage, output schema and UDTF for training and forecasting.

In [14]:
# Add stage for UDFs and Stored Procs
# The permanent UDTFs registered below upload their code to this stage
# (they pass stage_location="@pymodels"). The statement is a no-op when the
# stage already exists.
session.sql(
    """
create stage if not exists pymodels
"""
).collect()

[Row(status='PYMODELS already exists, statement succeeded.')]

In [15]:
# Output schema of the UDTF: one (date, forecasted traffic) row per scored day.
schema = T.StructType([
    T.StructField("DATE1", T.DateType()),
    T.StructField("TRAFFIC_FORECAST", T.IntegerType())
])

@F.udtf(output_schema = schema,
     input_types = [T.DateType(), T.IntegerType(), T.StringType(), T.StringType(), T.StringType(), T.StringType()],
     name = "store_traffic_forecast", is_permanent=True, stage_location="@pymodels", session=session,
     packages=["pandas","xgboost"], replace=True)
class forecast:
    """Per-partition train-and-forecast UDTF registered as ``store_traffic_forecast``.

    ``process`` buffers every input row of the partition; ``end_partition``
    turns the buffer into a pandas frame, trains an XGBoost regressor on the
    rows older than 14 days, and yields one ``(date, prediction)`` pair for
    every row dated from 14 days ago through 28 days ahead.
    """

    def __init__(self):
        # Column-wise buffers, filled one row at a time by process().
        self.DATE=[]
        self.DAYOFWEEK=[]
        self.MONTH=[]
        self.YEAR=[]
        self.HOLIDAY_NAME=[]
        self.TRAFFIC=[]

    def process(self, DATE, TRAFFIC, DAYOFWEEK, MONTH, YEAR, HOLIDAY_NAME):
        """Buffer one input row; nothing is emitted until the partition ends."""
        self.DATE.append(DATE)
        self.TRAFFIC.append(TRAFFIC)
        self.DAYOFWEEK.append(DAYOFWEEK)
        self.MONTH.append(MONTH)
        self.YEAR.append(YEAR)
        self.HOLIDAY_NAME.append(HOLIDAY_NAME)

    def end_partition(self):
        """Train on the buffered history and yield ``(date, prediction)`` pairs.

        Yields nothing when the partition has no training rows or no rows in
        the scoring window, so one sparse partition does not fail the query.
        Negative predictions are clamped to 0.
        """
        df = pd.DataFrame(zip(self.DATE,
                              self.TRAFFIC,
                              self.DAYOFWEEK,
                              self.MONTH,
                              self.YEAR,
                              self.HOLIDAY_NAME),
                          columns = ['DATE','TRAFFIC','WEEK_DAY_NBR',
                                     'CALENDAR_MTH','CALENDAR_YEAR','HOLIDAY_NAME'])

        # set the time column as our index
        indexed = df.set_index('DATE')
        indexed.index = pd.to_datetime(indexed.index)

        # Convert the features to categories, then one-hot encode them.
        categorical_columns = ['HOLIDAY_NAME', 'WEEK_DAY_NBR', 'CALENDAR_MTH', 'CALENDAR_YEAR']
        indexed = indexed.astype({name: "category" for name in categorical_columns})
        final = pd.get_dummies(data=indexed, columns=categorical_columns)

        # Train / forecast split: the last 14 days are held out of training and
        # scored together with the next 28 days.
        today = date.today()
        history_start = pd.to_datetime('01-Jan-2018')
        holdout_start = pd.to_datetime(today - timedelta(days=14))
        horizon_end = pd.to_datetime(today + timedelta(days=28))

        train = final[(final.index >= history_start) & (final.index < holdout_start)]
        to_score = final[(final.index >= holdout_start) & (final.index <= horizon_end)]

        # Nothing to fit on, or nothing to predict: emit no rows for this partition.
        if train.empty or to_score.empty:
            return

        # Use XGBoost regressor model
        model = xgb.XGBRegressor(n_estimators=200,n_jobs=1)
        model.fit(train.drop('TRAFFIC', axis = 1), train['TRAFFIC'],
                verbose=False)

        # Build the predictions as a fresh Series rather than assigning into
        # the filtered slice, which triggered pandas' chained-assignment warning.
        predictions = pd.Series(
            model.predict(to_score.drop('TRAFFIC', axis = 1)),
            index=to_score.index,
        ).sort_index().clip(lower=0)

        # output prediction
        for forecast_date, prediction in zip(predictions.index, predictions.tolist()):
            yield forecast_date, prediction

Create the query that calls the UDTF, partitioned by store ID so that each store is trained and forecasted independently.

In [16]:
# Handle to the registered scalar UDTF, addressed by its fully qualified name.
store_forecast_test = F.table_function(
    "store_traffic.time_series.store_traffic_forecast"
)

df = session.table('FEATURES_TRAFFIC')

# Arguments in the positional order the UDTF's process() method receives them.
udtf_argument_names = [
    "DATE",
    "TRAFFIC",
    "WEEK_DAY_NBR",
    "CALENDAR_MTH",
    "CALENDAR_YEAR",
    "HOLIDAY_NAME",
]
udtf_arguments = [df[column_name] for column_name in udtf_argument_names]

# Partition by store so each store's rows go through the UDTF together.
per_store_forecast = store_forecast_test(*udtf_arguments).over(
    partition_by=df["STORE_ID"]
)
forecast = df.select(df["STORE_ID"], per_store_forecast)

Write the output to a table. Saving the result is what actually executes the UDTF: it trains a model per store and forecasts the next 4 weeks.

In [17]:
# The UDTF emits its date column as DATE1; rename it to DATE before persisting.
forecast_output = forecast.with_column_renamed("DATE1", "DATE")
forecast_output.write.save_as_table("FORECAST", mode="overwrite")

Join forecast to the Actual and visualize the results

In [18]:
traffic = session.table("FEATURES_TRAFFIC")
forecast = session.table("FORECAST")

# A forecast row belongs to the feature row with the same store and day.
same_store_and_day = (traffic["DATE"] == forecast["DATE"]) & (
    traffic["STORE_ID"] == forecast["STORE_ID"]
)

# Rows dated today or later only hold the zero placeholder, so ACTUAL is NULL there.
actual_traffic = F.when(traffic["DATE"] >= F.current_date(), F.lit(None)).otherwise(
    traffic["TRAFFIC"]
)

act_vs_pred = (
    traffic.join(forecast, on=same_store_and_day)
    .select(
        traffic["STORE_ID"].cast(T.IntegerType()).alias("STORE_ID"),
        traffic["DATE"].alias("DATE"),
        actual_traffic.cast(T.IntegerType()).alias("ACTUAL"),
        forecast["TRAFFIC_FORECAST"].alias("FORECAST"),
    )
    # Keep only dates later than 15 days before today.
    .filter(traffic["DATE"] > F.current_date() - 15)
)

In [19]:
# Show three rows of the actual-vs-forecast comparison.
act_vs_pred.show(3)

---------------------------------------------------
|"STORE_ID"  |"DATE"      |"ACTUAL"  |"FORECAST"  |
---------------------------------------------------
|1           |2024-02-16  |123       |120         |
|1           |2024-02-24  |109       |111         |
|1           |2024-02-25  |116       |115         |
---------------------------------------------------



In [20]:
# Persist the comparison as a regular (non-temporary) table and re-open it.
# save_as_table creates a permanent table by default, so the deprecated
# create_temp_table=False flag is not needed; this also matches the
# save_as_table calls used for the forecast tables.
act_vs_pred.write.save_as_table('actual_vs_forecast', mode='overwrite')
act_v_pred = session.table('actual_vs_forecast')

____

# Vectorized UDTF

In [21]:
# Function that goes inside the UDTF

def forecast_function(df:pd.DataFrame, today=None, holdout_days:int=14,
                      horizon_days:int=28, n_estimators:int=200) -> pd.DataFrame:
    """Train an XGBoost regressor on one partition's history and forecast traffic.

    Args:
        df: Frame with the columns ``DATE``, ``TRAFFIC``, ``WEEK_DAY_NBR``,
            ``CALENDAR_MTH``, ``CALENDAR_YEAR`` and ``HOLIDAY_NAME``. Any other
            column is left untouched and used as a model feature. ``df`` itself
            is not modified.
        today: Reference date for the train/forecast split; defaults to
            ``date.today()`` evaluated at call time.
        holdout_days: Number of most recent days excluded from training and
            scored instead.
        horizon_days: Number of days after ``today`` to score (inclusive).
        n_estimators: Number of boosting rounds passed to ``XGBRegressor``.

    Returns:
        Frame indexed by date with the columns ``DATE`` and ``PREDICTION``,
        sorted by date, with negative predictions clamped to 0. The frame has
        zero rows when there is no training data or nothing to score.
    """
    # set the time column as our index
    indexed = df.set_index('DATE')
    indexed.index = pd.to_datetime(indexed.index)

    # Convert the features to categories, then one-hot encode them.
    categorical_columns = ['HOLIDAY_NAME', 'WEEK_DAY_NBR', 'CALENDAR_MTH', 'CALENDAR_YEAR']
    indexed = indexed.astype({name: "category" for name in categorical_columns})
    final = pd.get_dummies(data=indexed, columns=categorical_columns)

    # do the train & forecast split
    if today is None:
        today = date.today()
    history_start = pd.to_datetime('01-Jan-2018')
    holdout_start = pd.to_datetime(today - timedelta(days=holdout_days))
    horizon_end = pd.to_datetime(today + timedelta(days=horizon_days))

    train = final[(final.index >= history_start) & (final.index < holdout_start)]
    scoring = final[(final.index >= holdout_start) & (final.index <= horizon_end)]

    if train.empty or scoring.empty:
        # Nothing to fit on, or nothing to predict: return an empty result
        # with the usual columns instead of letting the model raise.
        scoring = scoring.iloc[:0]
        predictions = pd.Series(index=scoring.index, dtype="float32")
    else:
        # Use XGBoost regressor model
        model = xgb.XGBRegressor(n_estimators=n_estimators,n_jobs=1)
        model.fit(train.drop('TRAFFIC', axis = 1), train['TRAFFIC'],
                verbose=False)
        predictions = model.predict(scoring.drop('TRAFFIC', axis = 1))

    # Build the result as a fresh frame rather than assigning into the filtered
    # slice, which triggered pandas' chained-assignment warning.
    clipped = pd.Series(predictions, index=scoring.index).clip(lower=0)
    result = clipped.to_frame("PREDICTION")
    result.insert(0, "DATE", result.index)
    return result.sort_index()

In [22]:
# Columns handed to the vectorized UDTF, in the order the UDTF receives them.
vectorized_input_columns = [
    "STORE_ID",
    "DATE",
    "TRAFFIC",
    "WEEK_DAY_NBR",
    "CALENDAR_MTH",
    "CALENDAR_YEAR",
    "HOLIDAY_NAME",
]
input_df = session.table('FEATURES_TRAFFIC').select(vectorized_input_columns)
# input_df = session.table('FEATURES_TRAFFIC').drop("MTH_DAY_NBR")

# Obtain input types and output schema
input_col_names = input_df.columns
input_dtypes = [column_field.datatype for column_field in input_df.schema.fields]
vect_udtf_input_dtypes = [T.PandasDataFrameType(input_dtypes)]

vect_udtf_output_schema = T.PandasDataFrameType(
    [T.DateType(), T.IntegerType()], ["DATE1", "TRAFFIC_FORECAST"]
)


@F.udtf(
    output_schema=vect_udtf_output_schema,
    input_types=vect_udtf_input_dtypes,
    name="store_traffic_vect_udtf",
    is_permanent=True,
    stage_location="@pymodels",
    session=session,
    packages=["pandas", "xgboost"],
    replace=True,
)
class anom_detection:
    """Vectorized UDTF handler registered as ``store_traffic_vect_udtf``.

    The whole partition arrives in ``end_partition`` as one pandas DataFrame,
    which is passed to ``forecast_function``.
    """

    def end_partition(self, df):
        """Restore the column names, run the forecast and yield the resulting frame."""
        # NOTE: In Vectorized udtf you have to put the column names back into the df
        df.columns = input_col_names

        yield forecast_function(df)

In [23]:
# Call the UDTF
# Handle to the registered vectorized UDTF, addressed by its fully qualified name.
store_forecast_test = F.table_function(
    "store_traffic.time_series.store_traffic_vect_udtf"
)

# Pass every input column, in the same order that was used to derive the input types.
udtf_arguments = [F.col(column_name) for column_name in input_col_names]

# One partition per store, with the rows ordered by date inside each partition.
per_store_forecast = store_forecast_test(*udtf_arguments).over(
    partition_by=["STORE_ID"], order_by=["DATE"]
)
forecast_vect_udtf = input_df.select("STORE_ID", per_store_forecast)

# Write to table
forecast_vect_output = forecast_vect_udtf.with_column_renamed("DATE1", "DATE")
forecast_vect_output.write.save_as_table("FORECAST_VECT_UDTF", mode="overwrite")

In [24]:
# Show three rows of the table written by the vectorized UDTF.
session.table("FORECAST_VECT_UDTF").show(3)

------------------------------------------------
|"STORE_ID"  |"DATE"      |"TRAFFIC_FORECAST"  |
------------------------------------------------
|659         |2024-02-13  |126                 |
|659         |2024-02-14  |129                 |
|659         |2024-02-15  |119                 |
------------------------------------------------



Compare the code for the scalar vs vectorized UDTF:

In [25]:
# # THIS CELL LETS YOU SEE THE DIFFERENCE BETWEEN DEFINING/REGISTERING SCALAR & VECTORIZED UDTFS


# # **************************************************************
# # SCALAR UDTF
# # **************************************************************

# schema = T.StructType([
#     T.StructField("DATE1", T.DateType()),
#     T.StructField("TRAFFIC_FORECAST", T.IntegerType())  
# ])

# @F.udtf(output_schema = schema,
#      input_types = [T.DateType(), T.IntegerType(), T.StringType(), T.StringType(), T.StringType(), T.StringType()],
#      name = "store_traffic_forecast", is_permanent=True, stage_location="@pymodels", session=session,
#      packages=["pandas","xgboost"], replace=True)

# class forecast:
#     def __init__(self):
#         self.DATE=[]
#         self.DAYOFWEEK=[]
#         self.MONTH=[]
#         self.YEAR=[]
#         self.HOLIDAY_NAME=[]
#         self.TRAFFIC=[]
    
#     def process(self, DATE, TRAFFIC, DAYOFWEEK, MONTH, YEAR, HOLIDAY_NAME):
#         self.DATE.append(DATE)
#         self.TRAFFIC.append(TRAFFIC)
#         self.DAYOFWEEK.append(DAYOFWEEK)
#         self.MONTH.append(MONTH)
#         self.YEAR.append(YEAR)
#         self.HOLIDAY_NAME.append(HOLIDAY_NAME)
    
#     def end_partition(self):
#         df = pd.DataFrame(zip(self.DATE, 
#                               self.TRAFFIC, 
#                               self.DAYOFWEEK, 
#                               self.MONTH, 
#                               self.YEAR, 
#                               self.HOLIDAY_NAME), 
#                           columns = ['DATE','TRAFFIC','WEEK_DAY_NBR',
#                                      'CALENDAR_MTH','CALENDAR_YEAR','HOLIDAY_NAME'])
        
#         forecast = forecast_function(df)
        
#         # output prediction
#         for idx, row in forecast.iterrows():
#             DATE = row['DATE']
#             PREDICTION = row['PREDICTION']
#             yield DATE, PREDICTION


# # **************************************************************
# # VECTORIZED UDTF
# # **************************************************************
# input_df = session.table('FEATURES_TRAFFIC').drop("MTH_DAY_NBR")

# # Obtain input types and output schema
# input_col_names = input_df.columns
# input_dtypes = [field.datatype for field in input_df.schema.fields]
# vect_udtf_input_dtypes = [T.PandasDataFrameType(input_dtypes)]

# vect_udtf_output_schema = T.PandasDataFrameType(
#     [T.DateType(), T.IntegerType()], ["DATE1", "TRAFFIC_FORECAST"]
# )

# @F.udtf(output_schema = vect_udtf_output_schema,
#      input_types = vect_udtf_input_dtypes,
#      name = "store_traffic_vect_udtf", is_permanent=True, stage_location="@pymodels", session=session,
#      packages=["pandas","xgboost"], replace=True)
# class anom_detection:
#     def end_partition(self, df):
       
#         df.columns = input_col_names  # NOTE: In Vectorized udtf you have to put the column names back into the df

#         forecast = forecast_function(df)

#         yield forecast