# Snowflake Feature Store with dbt Feature Pipelines

This notebook walks through an example of using Snowflake's feature store together with dbt to manage feature pipelines. We'll see how feature tables can be created in dbt and then registered as shareable features in Snowflake's feature store.

To follow along, you will need a dbt Cloud account.

## Project Setup

In this section we set up the necessary schemas and use the ExampleHelper in ```snowflake.ml.feature_store``` to create a dataset to work with.

In [None]:
# Import python packages
from snowflake.ml.feature_store.examples.example_helper import ExampleHelper
from snowflake.snowpark.context import get_active_session

session = get_active_session()


In [None]:
# Create Python variables for the database and schemas used in this project.
# The feature store and its objects can live in their own separate schema.
fs_db = 'DBT_FS' # database for your project
fs_data_schema = 'DBT_FS_DBT_FS' # plug in where your data lands based on the dbt model outputs
fs_schema = 'FS_SCHEMA' # schema where the feature store will live

In [None]:
-- set context and create schemas. 
-- Note this already assumes you have created the database where your notebook is running currently.

use role sysadmin;
use database dbt_fs;

-- create a raw schema where we will dump the data
create schema if not exists raw;

-- create a schema specific for the feature store 
create schema if not exists fs_schema;

In [None]:
# use the ExampleHelper to get a dataset and write it to the below database and schema
helper = ExampleHelper(session, 'dbt_fs', 'raw')
source_table = helper.load_source_data('fraud_transactions')[0]
session.table(source_table).limit(5).to_pandas()

In [None]:
-- the data spans April through August 31, 2019, so create a version through July for initial prep
CREATE OR REPLACE TABLE dbt_fs.raw.transactions AS (
SELECT *
FROM dbt_fs.raw.fraud_transactions
WHERE TX_DATETIME < '2019-08-01'
);
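
To make the cutoff concrete, here is a minimal pandas sketch of the same filter. The three rows are made-up stand-ins for `dbt_fs.raw.fraud_transactions`, not real data. The point is that the `<` comparison against `'2019-08-01'` keeps everything through the end of July 31 and excludes midnight on August 1.

```python
import pandas as pd

# Hypothetical stand-in for dbt_fs.raw.fraud_transactions (made-up rows).
fraud_transactions = pd.DataFrame(
    {
        "TRANSACTION_ID": [1, 2, 3],
        "TX_DATETIME": pd.to_datetime(
            ["2019-07-31 23:59:59", "2019-08-01 00:00:00", "2019-08-15 12:00:00"]
        ),
    }
)

# Same predicate as the SQL above: strictly before August 1, 2019.
cutoff = pd.Timestamp("2019-08-01")
transactions = fraud_transactions[fraud_transactions["TX_DATETIME"] < cutoff]
print(transactions)
```

Only the July 31 row survives the filter, so the August data stays available for later use.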

## dbt Cloud

Next, we need to jump over to dbt Cloud, for which you will need an account. You can sign up for a trial here. Once you have an account, clone this [repo](https://github.com/sfc-gh-rpettus/dbt-feature-store) and run ```dbt build```.

Once you have successfully run the models in dbt you can jump back to the notebook to begin setting up the feature store.

In [None]:
# look at the feature table produced by the dbt models
session.sql(f"SELECT * FROM {fs_db}.{fs_data_schema}.ft_customer_transactions").limit(10)

## Feature Store Setup

In this section we will:
1. Create a feature store in Snowflake
2. Register entities
3. Create feature views that reference our feature tables created in dbt
4. Generate a training dataset using a feature view
5. Demonstrate how the feature view can be used in an inference dataset

In [None]:
from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode
)

fs = FeatureStore(
    session=session, 
    database=fs_db, 
    name=fs_schema, 
    default_warehouse='WH_DBT',
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

In [None]:
customer = Entity(name="CUSTOMER", join_keys=["CUSTOMER_ID"])
transaction = Entity(name="TRANSACTION", join_keys=["TRANSACTION_ID"])
fs.register_entity(customer)
fs.register_entity(transaction)
fs.list_entities().show()

In [None]:
# now create a dataframe from our feature table produced in dbt
customers_transactions_df = session.sql(f"""
    SELECT 
        CUSTOMER_ID,
        TX_DATETIME,
        TX_AMOUNT_1D,
        TX_AMOUNT_7D,
        TX_AMOUNT_30D,
        TX_AMOUNT_AVG_1D,
        TX_AMOUNT_AVG_7D,
        TX_AMOUNT_AVG_30D,
        TX_CNT_1D,
        TX_CNT_7D,
        TX_CNT_30D     
    FROM {fs_db}.{fs_data_schema}.ft_customer_transactions
    """)

# now create a feature view on top of these features
customer_transactions_fv = FeatureView(
    name="customer_transactions_fv", 
    entities=[customer],
    feature_df=customers_transactions_df,
    timestamp_col="TX_DATETIME",
    refresh_freq=None,
    desc="Customer transaction features with window aggregates")

# now register the feature view for use beyond the session
customer_transactions_fv = fs.register_feature_view(
    feature_view=customer_transactions_fv,
    version="1",
    #overwrite=True,
    block=True)
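
The dbt models that build `ft_customer_transactions` live in the repo and are not shown in this notebook. As a rough illustration of what columns such as `TX_AMOUNT_7D`, `TX_AMOUNT_AVG_7D` and `TX_CNT_7D` could contain, here is a minimal pandas sketch that assumes they are trailing 7-day sums, averages and counts per customer, with the current transaction included. That definition, the `TX_AMOUNT` input column and the sample rows are assumptions for illustration, not the repo's actual logic.

```python
import pandas as pd

# Made-up transactions; TX_AMOUNT is an assumed raw input column.
tx = pd.DataFrame(
    {
        "CUSTOMER_ID": [1, 2, 1, 1],
        "TX_DATETIME": pd.to_datetime(
            ["2019-04-01 10:00", "2019-04-02 12:00", "2019-04-03 09:00", "2019-04-09 08:00"]
        ),
        "TX_AMOUNT": [10.0, 5.0, 20.0, 30.0],
    }
).sort_values("TX_DATETIME")  # time-based rolling needs a sorted time index

# Trailing 7-day window per customer; each window ends at the current row.
windowed = (
    tx.set_index("TX_DATETIME")
    .groupby("CUSTOMER_ID")["TX_AMOUNT"]
    .rolling("7D")
)

features = pd.DataFrame(
    {
        "TX_AMOUNT_7D": windowed.sum(),
        "TX_AMOUNT_AVG_7D": windowed.mean(),
        "TX_CNT_7D": windowed.count(),
    }
).reset_index()
print(features)
```

Under these assumptions, customer 1's transaction on April 9 only sees the April 3 transaction inside its window, because April 1 is more than seven days earlier.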

In [None]:
# now create a dataframe from our feature table produced in dbt
transaction_times_df = session.sql(f"""
    SELECT 
        TRANSACTION_ID,
        TX_DATETIME,
        TX_DURING_WEEKEND,
        TX_DURING_NIGHT
    FROM {fs_db}.{fs_data_schema}.ft_transaction_times
    """)

# now create a feature view on top of these features
transaction_times_fv = FeatureView(
    name="transaction_times_fv", 
    entities=[transaction],
    feature_df=transaction_times_df,
    timestamp_col="TX_DATETIME",
    refresh_freq=None,
    desc="Flags for transactions that occur at night or on weekends")

# now register the feature view for use beyond the session
transaction_times_fv = fs.register_feature_view(
    feature_view=transaction_times_fv,
    version="1",
    #overwrite=True,
    block=True)
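
The flags in `ft_transaction_times` are also computed in dbt and are not shown here. A minimal pandas sketch of how such flags might be derived follows, assuming that "weekend" means Saturday or Sunday and "night" means an hour of 6 or earlier. Both thresholds and the sample rows are assumptions for illustration, so check the dbt models for the definitions actually used.

```python
import pandas as pd

# Made-up transactions: a Saturday night, a Monday afternoon, a Sunday evening.
tx = pd.DataFrame(
    {
        "TRANSACTION_ID": [1, 2, 3],
        "TX_DATETIME": pd.to_datetime(
            ["2019-04-06 03:00", "2019-04-08 14:00", "2019-04-07 22:00"]
        ),
    }
)

# Assumed definition: dayofweek is 0 for Monday, so 5 and 6 are the weekend.
tx["TX_DURING_WEEKEND"] = (tx["TX_DATETIME"].dt.dayofweek >= 5).astype(int)

# Assumed definition: hours 0 through 6 count as night.
tx["TX_DURING_NIGHT"] = (tx["TX_DATETIME"].dt.hour <= 6).astype(int)
print(tx)
```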



In [None]:
fs.list_feature_views().to_pandas()

In [None]:
spine_df = session.create_dataframe(
    [
        ('1', '3937', "2019-05-01 00:00"), 
        ('2', '2', "2019-05-01 00:00"),
        ('3', '927', "2019-05-01 00:00"),
    ], 
    schema=["INSTANCE_ID", "CUSTOMER_ID", "EVENT_TIMESTAMP"])

train_dataset = fs.generate_dataset(
    name="customers_fv",
    version="1_0",
    spine_df=spine_df,
    features=[customer_transactions_fv],
    spine_timestamp_col="EVENT_TIMESTAMP",
    spine_label_cols=[],
)
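
Because the feature view has a `timestamp_col` and the spine has a `spine_timestamp_col`, each spine row should be matched with the feature values that were current as of its timestamp. The following pandas sketch illustrates that idea with `merge_asof`, assuming the lookup behaves like a backward as-of join on the entity key. The rows are made up, and this is not the feature store's own implementation.

```python
import pandas as pd

# Made-up feature rows for one customer at three points in time.
features = pd.DataFrame(
    {
        "CUSTOMER_ID": [2, 2, 2],
        "TX_DATETIME": pd.to_datetime(["2019-04-20", "2019-04-30", "2019-05-02"]),
        "TX_CNT_7D": [1, 3, 4],
    }
)

# Made-up spine: the same customer observed at two different times.
spine = pd.DataFrame(
    {
        "INSTANCE_ID": [1, 2],
        "CUSTOMER_ID": [2, 2],
        "EVENT_TIMESTAMP": pd.to_datetime(["2019-05-01", "2019-07-01"]),
    }
)

# For each spine row, take the latest feature row at or before EVENT_TIMESTAMP.
joined = pd.merge_asof(
    spine.sort_values("EVENT_TIMESTAMP"),
    features.sort_values("TX_DATETIME"),
    left_on="EVENT_TIMESTAMP",
    right_on="TX_DATETIME",
    by="CUSTOMER_ID",
    direction="backward",
)
print(joined)
```

The May 1 spine row picks up the April 30 feature values rather than the May 2 ones, which is what keeps future information out of a training set. The later spine row sees the most recent values available.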

In [None]:
training_data_df = train_dataset.read.to_snowpark_dataframe()
training_data_df.limit(10)

In [None]:
# now let's see how this can be used in an inference pipeline. 
# The modeling process is out of scope for this exercise.
# But you can plug this into a model that you have deployed for your predictions.

inference_spine = session.create_dataframe(
    [
        ('1', '3937', "2019-07-01 00:00"), 
        ('2', '2', "2019-07-01 00:00"),
        ('3', '927', "2019-07-01 00:00"),
    ], 
    schema=["INSTANCE_ID", "CUSTOMER_ID", "EVENT_TIMESTAMP"])

inference_dataset = fs.retrieve_feature_values(
    spine_df=inference_spine,
    features=[customer_transactions_fv],
    spine_timestamp_col="EVENT_TIMESTAMP",
)

inference_dataset.to_pandas()

In [None]:
-- clean up 
-- drop schema fs_schema;