## Set up connection and test dataset

Let's start by setting up our test environment.

In [1]:
from snowflake.snowpark.context import get_active_session

# Reuse the session that the Snowflake Notebook runtime has already opened.
session = get_active_session()

# Tag every query issued from this notebook so it can be found in query history
# (helps with debugging and performance monitoring).
QUERY_TAG = {
    "origin": "sf_sit",
    "name": "overview_of_feature_store_api",
    "version": {"major": 1, "minor": 0},
}
session.query_tag = QUERY_TAG

# All Feature Store work below runs under this role.
session.use_role("FS_API_ROLE")

# Echo the effective role, warehouse, and database/schema so the context is visible in the output.
print(f"role: {session.get_current_role()} | WH: {session.get_current_warehouse()} | DB.SCHEMA: {session.get_fully_qualified_current_schema()}")

  import pkg_resources


SnowparkSessionException: (1403): No default Session is found. Please create a session before you call function 'udf' or use decorator '@udf'.

In [2]:
# Both schemas default to the session's current schema:
#   FS_DEMO_SCHEMA    - where the Feature Store is initialised and the test dataset is stored
#   MODEL_DEMO_SCHEMA - where the model lives
FS_DEMO_SCHEMA, MODEL_DEMO_SCHEMA = (
    session.get_current_schema(),
    session.get_current_schema(),
)

[Row(status='Schema SNOWFLAKE_FEATURE_STORE_NOTEBOOK_DEMO successfully created.')]

We have prepared some examples, which you can find in our [open source repo](https://github.com/snowflakedb/snowflake-ml-python/tree/main/snowflake/ml/feature_store/examples). Each example contains the source dataset and the feature view and entity definitions used in this demo. `ExampleHelper` (included in snowflake-ml-python) sets everything up through simple APIs, so you don't have to worry about the details.

In [3]:
from snowflake.ml.feature_store.examples.example_helper import ExampleHelper

# The helper creates example source tables in FS_DEMO_SCHEMA of the current database.
current_db = session.get_current_database()
example_helper = ExampleHelper(session, current_db, FS_DEMO_SCHEMA)

# Catalogue of bundled examples, rendered as a pandas frame.
available_examples = example_helper.list_examples()
available_examples.to_pandas()

Unnamed: 0,NAME,DESC,LABEL_COLS
0,new_york_taxi_features,Features using taxi trip data trying to predic...,TOTAL_AMOUNT
1,airline_features,Features using synthetic airline data to predi...,DEPARTING_DELAY
2,wine_quality_features,Features using wine quality data trying to pre...,quality
3,citibike_trip_features,Features using citibike trip data trying to pr...,tripduration


We can quickly look at the newly generated source tables.

In [4]:
# Materialise the taxi example's source tables. The loop below uses each
# element as a table name for session.table().
source_tables = example_helper.load_example('new_york_taxi_features')
# The later cells need `snowpark_df`; fail early and clearly if nothing was loaded.
assert source_tables, "load_example returned no source tables"

# Preview every generated table, not just the last one. After the loop,
# `snowpark_df` points at the LAST table in `source_tables`; the
# feature-engineering cells below build on it.
for table in source_tables:
    print(f"{table}:")
    snowpark_df = session.table(table)
    snowpark_df.show(5)

"REGTEST_DB".SNOWFLAKE_FEATURE_STORE_NOTEBOOK_DEMO.citibike_trips:


Unnamed: 0,TRIP_ID,TRIPDURATION,STARTTIME,STOPTIME,START_STATION_ID,START_STATION_NAME,START_STATION_LATITUDE,START_STATION_LONGITUDE,END_STATION_ID,END_STATION_NAME,END_STATION_LATITUDE,END_STATION_LONGITUDE,BIKEID,MEMBERSHIP_TYPE,USERTYPE,BIRTH_YEAR,GENDER
0,1,327,2013-12-05 13:09:50,2013-12-05 13:15:17,523,W 38 St & 8 Ave,40.754666,-73.991382,505,6 Ave & W 33 St,40.749013,-73.988484,15852,,Subscriber,1980,1
1,2,478,2013-12-05 13:09:52,2013-12-05 13:17:50,473,Rivington St & Chrystie St,40.721101,-73.991925,161,LaGuardia Pl & W 3 St,40.72917,-73.998102,17952,,Subscriber,1983,2
2,3,288,2013-12-05 13:09:54,2013-12-05 13:14:42,167,E 39 St & 3 Ave,40.748901,-73.976049,524,W 43 St & 6 Ave,40.755273,-73.983169,19033,,Subscriber,1988,1
3,4,1163,2013-12-05 13:10:00,2013-12-05 13:29:23,229,Great Jones St,40.727434,-73.99379,347,W Houston St & Hudson St,40.728739,-74.007488,17488,,Subscriber,1988,1
4,5,247,2013-12-05 13:10:04,2013-12-05 13:14:11,505,6 Ave & W 33 St,40.749013,-73.988484,466,W 25 St & 6 Ave,40.743954,-73.991449,15838,,Subscriber,1965,1


In [None]:
# Column-level summary statistics of the raw source table, shown as the cell output.
raw_summary_df = snowpark_df.describe()
raw_summary_df

# START FEATURE ENGINEERING

## CHECK FOR MISSING VALUES

In [None]:
from snowflake.snowpark.functions import col, sum as snowpark_sum, when

# For every column, count the rows in which it is NULL. The result is a
# single-row DataFrame whose columns keep the original column names.
null_count_exprs = []
for column_name in snowpark_df.columns:
    is_null_flag = when(col(column_name).is_null(), 1).otherwise(0)
    null_count_exprs.append(snowpark_sum(is_null_flag).alias(column_name))

null_counts = snowpark_df.select(*null_count_exprs)
null_counts.show()

In [None]:
# `df_fe` is the DataFrame the feature-engineering steps below keep reassigning.
# `snowpark_df` is still used later for the raw TOTAL_AMOUNT label.
df_fe = snowpark_df
df_fe.show(n=10)

## REMOVING CONGESTION_SURCHARGE AND AIRPORT_FEE AS THEY CONTAIN NO INFORMATION

In [None]:
# These two columns carry no information (see the heading above), so drop them.
df_fe = df_fe.drop("CONGESTION_SURCHARGE", "AIRPORT_FEE")
df_fe.show(n=10)

## CHANGING STORE_AND_FWD_FLAG TO BINARY ENCODING

In [None]:
from snowflake.snowpark.functions import col, when

# Binary-encode the flag: "Y" -> 1, anything else (including NULL) -> 0.
store_and_fwd_binary = when(col("STORE_AND_FWD_FLAG") == "Y", 1).otherwise(0)
df_fe = df_fe.with_column("STORE_AND_FWD_FLAG", store_and_fwd_binary)

df_fe.show(n=10)

## DEALING WITH NOMINAL CATEGORICAL VARIABLES (VENDORID, RATECODEID, PAYMENT_TYPE)

In [None]:
# Inspect the distinct levels of each nominal column before one-hot encoding them.
for nominal_col in ("VENDORID", "PAYMENT_TYPE", "RATECODEID"):
    df_fe.select(nominal_col).distinct().show()

In [None]:
def _one_hot_encode(df, source_col, prefix):
    """Append a 0/1 indicator column for each distinct non-NULL value of ``source_col``.

    Parameters
    ----------
    df : Snowpark DataFrame that contains ``source_col``.
    source_col : name of the nominal column to encode.
    prefix : indicator columns are named ``f"{prefix}_{value}"``.

    Returns
    -------
    A new DataFrame with the indicator columns appended; ``source_col`` itself is
    kept (the caller drops it). Rows whose ``source_col`` is NULL get 0 in every
    indicator column.
    """
    # Skip NULL: `col == None` never evaluates true, so a "<prefix>_None" column
    # would be all zeros. Sort so the generated column order is deterministic.
    distinct_values = sorted(
        row[source_col]
        for row in df.select(source_col).distinct().collect()
        if row[source_col] is not None
    )
    if not distinct_values:
        return df
    # Add all indicators in one projection instead of one with_column per level.
    return df.with_columns(
        [f"{prefix}_{value}" for value in distinct_values],
        [when(col(source_col) == value, 1).otherwise(0) for value in distinct_values],
    )


# (source column, prefix for the generated indicator columns)
NOMINAL_COLUMNS = [
    ("VENDORID", "VENDOR"),
    ("PAYMENT_TYPE", "PAYMENT"),
    ("RATECODEID", "RATECODE"),
]

for source_col, prefix in NOMINAL_COLUMNS:
    df_fe = _one_hot_encode(df_fe, source_col, prefix)

# The indicator columns replace the original nominal columns.
df_fe = df_fe.drop([source_col for source_col, _ in NOMINAL_COLUMNS])
df_fe.show()

## EXTRACTING INFO FROM PICKUP AND DROPOFF TIME

- TRIP DURATION IN MINUTES
- HOUR (FOR RUSH HOUR INFO)
- DAY (FOR WEEKDAY VS. WEEKEND)
- MONTHS (FOR SEASONAL TRENDS)

In [None]:
from snowflake.snowpark.functions import datediff

# Trip length: seconds from pickup to dropoff, converted to minutes and cast to INT.
# NOTE(review): Snowflake rounds (rather than truncates) when casting a fractional
# number to INT; confirm that is the intended minute bucketing.
trip_seconds = datediff("second", col("TPEP_PICKUP_DATETIME"), col("TPEP_DROPOFF_DATETIME"))
df_fe = df_fe.with_column("TRIP_DURATION_MINUTES", (trip_seconds / 60).cast("int"))

df_fe.show(n=10)

In [None]:
from snowflake.snowpark.functions import month, dayofweek, hour, sin, cos
from numpy import pi

# Raw calendar parts of the pickup timestamp.
pickup_ts = col("TPEP_PICKUP_DATETIME")
df_fe = df_fe.with_column("PICKUP_HOUR", hour(pickup_ts))
df_fe = df_fe.with_column("DAY_OF_WEEK", dayofweek(pickup_ts))
df_fe = df_fe.with_column("PICKUP_MONTH", month(pickup_ts))

# Cyclical encoding: map value v with period P onto the unit circle at angle
# 2*pi*v/P. This makes e.g. hour 23 adjacent to hour 0.
# The division by the period must happen INSIDE sin/cos. The previous
# `sin(2*pi*v)/P` evaluated at an integer multiple of 2*pi, so every feature was
# a constant (sin ~ 0, cos = 1/P).
CYCLICAL_FEATURES = [
    # (raw column, output prefix, period)
    ("PICKUP_HOUR", "HOUR", 24),
    ("DAY_OF_WEEK", "DAY", 7),
    ("PICKUP_MONTH", "MONTH", 12),
]

for source_col, prefix, period in CYCLICAL_FEATURES:
    angle = 2 * pi * col(source_col) / period
    df_fe = df_fe.with_column(f"{prefix}_SIN", sin(angle))
    df_fe = df_fe.with_column(f"{prefix}_COS", cos(angle))

df_fe.show()

## SCALE NUMERICAL VALUES

In [None]:
from snowflake.snowpark.functions import col, mean, stddev

# Standardise selected numeric columns to z-scores: (x - mean) / stddev.
# NOTE(review): the statistics are computed on the full dataset, before the
# train/test split further down, so test rows influence the scaling. Also,
# tree-based models such as the XGBoost regressor used later are insensitive to
# monotone rescaling; confirm this step is still wanted.
columns_to_scale = [
    "PASSENGER_COUNT",
    "TRIP_DISTANCE",
    "TRIP_DURATION_MINUTES"
]

# Fetch every column's mean and stddev in a single query instead of one per column.
stats_row = df_fe.select(
    *[mean(col(c)).alias(f"{c}_MEAN") for c in columns_to_scale],
    *[stddev(col(c)).alias(f"{c}_STDDEV") for c in columns_to_scale],
).collect()[0]

for col_name in columns_to_scale:
    col_mean = stats_row[f"{col_name}_MEAN"]
    col_stddev = stats_row[f"{col_name}_STDDEV"]

    if col_mean is None:
        # Column is entirely NULL: there is nothing to centre or scale.
        continue

    scaled = col(col_name) - col_mean
    # Only divide when the spread is usable. A constant column (stddev 0) would
    # hit division by zero, and a NULL stddev would turn the whole column NULL.
    if col_stddev:
        scaled = scaled / col_stddev

    df_fe = df_fe.with_column(col_name, scaled)

print("Numerical columns have been scaled.")
df_fe.show()

In [None]:
# Current column names of the engineered DataFrame, displayed as the cell output.
list(df_fe.columns)

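## REMOVE FEATURES THAT WOULD CAUSE DATA LEAKAGE

These fare components are parts of `TOTAL_AMOUNT` (the target), so they are not selected as features:

- "FARE_AMOUNT"
- "EXTRA"
- "MTA_TAX"
- "TIP_AMOUNT"
- "TOLLS_AMOUNT"
- "IMPROVEMENT_SURCHARGE"

## REMOVE UNNECESSARY FEATURES

The raw calendar columns are superseded by their sine/cosine encodings:

- "PICKUP_HOUR"
- "DAY_OF_WEEK"
- "PICKUP_MONTH"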

In [None]:
# Columns kept for the feature view, grouped by role. The leakage-prone fare
# components and the raw calendar columns are left out simply by not being listed.
identifier_cols = ["TRIP_ID", "TPEP_PICKUP_DATETIME"]
target_cols = ["TOTAL_AMOUNT"]
numerical_cols = ["PASSENGER_COUNT", "TRIP_DISTANCE", "TRIP_DURATION_MINUTES"]
# Categorical features. NOTE(review): the one-hot names below are hard-coded
# and must match the levels produced by the encoding step.
categorical_cols = [
    "PULOCATIONID",
    "DOLOCATIONID",
    "STORE_AND_FWD_FLAG",
    "VENDOR_1", "VENDOR_2",
    "PAYMENT_1", "PAYMENT_2", "PAYMENT_3", "PAYMENT_4", "PAYMENT_5",
    "RATECODE_1", "RATECODE_2", "RATECODE_3", "RATECODE_4", "RATECODE_5", "RATECODE_6", "RATECODE_99",
]
# Time features (cyclical sine/cosine encodings).
cyclical_time_cols = [
    "HOUR_SIN", "HOUR_COS",
    "DAY_SIN", "DAY_COS",
    "MONTH_SIN", "MONTH_COS",
]

final_feature_columns = (
    identifier_cols
    + target_cols
    + numerical_cols
    + categorical_cols
    + cyclical_time_cols
)

final_features_df = df_fe.select(final_feature_columns)

final_features_df.show(5)

## MAKE FEATURE STORE

In [None]:
from snowflake.ml.feature_store import (
    CreationMode,
    Entity,
    FeatureStore,
    FeatureView,
    FeatureViewStatus,
)

# The Feature Store lives in FS_DEMO_SCHEMA of the current database and uses the
# session's current warehouse by default. CREATE_IF_NOT_EXIST is requested so an
# existing store is presumably reused rather than recreated.
fs_database = session.get_current_database()
fs_warehouse = session.get_current_warehouse()

fs = FeatureStore(
    session=session,
    database=fs_database,
    name=FS_DEMO_SCHEMA,
    default_warehouse=fs_warehouse,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

## MAKE ENTITY

In [None]:
from snowflake.ml.feature_store import Entity

# One entity per trip, joined on TRIP_ID.
entity = Entity(
    join_keys=["TRIP_ID"],
    name="TRIP_ID_ENTITY",
    desc="my TRIP ID Entitiy",
)

# Register it with the Feature Store; the call's return value is the cell output.
fs.register_entity(entity)

In [None]:
# Read the entity back from the Feature Store and show its join keys.
entity = fs.get_entity(name="TRIP_ID_ENTITY")
trip_join_keys = entity.join_keys
print(trip_join_keys)

## MAKE FEATURE VIEW

In [None]:
from snowflake.ml.feature_store import FeatureView

# Feature view over the engineered Snowpark DataFrame, keyed by the trip entity
# and timestamped by pickup time. refresh_freq requests a refresh every 30 minutes.
fv_spec = dict(
    name="TAXI_FARE_FV",
    desc="my managed feature view",
    entities=[entity],
    feature_df=final_features_df,
    timestamp_col="TPEP_PICKUP_DATETIME",
    refresh_freq="30 minutes",
)
managed_fv = FeatureView(**fv_spec)

In [None]:
# Persist the feature view definition under version "1.0"; the registered view is the cell output.
registered_fv = fs.register_feature_view(feature_view=managed_fv, version="1.0")
registered_fv

In [None]:
# Fetch the registered feature view by name and version for training-set generation.
FV_NAME, FV_VERSION = "TAXI_FARE_FV", "1.0"
retrieved_fv = fs.get_feature_view(name=FV_NAME, version=FV_VERSION)

# Show every feature view registered in this Feature Store.
fs.list_feature_views().show()

In [None]:
from snowflake.snowpark.functions import col

# Spine: the entity key and timestamp to look features up at, plus the label
# (raw TOTAL_AMOUNT from the source table, renamed TARGET_FARE).
# NOTE(review): .sample(n=...) is not seeded, so the spine differs between runs.
SPINE_SAMPLE_SIZE = 100000
spine_df = (
    snowpark_df
    .select(
        col("TRIP_ID"),
        col("TPEP_PICKUP_DATETIME"),
        col("TOTAL_AMOUNT").alias("TARGET_FARE"),
    )
    .sample(n=SPINE_SAMPLE_SIZE)
)

# Join the feature view onto the spine. spine_timestamp_col presumably makes
# this a point-in-time lookup at each pickup timestamp.
training_set = fs.generate_training_set(
    spine_df=spine_df,
    features=[retrieved_fv],
    spine_timestamp_col="TPEP_PICKUP_DATETIME",
)

print("Final training data with features joined:")
training_set.show()

In [None]:
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.metrics import mean_squared_error, r2_score

# 80/20 split with a fixed seed so the split (and the metrics) are reproducible.
train_sdf, test_sdf = training_set.random_split([0.8, 0.2], seed=42)

# Model inputs: every engineered feature except identifiers and the label.
# TOTAL_AMOUNT (also present from the feature view) is deliberately excluded.
feature_cols = [
    "PASSENGER_COUNT", "TRIP_DISTANCE", "TRIP_DURATION_MINUTES", "PULOCATIONID",
    "DOLOCATIONID", "STORE_AND_FWD_FLAG", "VENDOR_1", "VENDOR_2", "PAYMENT_1",
    "PAYMENT_2", "PAYMENT_3", "PAYMENT_4", "PAYMENT_5", "RATECODE_1",
    "RATECODE_2", "RATECODE_3", "RATECODE_4", "RATECODE_5", "RATECODE_6",
    "RATECODE_99", "HOUR_SIN", "HOUR_COS", "DAY_SIN", "DAY_COS",
    "MONTH_SIN", "MONTH_COS"
]
label_col = "TARGET_FARE"
prediction_col = "PREDICTED_FARE"

print("Training XGBoost model inside Snowflake...")
regressor = XGBRegressor(
    input_cols=feature_cols,
    label_cols=label_col,
    output_cols=prediction_col,
    n_estimators=100,
    max_depth=3,
    # XGBoost's step-size parameter is `learning_rate` (alias `eta`). The previous
    # `lr=` is not an XGBoost parameter, so it did not control the step size.
    # 1e-3 would also be far too small for only 100 trees: predictions would
    # barely move off the initial base score. 0.1 is a conventional choice.
    learning_rate=0.1,
)

regressor.fit(train_sdf)
print("Model training complete.")

# Score the held-out split; predictions land in `prediction_col`.
result_sdf = regressor.predict(test_sdf)

# Note the API asymmetry: mean_squared_error takes *_col_names, r2_score takes *_col_name.
mse = mean_squared_error(df=result_sdf, y_true_col_names=label_col, y_pred_col_names=prediction_col)
r2 = r2_score(df=result_sdf, y_true_col_name=label_col, y_pred_col_name=prediction_col)

print(f"Model Mean Squared Error: {mse:.2f}")
print(f"Model R-squared (R²): {r2:.2f}")

result_sdf.select(label_col, prediction_col).show()