In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import random

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark import Window
import snowflake.ml.modeling.preprocessing as preproc
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.registry import Registry
import snowflake.ml.modeling.metrics as metrics

In [None]:
# Display the database the active Snowpark session is currently using.
session.get_current_database()

In [None]:
# Render the notebook's title and tagline with Streamlit.
page_title = ":snowflake::sparkles: Snowpark ML + Notebooks + Streamlit :sparkles::snowflake:"
page_tagline = "*It just works!*"
st.title(page_title)
st.subheader(page_tagline)

In [None]:
-- Preview the first 100 rows of the views_stations view.
select * from views_stations limit 100

In [None]:
# NOTE(review): presumably surfaces the result of the cell named "cell2"
# (likely the SQL preview above) -- confirm the cell name matches.
cells.cell2

In [None]:
# Python variable that the next SQL cell injects into its query via Jinja templating.
table_name = "views_stations"

In [None]:
-- Preview 10 rows of the table named by the Python variable table_name (Jinja-substituted).
select * from {{table_name}} limit 10

In [None]:
# Load the views_stations view without its nhood_geo column and preview three rows.
views_stations_source = session.table("views_stations")
df = views_stations_source.drop("nhood_geo")
df.show(n=3)

In [None]:
# Show how many rows each station contributes.
station_row_counts = (
    df.select("station_id", "station_lat", "station_lon")
    .group_by("station_id")
    .count()
    .select("station_id", "COUNT")
)
station_row_counts.show()

In [None]:
# Count rows per station together with the station's coordinates.
#
# The aggregate is rebuilt from the source view instead of from `df` itself:
# aggregating `df` into `df` made this cell non-idempotent (a second run would
# count the already-aggregated rows and report 1 for every station).
station_cols = ["station_id", "station_lat", "station_lon"]
df = (
    session.table("views_stations")
    .select(*station_cols)
    .group_by(*station_cols)
    .count()
    .select(*station_cols, "COUNT")
)

In [None]:
# Print the query plan Snowpark will execute for the aggregated data frame.
df.explain()

In [None]:
# Bar chart of the row count per station.
st.bar_chart(df, x="STATION_ID", y="COUNT")

In [None]:
# Plot every station on a map using its latitude/longitude columns.
MARKER_COLOR = "#29b5e8"
st.map(df, latitude="STATION_LAT", longitude="STATION_LON", color=MARKER_COLOR)

# Snowpark ML

In [None]:
def prepare_data_for_training():
    """Build the ad-views-per-station data set joined with daily weather.

    Uses the notebook-level ``session``. The steps are:

    1. Average the ``historical_weather`` measurements per observation date
       under a single "NY Metro" label.
    2. Left-join ``ad_views`` to ``stations`` and (re)create the
       ``views_stations`` view from the rows that contain no nulls.
    3. Count views per (date, time_of_day, station_id) and derive ``y``,
       ``y_log`` (log base 10 of the count) and ``y_log_lag``.
    4. Inner-join the counts with the weather on date and add
       ``wind_indicator``.

    Side effects:
        Creates or replaces the ``views_stations`` view.

    Returns:
        A Snowpark DataFrame with the columns ``date``, ``time_of_day``,
        ``station_id``, ``min_temp``, ``max_temp``, ``avg_temp``,
        ``total_precipitation``, ``total_snowfall``, ``snowdepth``,
        ``wind_speed``, ``y``, ``y_log``, ``y_log_lag`` and ``wind_indicator``.
    """
    # --- 1. Weather: one averaged row per observation date -----------------
    raw_weather = session.table("historical_weather")

    # Aggregate all of the data into a "NY Metro" classification
    weather = raw_weather.select(
        F.lit("NY Metro").alias("state"),
        F.col("postal_code"),
        F.col("date_valid_std").alias("observation_date"),
        F.col("min_temperature_air_2m_f"),
        F.col("max_temperature_air_2m_f"),
        F.col("avg_temperature_air_2m_f"),
        F.col("tot_precipitation_in"),
        F.col("tot_snowfall_in"),
        F.col("tot_snowdepth_in"),
        F.col("avg_wind_speed_100m_mph"),
    ).group_by(
        [F.col("state"), F.col("observation_date")]
    ).agg(
        F.avg(F.col("min_temperature_air_2m_f")).alias("min_temp"),
        F.avg(F.col("max_temperature_air_2m_f")).alias("max_temp"),
        F.avg(F.col("avg_temperature_air_2m_f")).alias("avg_temp"),
        F.avg(F.col("tot_precipitation_in")).alias("total_precipitation"),
        F.avg(F.col("tot_snowfall_in")).alias("total_snowfall"),
        F.avg(F.col("tot_snowdepth_in")).alias("snowdepth"),
        F.avg(F.col("avg_wind_speed_100m_mph")).alias("wind_speed"),
    )

    # --- 2. Ad views enriched with station attributes ----------------------
    ad_views = session.table("ad_views")
    stations = session.table("stations")

    views_stations = ad_views.join(
        stations,
        ad_views["start_station_id"] == stations["station_id"],
        "leftouter",
    ).select(
        ad_views["trip_id"].alias("ad_id"),
        ad_views["starttime"],
        ad_views["endtime"],
        ad_views["duration"],
        ad_views["start_station_id"].alias("station_id"),
        stations["station_name"],
        stations["region_name"],
        stations["borough_name"],
        stations["nhood_name"],
        stations["station_geo"],
        stations["station_lat"],
        stations["station_lon"],
        stations["nhood_geo"],
        ad_views["gender"],
        ad_views["member_type"],
    )

    # Save the data as a view
    # We can use this view to create other types of analyses
    # Example: analysis based on gender, member_type, etc...
    views_stations.dropna().createOrReplaceView(name="views_stations")

    # --- 3. Light feature engineering --------------------------------------
    views = session.table("views_stations")

    # Time of Day feature
    # 3 - Late / 0 - Morning / 1 - Afternoon / 2 - Evening
    # Bucket on the hour so the buckets are contiguous: string ranges such as
    # '00:00'..'04:59' left the last minute of each bucket (e.g. 04:59:30)
    # unmatched, producing a NULL time_of_day.
    start_hour = F.hour(F.to_time(F.col("starttime")))
    time_of_day_expr = (
        F.when(start_hour < 5, F.lit(3))
        .when(start_hour < 12, F.lit(0))
        .when(start_hour < 17, F.lit(1))
        # Explicit upper bound (instead of otherwise) keeps NULL start times NULL.
        .when(start_hour <= 23, F.lit(2))
    )

    view_counts = views.select(
        F.to_date(F.col("starttime")).alias("date"),
        time_of_day_expr.alias("time_of_day"),
        F.col("station_id"),
    ).groupBy(
        F.col("date"),
        F.col("time_of_day"),
        F.col("station_id"),
    ).count()

    # Build the log target once and reuse it for the lag, rather than
    # referencing the "y_log" alias from inside the select that defines it.
    y_log_expr = F.log(base=10, x=F.col("count"))

    # Partition the lag by station and time of day so the previous value
    # always comes from the same series; an unpartitioned window leaked the
    # last value of one station/time-of-day into the first row of the next.
    # The first row of each partition therefore has a NULL y_log_lag.
    lag_window = Window.partitionBy(
        F.col("station_id"), F.col("time_of_day")
    ).orderBy(F.col("date"))

    features = view_counts.select(
        F.col("date"),
        F.col("time_of_day"),
        F.col("station_id"),
        F.col("count").alias("y"),
        y_log_expr.alias("y_log"),
        F.lag(y_log_expr).over(window=lag_window).alias("y_log_lag"),
    ).orderBy(
        F.col("date").asc(),
        F.col("time_of_day").asc(),
        F.col("station_id"),
    )

    # --- 4. Join the views_stations and weather data -----------------------
    # Select only relevant columns
    views_stations_weather = features.join(
        weather,
        features["date"] == weather["observation_date"],
        how="inner",
    ).select(
        F.col("date"),
        F.col("time_of_day"),
        F.col("station_id"),
        F.col("min_temp"),
        F.col("max_temp"),
        F.col("avg_temp"),
        F.col("total_precipitation"),
        F.col("total_snowfall"),
        F.col("snowdepth"),
        F.col("wind_speed"),
        F.col("y"),
        F.col("y_log"),
        F.col("y_log_lag"),
    )

    # Create a wind_indicator column based on wind speed
    # 0 - None / 1 - Light / 2 - Moderate / 3 - Heavy
    # Thresholds are contiguous: the former between(0.1, 12) / between(12.1, 30)
    # pair sent speeds in (12, 12.1) to the fallback value 0 ("None").
    wind_speed = F.col("wind_speed")
    views_stations_weather = views_stations_weather.withColumn(
        "wind_indicator",
        F.when(wind_speed < 0.1, F.lit(0))
        .when(wind_speed <= 12, F.lit(1))
        .when(wind_speed <= 30, F.lit(2))
        .when(wind_speed > 30, F.lit(3))
        # Only a NULL wind speed reaches the fallback.
        .otherwise(F.lit(0)),
    )
    return views_stations_weather

In [None]:
# Build the joined views/stations/weather data frame (also recreates the views_stations view).
views_stations_weather = prepare_data_for_training()

In [None]:
# Preview the prepared data frame.
views_stations_weather.show()

In [None]:
# Here is another, simpler way to do this using the Snowpark ML Preprocessing library
binarizer = preproc.Binarizer(
    threshold=0,
    input_cols=["total_precipitation", "total_snowfall", "snowdepth"],
    output_cols=["rain_indicator", "snow_indicator", "snow_on_ground"],
    drop_input_cols=True
)

# You'll see the new columns at the beginning of our data frame!
binarizer.transform(views_stations_weather).show()

In [None]:
# Run the binarizer, reorder columns, drop the wind_speed column -- save to the data frame definition
views_stations_weather = binarizer.transform(views_stations_weather).select(
    F.col("date"), F.col("time_of_day"), F.col("station_id"),
    F.cast(F.col("min_temp"), T.FloatType()).alias("min_temp"),
    F.cast(F.col("max_temp"), T.FloatType()).alias("max_temp"),
    F.cast(F.col("avg_temp"), T.FloatType()).alias("avg_temp"),
    F.col("wind_indicator"), F.col("rain_indicator"), F.col("snow_indicator"),
    F.col("snow_on_ground"), F.col("y"), F.col("y_log"), F.col("y_log_lag")
)

In [None]:
# One-hot encode the station identifier; the original STATION_ID column is dropped.
ohe = preproc.OneHotEncoder(
    input_cols=["STATION_ID"],
    output_cols=["STATION_ID_OHE"],
    drop_input_cols=True
)

# Fit once and reuse the fitted encoder: fit() returns the encoder itself, so
# calling it again for the preview only repeated the same work.
ohe.fit(views_stations_weather)
output_cols = ohe.get_output_cols()

# Preview the encoded data without overwriting the data frame.
ohe.transform(views_stations_weather).show()

In [None]:
# Apply the one-hot encoder to the data frame. The encoder was already fitted
# on this data in the previous cell, so it is reused here rather than refitted.
# (No column reordering takes place in this step.)
views_stations_weather = ohe.transform(views_stations_weather)

# Save the data back to database as a table
views_stations_weather.write.save_as_table("views_stations_weather", mode="overwrite")

In [None]:
-- Preview 10 rows of the table saved by the previous cell.
select * from views_stations_weather limit 10

In [None]:
# Open the model registry in the CITICORP_ADS_LAB.DEMO_DATA schema.
registry = Registry(
    session=session,
    database_name="CITICORP_ADS_LAB",
    schema_name="DEMO_DATA",
)

In [None]:
# Switch the session to the high_mem_wh warehouse before training.
session.use_warehouse("high_mem_wh")

# Take up to 50,000 rows of the saved table and split them 70/30 into train/test.
training_sample = session.table("views_stations_weather").limit(50000)
train, test = training_sample.random_split(weights=[0.7, 0.3], seed=42)

# Every column except the date and the two target columns is a feature.
feature_column_names = train.drop("date", "y", "y_log").columns
label_column_name = ["Y_LOG"]
output_column_names = ["PREDICTION"]

xgb = XGBRegressor(
    input_cols=feature_column_names,
    label_cols=label_column_name,
    output_cols=output_column_names,
)
xgb.fit(train)
print("Done training.")

In [None]:
# Score the held-out split and keep only the actual and predicted log counts.
scored_test = xgb.predict(test)
test_results = scored_test.select(F.col("y_log"), F.col("prediction"))

test_results.show()

In [None]:
# Evaluate the predictions against the true log counts.
ACTUAL_COL = "Y_LOG"
PREDICTED_COL = "PREDICTION"

# r2_score is called with singular *_col_name keywords ...
r2 = metrics.r2_score(
    df=test_results, y_true_col_name=ACTUAL_COL, y_pred_col_name=PREDICTED_COL
)

# ... while the error metrics are called with plural *_col_names keywords.
error_metric_kwargs = dict(
    df=test_results, y_true_col_names=ACTUAL_COL, y_pred_col_names=PREDICTED_COL
)
mae = metrics.mean_absolute_error(**error_metric_kwargs)
mse = metrics.mean_squared_error(**error_metric_kwargs)

In [None]:
# Choose a model name with a random numeric suffix between 1001 and 1500 (inclusive).
model_suffix = random.randint(1001, 1500)
name = "xgb_" + str(model_suffix)
name

In [None]:
# Register the trained regressor as version "v1" together with its evaluation metrics.
evaluation_metrics = {"r2": r2, "mae": mae, "mse": mse}
model = registry.log_model(
    xgb,
    model_name=name,
    version_name="v1",
    comment="My first Snowpark ML model!",
    metrics=evaluation_metrics,
)

In [None]:
# List the models currently stored in the registry.
registry.show_models()

In [None]:
# Look up version V1 of the model that was just logged and display its stored metrics.
model = registry.get_model(name).version("V1")
model.show_metrics()

These next two queries use **Jinja Templating** to pass in a dynamic Python value to the SQL query!

In [None]:
-- Run version V1 of the model whose name comes from the Python variable "name" on 10 rows.
-- NOTE(review): the input table is read from snowpark_iceberg.demo_data, which none of the
-- visible cells create -- confirm it exists and matches the model's feature columns.
WITH my_model_v1 AS MODEL citicorp_ads_lab.demo_data.{{name}} VERSION "V1"
     SELECT my_model_v1!predict(*) as prediction FROM snowpark_iceberg.demo_data.XGB_DEMO_MODEL_V1_TEST_DATA limit 10

In [None]:
-- Same model call as the previous query, wrapped in a subquery so that only the
-- "PREDICTION" field of the returned prediction value is selected.
-- NOTE(review): the input table is read from snowpark_iceberg.demo_data, which none of the
-- visible cells create -- confirm it exists and matches the model's feature columns.
select
    prediction:"PREDICTION" as prediction
from (
    WITH my_model_v1 AS MODEL citicorp_ads_lab.demo_data.{{name}} VERSION "V1"
    SELECT
        my_model_v1!predict(*) as prediction
    FROM
        snowpark_iceberg.demo_data.XGB_DEMO_MODEL_V1_TEST_DATA
    limit 10
    )