# **Scenario 1**:
- `session.table` -> Snowpark DataFrame; apply feature engineering (snowflake.ml OneHotEncoder)
- Create a feature view (feature_df = Snowpark DataFrame; refresh_freq = None)
- Feature view Trp_snowpark_df_1 is created, BUT NO DATA IS CAPTURED (message: Failure during expansion of view 'TRP_SNOWPARK_DF$1': SQL compilation error: Object)

## Set up the Snowpark session and import dependencies

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# Snowpark is available too: attach to the session this notebook is running in.
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Snowflake context (database / schema / warehouse) used by every later cell.
database, schema, warehouse = (
    session.get_current_database(),
    session.get_current_schema(),
    session.get_current_warehouse(),
)
source_table = "NYC_YELLOW_TRIPS"

print(f"active session: {session}")

In [None]:
import warnings
import yaml
import json
import pandas as pd
from typing import List, Dict, Any
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.functions import col
from snowflake.ml.modeling.preprocessing import OneHotEncoder
from snowflake.ml.modeling.impute import SimpleImputer

from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

# Silence UserWarnings, e.g. "Entity ... already exists. Skip registration." raised when the
# entity cells are re-run. NOTE(review): this also hides every other UserWarning.
warnings.simplefilter(action="ignore", category=UserWarning)

## Load data, feature engineering

In [None]:
def load_data(session: snowpark.Session, database: str, schema: str, source_table: str) -> DataFrame:
    """Load a source table and apply this notebook's feature engineering.

    Reads ``database.schema.source_table`` into a Snowpark DataFrame and prints the
    per-column null counts. It keeps a fixed subset of trip columns, renaming
    ``TPEP_DROPOFF_DATETIME`` to ``TIME_STAMP``, and one-hot encodes
    ``STORE_AND_FWD_FLAG`` with snowflake.ml's ``OneHotEncoder``.

    Args:
        session: Active Snowpark session used to read the table.
        database: Database that contains ``source_table``.
        schema: Schema that contains ``source_table``.
        source_table: Name of the table to read.

    Returns:
        Snowpark DataFrame with columns TIME_STAMP, TRIP_ID, PASSENGER_COUNT,
        TRIP_DISTANCE, FARE_AMOUNT and STORE_AND_FWD_FLAG_Y.

    NOTE(review): the returned DataFrame is lazy. Its SQL (see ``sf_df.queries``) may
    reference a Snowpark temporary table, presumably created while fitting or applying
    the encoder. Scenario 2 fails with "Dynamic Tables cannot depend on a temporary
    object 'SNOWPARK_TEMP_TABLE_...'". Persisting the result with ``save_as_table``
    (Scenario 3) avoids depending on session-scoped objects.
    """
    # Local import: only ``col`` is imported at file level.
    from snowflake.snowpark.functions import count, lit

    table_name = f"{database}.{schema}.{source_table}"
    # Read the table into a (lazy) Snowpark DataFrame.
    sf_data = session.table(table_name)

    # --- preprocessing / feature engineering ---

    # Null count per column, computed in ONE aggregate query instead of one filtered
    # COUNT query per column. COUNT(1) - COUNT(col) is the number of rows where col IS NULL.
    columns = sf_data.columns
    if columns:
        null_row = sf_data.select(
            [(count(lit(1)) - count(col(c))).alias(f"NULLS_{i}") for i, c in enumerate(columns)]
        ).collect()[0]
        null_counts = [(c, null_row[i]) for i, c in enumerate(columns)]
    else:
        # select() with no columns is invalid; nothing to report.
        null_counts = []
    print(f"Null values in the dataframe: {null_counts}") # no nulls in this dataset, not applying imputer

    # Keep only the modelling columns; the drop-off time becomes the feature timestamp.
    # The unquoted alias "Time_Stamp" resolves to TIME_STAMP in Snowflake.
    sf_data1 = sf_data.select(col("TPEP_DROPOFF_DATETIME").alias("Time_Stamp"), col("TRIP_ID"),
                              col("PASSENGER_COUNT"), col("TRIP_DISTANCE"), col("FARE_AMOUNT"),
                              col("STORE_AND_FWD_FLAG")
                              )

    cat_cols = ["STORE_AND_FWD_FLAG"]

    # drop="first" drops one category per encoded column. The surviving indicator is
    # selected below as STORE_AND_FWD_FLAG_Y. The input column is dropped from the output.
    encoder = OneHotEncoder(
        input_cols=cat_cols,
        output_cols=cat_cols,
        drop_input_cols=True,
        drop="first",
        handle_unknown="ignore",
    )
    sf_data2 = encoder.fit(sf_data1).transform(sf_data1)

    # Rearrange columns into the order the feature views expect.
    sf_data3 = sf_data2[
        "TIME_STAMP", "TRIP_ID", "PASSENGER_COUNT", "TRIP_DISTANCE", "FARE_AMOUNT", "STORE_AND_FWD_FLAG_Y"]

    print("Data type: ", type(sf_data3))
    return sf_data3


In [None]:
# Build the feature-engineered DataFrame and print a sample of it (this executes the query).
sf_df = load_data(session=session, database=database, schema=schema, source_table=source_table)
sf_df.show()

In [None]:
# Display the SQL Snowpark generated for sf_df (useful for spotting temporary objects it depends on).
sf_df.queries

## create or connect to feature store

In [None]:
# The current schema doubles as the feature store. CREATE_IF_NOT_EXIST means this cell
# may also just connect to an existing store, even though the message says "created".
fs = FeatureStore(
    session=session,
    database=database,
    name=schema,
    default_warehouse=warehouse,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)
print(f"feature store: {fs} created")


## Create entities and register them with the feature store

In [None]:
def create_entity(feature_store: FeatureStore, name: str, join_keys: List[str], desc: str) -> Entity:
    """Build a single Entity and register it with ``feature_store``.

    Registering an entity that already exists does not fail. Instead a UserWarning is
    emitted, e.g. "Entity TRIP_NUMBER already exists. Skip registration."

    Args:
        feature_store: FeatureStore the entity is registered in.
        name: Entity name.
        join_keys: Column names used to join features for this entity.
        desc: Free-text description stored with the entity.

    Returns:
        Whatever ``feature_store.register_entity`` returns for the entity.
    """
    new_entity = Entity(name=name, join_keys=join_keys, desc=desc)
    return feature_store.register_entity(new_entity)

In [None]:
# Entities to register; each dict becomes one Entity via create_entity().
# NOTE(review): DOLOCATIONID is not among the columns load_data() selects, so
# DROP_OFF_LOC cannot join against sf_df. Confirm it is meant for other views.
entity_parameter_list= [
    {
        "name": "TRIP_NUM",
        "join_keys": ["TRIP_ID"],
        "desc": "Trip Unique Number"
    },
    {
        "name": "DROP_OFF_LOC",
        "join_keys": ["DOLOCATIONID"],
        "desc": "Drop off loc id."
    }
]

def create_entities(feature_store: FeatureStore, entity_parameter_list: List[Dict[str, Any]]) -> Dict[str, Entity]:
    """Create and register several entities in one call.

    Entities are the objects that features and feature views are associated with;
    they carry the join keys used for feature lookups.

    Args:
        feature_store: FeatureStore to register the entities in.
        entity_parameter_list: One dict per entity, with keys ``name``,
            ``join_keys`` and ``desc``.

    Returns:
        Mapping of entity name to the value returned by ``create_entity``.
    """
    return {
        spec["name"]: create_entity(
            feature_store=feature_store,
            name=spec["name"],
            join_keys=spec["join_keys"],
            desc=spec["desc"],
        )
        for spec in entity_parameter_list
    }

In [None]:
entities_mapping = create_entities(fs, entity_parameter_list)
# Bare expression so the notebook displays the entity-name -> entity mapping.
entities_mapping

## feature view parameters

In [None]:
# Scenario 1 feature view: built directly on the in-session Snowpark DataFrame, with
# refresh_freq left unset (commented out below).
feature_view_parameters = [
    {
        "name": "Trp_snowpark_df",
        "entities": ["TRIP_NUM"],
        # Informational only: create_feature_views() builds the view from its
        # feature_df argument, not from this key.
        "feature_df": "sf_df",
        "desc": "Feature view made with snowpark_df",
        "timestamp_col": "TIME_STAMP",
        # "refresh_freq" : "5 minutes",
        "feature_desc": {
            "PASSENGER_COUNT": "Number of passengers on the trip.",
            "TRIP_DISTANCE": "The distance of a trip.",
            "FARE_AMOUNT": "The fare of a trip.",
            "STORE_AND_FWD_FLAG_Y": "One-hot encoded STORE_AND_FWD_FLAG (category Y).",
        },
        "version": "1",
    }
]


## Create Feature view (feature_df = snowpark_df; refresh_freq = None)

In [None]:
def create_feature_views(feature_store: FeatureStore, feature_view_parameters: List[Dict[str, Any]],
                         entity_mapping: Dict[str, Entity], feature_df: DataFrame) -> Dict[str, FeatureView]:
    """Register one feature view per parameter dict, reusing views that already exist.

    Args:
        feature_store: FeatureStore to register the views in.
        feature_view_parameters: One dict per view. Required keys: ``name``,
            ``version`` and ``entities`` (entity names looked up in
            ``entity_mapping``). Optional keys: ``timestamp_col``, ``refresh_freq``,
            ``desc`` and ``feature_desc``. A ``feature_df`` key is ignored; every
            view is built from the ``feature_df`` argument.
        entity_mapping: Entity name -> Entity, e.g. from ``create_entities``.
        feature_df: Snowpark DataFrame backing every view.

    Returns:
        Mapping of view name (as spelled in the parameters) to the newly registered
        or already existing FeatureView.
    """
    feature_view_mapping = {}
    # Snapshot of the views registered before this call.
    registered_views = feature_store.list_feature_views()

    for feature_view_param in feature_view_parameters:
        feature_view_name = feature_view_param["name"]
        feature_view_version = feature_view_param["version"]
        entities = [entity_mapping[name] for name in feature_view_param["entities"]]
        timestamp_col = feature_view_param.get("timestamp_col")
        refresh_freq = feature_view_param.get("refresh_freq")
        desc = feature_view_param.get("desc")
        feature_desc = feature_view_param.get("feature_desc")

        # Snowflake stores unquoted names upper-cased (Scenario 1's error names
        # 'TRP_SNOWPARK_DF$1'), so compare names case-insensitively.
        # NOTE(review): assumes list_feature_views() yields FeatureView objects; some
        # snowflake-ml versions return a Snowpark DataFrame instead. Confirm.
        existing_view = next(
            (
                view
                for view in registered_views
                if str(view.name).upper() == feature_view_name.upper()
                and str(view.version) == str(feature_view_version)
            ),
            None,
        )

        if existing_view is not None:
            # Reuse the registered view. Previously `feature_view` was left unbound
            # here, or held the view from the previous iteration.
            print(f"Feature View : {feature_view_name}_{feature_view_version} already exists")
            feature_view = existing_view
        else:
            fv_instance = FeatureView(
                name=feature_view_name,
                entities=entities,
                feature_df=feature_df,
                timestamp_col=timestamp_col,
                refresh_freq=refresh_freq,
                desc=desc)
            if feature_desc:
                fv_instance = fv_instance.attach_feature_desc(feature_desc)

            # Register the FeatureView; this creates the backing object in Snowflake.
            # In this notebook's scenarios, refresh_freq=None surfaced a view and a
            # set refresh_freq surfaced a dynamic table.
            feature_view = feature_store.register_feature_view(
                feature_view=fv_instance,
                version=feature_view_version,
                block=True,  # whether function call blocks until initial data is available
                overwrite=False,  # whether to replace existing feature view with same name/version
            )

            print(f"Feature View : {feature_view_name}_{feature_view_version} created")
        feature_view_mapping[feature_view_name] = feature_view

    return feature_view_mapping


In [None]:
# Register the Scenario 1 feature view(s) and print the name -> FeatureView mapping.
feature_view_dict = create_feature_views(
    feature_store=fs,
    feature_view_parameters=feature_view_parameters,
    entity_mapping=entities_mapping,
    feature_df=sf_df,
)
print(feature_view_dict)

# Scenario 2:
- `session.table` -> Snowpark DataFrame; apply feature engineering (snowflake.ml OneHotEncoder)
- Create a feature view (feature_df = Snowpark DataFrame; refresh_freq = "5 minutes")
- **SQL compilation error: Dynamic Tables cannot depend on a temporary object 'SNOWPARK_TEMP_TABLE_IBD809K67A'**

## create feature view with refresh_freq

In [None]:
# Scenario 2 feature view: same in-session Snowpark DataFrame, but with a refresh
# frequency set.
feature_view_parameters = [
    {
        "name": "Trp_snowpark_df_2",
        "entities": ["TRIP_NUM"],
        # Informational only: create_feature_views() builds the view from its
        # feature_df argument, not from this key.
        "feature_df": "sf_df",
        "desc": "Feature view made with snowpark_df",
        "timestamp_col": "TIME_STAMP",
        "refresh_freq": "5 minutes",
        "feature_desc": {
            "PASSENGER_COUNT": "Number of passengers on the trip.",
            "TRIP_DISTANCE": "The distance of a trip.",
            "FARE_AMOUNT": "The fare of a trip.",
            "STORE_AND_FWD_FLAG_Y": "One-hot encoded STORE_AND_FWD_FLAG (category Y).",
        },
        "version": "1",
    }
]

def create_feature_views(feature_store: FeatureStore, feature_view_parameters: List[Dict[str, Any]],
                         entity_mapping: Dict[str, Entity], feature_df: DataFrame) -> Dict[str, FeatureView]:
    """Register one feature view per parameter dict, reusing views that already exist.

    Args:
        feature_store: FeatureStore to register the views in.
        feature_view_parameters: One dict per view. Required keys: ``name``,
            ``version`` and ``entities`` (entity names looked up in
            ``entity_mapping``). Optional keys: ``timestamp_col``, ``refresh_freq``,
            ``desc`` and ``feature_desc``. A ``feature_df`` key is ignored; every
            view is built from the ``feature_df`` argument.
        entity_mapping: Entity name -> Entity, e.g. from ``create_entities``.
        feature_df: Snowpark DataFrame backing every view.

    Returns:
        Mapping of view name (as spelled in the parameters) to the newly registered
        or already existing FeatureView.
    """
    feature_view_mapping = {}
    # Snapshot of the views registered before this call.
    registered_views = feature_store.list_feature_views()

    for feature_view_param in feature_view_parameters:
        feature_view_name = feature_view_param["name"]
        feature_view_version = feature_view_param["version"]
        entities = [entity_mapping[name] for name in feature_view_param["entities"]]
        timestamp_col = feature_view_param.get("timestamp_col")
        refresh_freq = feature_view_param.get("refresh_freq")
        desc = feature_view_param.get("desc")
        feature_desc = feature_view_param.get("feature_desc")

        # Snowflake stores unquoted names upper-cased (Scenario 1's error names
        # 'TRP_SNOWPARK_DF$1'), so compare names case-insensitively.
        # NOTE(review): assumes list_feature_views() yields FeatureView objects; some
        # snowflake-ml versions return a Snowpark DataFrame instead. Confirm.
        existing_view = next(
            (
                view
                for view in registered_views
                if str(view.name).upper() == feature_view_name.upper()
                and str(view.version) == str(feature_view_version)
            ),
            None,
        )

        if existing_view is not None:
            # Reuse the registered view. Previously `feature_view` was left unbound
            # here, or held the view from the previous iteration.
            print(f"Feature View : {feature_view_name}_{feature_view_version} already exists")
            feature_view = existing_view
        else:
            fv_instance = FeatureView(
                name=feature_view_name,
                entities=entities,
                feature_df=feature_df,
                timestamp_col=timestamp_col,
                refresh_freq=refresh_freq,
                desc=desc)
            if feature_desc:
                fv_instance = fv_instance.attach_feature_desc(feature_desc)

            # Register the FeatureView; this creates the backing object in Snowflake.
            # In this notebook's scenarios, refresh_freq=None surfaced a view and a
            # set refresh_freq surfaced a dynamic table.
            feature_view = feature_store.register_feature_view(
                feature_view=fv_instance,
                version=feature_view_version,
                block=True,  # whether function call blocks until initial data is available
                overwrite=False,  # whether to replace existing feature view with same name/version
            )

            print(f"Feature View : {feature_view_name}_{feature_view_version} created")
        feature_view_mapping[feature_view_name] = feature_view

    return feature_view_mapping





In [None]:
# Register the Scenario 2 feature view(s) and print the name -> FeatureView mapping.
feature_view_dict = create_feature_views(
    feature_store=fs,
    feature_view_parameters=feature_view_parameters,
    entity_mapping=entities_mapping,
    feature_df=sf_df,
)
print(feature_view_dict)

# Scenario 3: Write the feature-engineered Snowpark DataFrame to a Snowflake table, then use that table as the source for the feature store and feature view

In [None]:
# Persist the feature-engineered DataFrame as a Snowflake table, replacing any previous
# contents. Then read it back so later objects depend on that table rather than on sf_df's query.
sf_df.write.save_as_table("NYC_Trips_FeaEngg_table", mode="overwrite")
fea_engg_snowpark_df = session.table("NYC_Trips_FeaEngg_table")
fea_engg_snowpark_df.show()

In [None]:
# create feature store
# The current schema doubles as the feature store. CREATE_IF_NOT_EXIST means this cell
# may also just connect to an existing store, even though the message says "created".
fs = FeatureStore(
    session=session,
    database=database,
    name=schema,
    default_warehouse=warehouse,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)
print(f"feature store: {fs} created")


In [None]:
# create and register entities

def create_entity(feature_store: FeatureStore, name: str, join_keys: List[str], desc: str) -> Entity:
    """Build a single Entity and register it with ``feature_store``.

    Registering an entity that already exists does not fail. Instead a UserWarning is
    emitted, e.g. "Entity TRIP_NUMBER already exists. Skip registration."

    Args:
        feature_store: FeatureStore the entity is registered in.
        name: Entity name.
        join_keys: Column names used to join features for this entity.
        desc: Free-text description stored with the entity.

    Returns:
        Whatever ``feature_store.register_entity`` returns for the entity.
    """
    new_entity = Entity(name=name, join_keys=join_keys, desc=desc)
    return feature_store.register_entity(new_entity)


# Entities to register; each dict becomes one Entity via create_entity().
# NOTE(review): DOLOCATIONID is not among the columns load_data() selects, so
# DROP_OFF_LOC cannot join against the persisted feature table. Confirm it is intended.
entity_parameter_list= [
    {
        "name": "TRIP_NUM",
        "join_keys": ["TRIP_ID"],
        "desc": "Trip Unique Number"
    },
    {
        "name": "DROP_OFF_LOC",
        "join_keys": ["DOLOCATIONID"],
        "desc": "Drop off loc id."
    }
]

def create_entities(feature_store: FeatureStore, entity_parameter_list: List[Dict[str, Any]]) -> Dict[str, Entity]:
    """Create and register several entities in one call.

    Entities are the objects that features and feature views are associated with;
    they carry the join keys used for feature lookups.

    Args:
        feature_store: FeatureStore to register the entities in.
        entity_parameter_list: One dict per entity, with keys ``name``,
            ``join_keys`` and ``desc``.

    Returns:
        Mapping of entity name to the value returned by ``create_entity``.
    """
    return {
        spec["name"]: create_entity(
            feature_store=feature_store,
            name=spec["name"],
            join_keys=spec["join_keys"],
            desc=spec["desc"],
        )
        for spec in entity_parameter_list
    }

In [None]:
# Create a feature view from the Snowpark DataFrame read back from the persisted
# feature-engineered table. refresh_freq is left unset (commented out below).

feature_view_parameters = [
    {
        "name": "Trp_snow_df",
        "entities": ["TRIP_NUM"],
        # Informational only: create_feature_views() builds the view from its
        # feature_df argument, not from this key.
        "feature_df": "fea_engg_snowpark_df",
        "desc": "Feature view made with snowpark_df from fea engg'ed table",
        "timestamp_col": "TIME_STAMP",
        # "refresh_freq" : "5 minutes",
        "feature_desc": {
            "PASSENGER_COUNT": "Number of passengers on the trip.",
            "TRIP_DISTANCE": "The distance of a trip.",
            "FARE_AMOUNT": "The fare of a trip.",
            "STORE_AND_FWD_FLAG_Y": "One-hot encoded STORE_AND_FWD_FLAG (category Y).",
        },
        "version": "1",
    }
]

def create_feature_views(feature_store: FeatureStore, feature_view_parameters: List[Dict[str, Any]],
                         entity_mapping: Dict[str, Entity], feature_df: DataFrame) -> Dict[str, FeatureView]:
    """Register one feature view per parameter dict, reusing views that already exist.

    Args:
        feature_store: FeatureStore to register the views in.
        feature_view_parameters: One dict per view. Required keys: ``name``,
            ``version`` and ``entities`` (entity names looked up in
            ``entity_mapping``). Optional keys: ``timestamp_col``, ``refresh_freq``,
            ``desc`` and ``feature_desc``. A ``feature_df`` key is ignored; every
            view is built from the ``feature_df`` argument.
        entity_mapping: Entity name -> Entity, e.g. from ``create_entities``.
        feature_df: Snowpark DataFrame backing every view.

    Returns:
        Mapping of view name (as spelled in the parameters) to the newly registered
        or already existing FeatureView.
    """
    feature_view_mapping = {}
    # Snapshot of the views registered before this call.
    registered_views = feature_store.list_feature_views()

    for feature_view_param in feature_view_parameters:
        feature_view_name = feature_view_param["name"]
        feature_view_version = feature_view_param["version"]
        entities = [entity_mapping[name] for name in feature_view_param["entities"]]
        timestamp_col = feature_view_param.get("timestamp_col")
        refresh_freq = feature_view_param.get("refresh_freq")
        desc = feature_view_param.get("desc")
        feature_desc = feature_view_param.get("feature_desc")

        # Snowflake stores unquoted names upper-cased (Scenario 1's error names
        # 'TRP_SNOWPARK_DF$1'), so compare names case-insensitively.
        # NOTE(review): assumes list_feature_views() yields FeatureView objects; some
        # snowflake-ml versions return a Snowpark DataFrame instead. Confirm.
        existing_view = next(
            (
                view
                for view in registered_views
                if str(view.name).upper() == feature_view_name.upper()
                and str(view.version) == str(feature_view_version)
            ),
            None,
        )

        if existing_view is not None:
            # Reuse the registered view. Previously `feature_view` was left unbound
            # here, or held the view from the previous iteration.
            print(f"Feature View : {feature_view_name}_{feature_view_version} already exists")
            feature_view = existing_view
        else:
            fv_instance = FeatureView(
                name=feature_view_name,
                entities=entities,
                feature_df=feature_df,
                timestamp_col=timestamp_col,
                refresh_freq=refresh_freq,
                desc=desc)
            if feature_desc:
                fv_instance = fv_instance.attach_feature_desc(feature_desc)

            # Register the FeatureView; this creates the backing object in Snowflake.
            # In this notebook's scenarios, refresh_freq=None surfaced a view and a
            # set refresh_freq surfaced a dynamic table.
            feature_view = feature_store.register_feature_view(
                feature_view=fv_instance,
                version=feature_view_version,
                block=True,  # whether function call blocks until initial data is available
                overwrite=False,  # whether to replace existing feature view with same name/version
            )

            print(f"Feature View : {feature_view_name}_{feature_view_version} created")
        feature_view_mapping[feature_view_name] = feature_view

    return feature_view_mapping


In [None]:
# Register the Scenario 3 feature view(s) on the table-backed DataFrame and print the mapping.
feature_view_dict = create_feature_views(
    feature_store=fs,
    feature_view_parameters=feature_view_parameters,
    entity_mapping=entities_mapping,
    feature_df=fea_engg_snowpark_df,
)
print(feature_view_dict)