# CitiBike ML Demo
By: David Stearns  

![CitiBike ML demo image 1](sf_ds.png "CitiBike ML demo image 1")

![CitiBike ML demo image 2](sf_ds2.png "CitiBike ML demo image 2")

![CitiBike ML demo image 3](sf_ds3.png "CitiBike ML demo image 3")

![CitiBike ML demo image 4](sf_ds4.png "CitiBike ML demo image 4")

![CitiBike ML demo image 5](sf_ds5.png "CitiBike ML demo image 5")

![CitiBike ML demo image 6](sf_ds6.png "CitiBike ML demo image 6")

In [None]:
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.functions import udf
from snowflake.snowpark.session import Session
from snowflake.snowpark import types as T
import json
import pandas as pd
import numpy as np
import datetime
import sys
import math
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Window
pd.set_option('display.max_columns', None)
import os
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import cross_validate
from sklearn.ensemble import RandomForestRegressor
import credentials
import io
import joblib
import cachetools
#
from multiprocessing import pool
#
import time  # the only import of the repeated setup code that is not already imported above

# Connection parameters are read from the local `credentials` module.
conn = {
    "account": credentials.credentials["account"],
    "user": credentials.credentials["username"],
    "password": credentials.credentials["password"],
    "role": credentials.credentials["role"],
    "warehouse": credentials.credentials["warehouse"],
    "database": credentials.credentials["database"],
    "schema": credentials.credentials["schema"]
}

# Open exactly one Snowpark session. Building the session twice with the same
# parameters would open a second connection and orphan the first one, which
# is never closed.
session = Session.builder.configs(conn).create()

print("-------------------------------------------")
# First column of the first row holds the name of the active warehouse.
warehouse = session.sql("select current_warehouse()").collect()[0][0]
print(f"""Using {warehouse} - a high memory warehouse""")
print("-------------------------------------------")
print("Preparing the warehouse metadata:")
print("-------------------------------------------")
# First metadata row for warehouses matching 'high_mem_wh'; as the last
# expression of the cell its value is presumably what the notebook displays.
# Raises IndexError when no matching warehouse is returned.
session.sql("show warehouses like 'high_mem_wh'").collect()[0]

In [None]:
# Ask Snowflake which warehouse the session is currently using.
session.sql("select current_warehouse()").collect()

In [None]:
# Switch the session to the warehouse named "snowpark".
session.sql("use warehouse snowpark").collect()

In [None]:
def save_model(session, model, path):
    """Serialize ``model`` with joblib and upload the bytes to ``path``.

    Args:
        session: Snowpark session whose underlying cursor performs the upload.
        model: Object to serialize with ``joblib.dump``.
        path: Destination handed to ``upload_stream`` (callers in this file
            pass a stage path such as ``@stage/name.joblib``).

    Returns:
        A confirmation string that ends with ``path``.
    """
    # Dump into an in-memory buffer so nothing is written to local disk.
    buffer = io.BytesIO()
    joblib.dump(model, buffer)

    # NOTE(review): this goes through private attributes of the session
    # (``_conn._cursor``); confirm they exist in the runtime where this runs.
    cursor = session._conn._cursor
    cursor.upload_stream(buffer, path)

    return "successfully created file: " + path

In [None]:
# (Re)create the stage that holds the model file and the SPROC/UDF artifacts.
# NOTE: "create or replace" recreates the stage, so files uploaded to an
# existing `citibike_ml` stage are lost when this cell is re-run.
session.sql("create or replace stage citibike_ml").collect()

### Feature Engineering Steps

In [None]:
# Build the daily modelling table: number of trips per day joined with the
# weather observations for the same date.
divider = "-------------------------------------------"

df_trips_hourly = session.table("trips_stations_vw")
print("Aggregating TRIPS data")
print(divider)
# Reduce every trip to the date of its STARTTIME, then count rows per date.
trip_dates = df_trips_hourly.select(F.to_date(F.col("STARTTIME")).as_("ds"))
trips_per_day = trip_dates.groupBy(F.col("ds")).count()
df_trips_daily = trips_per_day.select(F.col("ds"), F.col("COUNT").as_("y"))

df_weather = session.table("weather_vw")
print("Joining the aggregated TRIPS data with WEATHER data from the Snowflake Data Marketplace")
print(divider)
same_day = df_trips_daily["DS"] == df_weather["OBSERVATION_DATE"]
trips_with_weather = df_trips_daily.join(df_weather, same_day)
trips_with_weather = trips_with_weather.select(
    F.col("DS"),
    F.col("Y"),
    F.col("TEMP_AVG_C"),
    F.col("TOT_PRECIP_IN"),
)

print("Feature engineering: creating an indicator column for rain (1 = rain, 0 = no rain)")
print(divider)
# Any positive precipitation total counts as a rainy day.
is_rainy = F.when(F.col("TOT_PRECIP_IN") > 0, F.lit(1)).otherwise(F.lit(0))
trips_with_rain = trips_with_weather.withColumn("rain_indicator", is_rainy)

print("Sorting the table by date")
print(divider)
df_trips_weather = trips_with_rain.sort(F.col("DS"))

print("Writing back the table to the Snowflake database with selected columns")
print(divider)
# TOT_PRECIP_IN is left out of the saved table; only the indicator is kept.
columns_to_save = df_trips_weather.select("DS", "Y", "RAIN_INDICATOR", "TEMP_AVG_C")
columns_to_save.write.mode("overwrite").save_as_table("citibike_ml_demo")

In [None]:
# Reference to the `citibike_ml_demo` table.
cb_ml = session.table("citibike_ml_demo")

In [None]:
# Derive the model features in SQL and store them as `citibike_ml_demo_features`:
#   Y_log       - natural log of the daily trip count, rounded to 2 decimals
#   Y_yesterday - Y_log of the previous row when ordered by DS (0 when there is none)
print(
    "Feature engineering: creating a one day shift and natural log transformation",
    "\n",
    "of the TRIPS figure (target) and saving it back to the Snowflake database",
    "\n",
    "to create a features table",
)
print("-------------------------------------------")
features_query = """ select
                    RAIN_INDICATOR,
                    TEMP_AVG_C,
                    Y_log,
                    LAG(Y_log,1,0) OVER (order by DS) as Y_yesterday
                from (
                    select ds, rain_indicator, temp_avg_c, round(ln(Y), 2) as Y_log from citibike_ml_demo
                ) A"""
features_df = session.sql(features_query)
features_df.write.mode("overwrite").save_as_table("citibike_ml_demo_features")

In [None]:
# Preview the features table.
session.table("citibike_ml_demo_features").show()

In [None]:
# Preview the first 5 rows of the `citibike_ml_demo` table.
cb_ml = session.table("citibike_ml_demo")
cb_ml.show(5)

#### Creating the function to be registered as a stored procedure (SPROC)

In [None]:
# Packages requested for code registered through this session (the SPROC and UDF below).
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')


def citibike_ml_demo(session: snowflake.snowpark.Session) -> str:
    """Train a random-forest regressor on the features table and upload it to the stage.

    Reads `citibike_ml_demo_features`, fits a `RandomForestRegressor` on a
    67% training split to predict `Y_LOG` from `RAIN_INDICATOR`, `TEMP_AVG_C`
    and `Y_YESTERDAY`, uploads the fitted model to
    `@citibike_ml/predict_trips.joblib` via `save_model`, and runs a 3-fold
    cross-validation on the full feature set.

    Args:
        session: Snowpark session used to read the table and upload the model.

    Returns:
        A status string with the staged file name and the cross-validation results.
    """
    model_name = "predict_trips"
    stage_name = "citibike_ml"
    model_path = f"@{stage_name}/{model_name}.joblib"

    pandas_df = session.table("citibike_ml_demo_features").toPandas()
    # The features query fills the lag of the row without a previous day with 0.
    # Drop that row by value rather than by position, because the row order of
    # a table read is not guaranteed; this matches the `Y_YESTERDAY != 0`
    # filter applied when the predictions are generated.
    pandas_df = pandas_df[pandas_df["Y_YESTERDAY"] != 0]

    X = pandas_df[["RAIN_INDICATOR", "TEMP_AVG_C", "Y_YESTERDAY"]]
    # 1-D target: a single-column DataFrame makes scikit-learn warn about a
    # column-vector y.
    y = pandas_df["Y_LOG"]

    # Seed the split as well so repeated runs train on the same rows; the
    # held-out part is not used further in this procedure.
    X_train, _, y_train, _ = train_test_split(X, y, test_size=0.33, random_state=42)
    rf = RandomForestRegressor(n_estimators=100, random_state=42, min_samples_split=3)
    rf.fit(X_train, y_train)

    save_model(session, rf, model_path)

    # List the model path itself: the stage also receives other files (it is
    # used as the stage location for the SPROC/UDF), so the first entry of a
    # stage-wide listing is not necessarily the model.
    model_name_full = session.sql(f"ls {model_path}").collect()[0][0]

    # cross_validate fits fresh clones of `rf` on each of the 3 folds of the full data.
    performance = cross_validate(rf, X, y, cv=3)

    return f"Created model: {model_name_full}.... Performance Metrics: {performance}"

#### Registering the SPROC with the function I just created

In [None]:
# Register `citibike_ml_demo` in Snowflake as the permanent stored procedure
# "citibike_train", replacing any existing registration; the stage
# "@CITIBIKE_ML" is given as its stage location.
print("Registering the SPROC in Snowflake as a Permanent SPROC with dedicated location and name")
train_rfc_model = session.sproc.register(
    citibike_ml_demo,
    name="citibike_train",
    stage_location="@CITIBIKE_ML",
    is_permanent=True,
    replace=True,
)

#### Running the SPROC I just created to perform model training

In [None]:
# Running the SPROC for model training
# The call goes through the current session; the value returned is the status
# string built by `citibike_ml_demo`.
train_rfc_model(session=session)

In [None]:
# Add the staged model file to the session's imports.
session.add_import("@citibike_ml/predict_trips.joblib")


# Memoize per filename so each file is only read and deserialized once.
@cachetools.cached(cache={})
def read_file(filename):
    """Load a joblib file from the Snowflake import directory.

    Args:
        filename: Name of the file inside the import directory.

    Returns:
        The object deserialized by ``joblib.load``, or ``None`` when
        ``sys._xoptions`` has no "snowflake_import_directory" entry.
    """
    import os

    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        return None

    model_path = os.path.join(import_dir, filename)
    with open(model_path, 'rb') as model_file:
        return joblib.load(model_file)

#### Registering a vectorized UDF

In [None]:
# Registering the UDF
@udf(session=session, name="predict_trips", is_permanent=True, stage_location="@citibike_ml", replace=True)
def predict_trips(df: T.PandasDataFrame[int, float, float]) -> T.PandasSeries[float]:
    """Predict the target for a batch of feature rows with the staged model.

    Args:
        df: Batch of rows with three columns (int, float, float); the call
            site in this notebook passes rain_indicator, temp_avg_c and
            y_yesterday in that order.

    Returns:
        One prediction per input row, as returned by the model's ``predict``.

    Raises:
        RuntimeError: If the model file could not be loaded.
    """
    m = read_file('predict_trips.joblib')
    if m is None:
        # read_file returns None when no import directory is available; fail
        # with an explicit message instead of an AttributeError on None.
        raise RuntimeError(
            "Model file 'predict_trips.joblib' could not be loaded from the "
            "Snowflake import directory"
        )

    # The model was fitted on a DataFrame with named columns. Give the batch
    # the same names (when the column count matches) so scikit-learn does not
    # warn about missing feature names on every batch.
    feature_names = getattr(m, "feature_names_in_", None)
    if feature_names is not None and len(feature_names) == df.shape[1]:
        df.columns = feature_names

    return m.predict(df)

In [None]:
# Select the three model input columns from the features table.
test_df = session.sql("select rain_indicator, temp_avg_c, y_yesterday from citibike_ml_demo_features")

#### Generating prediction results

In [None]:
# Score every feature row with the `predict_trips` UDF and compare the
# rounded-up prediction with the Y_YESTERDAY value of the same row.
# Expanding the DataFrame once gives the column expressions that the
# `*test_df` unpacking passes to select() and call_udf().
feature_cols = [*test_df]

prediction = F.ceil(F.call_udf("predict_trips", *feature_cols)).alias('PREDICTION')
# Rows with Y_YESTERDAY == 0 are excluded, which also keeps the division
# below away from a zero denominator.
scored = test_df.select(*feature_cols, prediction).where(F.col("Y_YESTERDAY") != 0)

# Relative difference between prediction and Y_YESTERDAY, in percent with 2 decimals.
# NOTE(review): the when() has no otherwise(), so rows where PREDICTION equals
# Y_YESTERDAY presumably get NULL rather than 0 - confirm that is intended.
# NOTE(review): the comparison is against Y_YESTERDAY, not the actual Y_LOG
# of the day - confirm this is the intended error measure.
relative_diff = (F.col("PREDICTION") - F.col("Y_YESTERDAY")) / F.col("Y_YESTERDAY")
error_percent = F.when(
    F.col("PREDICTION") != F.col("Y_YESTERDAY"),
    F.round(relative_diff * 100, 2),
).alias("ERROR_PERCENT")

results = scored.select(*feature_cols, F.col("PREDICTION"), error_percent)

In [None]:
# Preview the prediction results.
results.show()

In [None]:
# Persist the predictions in a table named after today's date (results_YYYYMMDD),
# overwriting a table of the same name if one exists.
date = datetime.date.today().strftime("%Y%m%d")
result_table = "results_" + date
results.write.mode("overwrite").save_as_table(result_table)
print("-----------------------------")
print(f"Created table {result_table}")

In [None]:
# Read the saved results table back and display it.
session.sql(f"select * from {result_table}").show()