# Introduction to Feature Stores with Feast
## The easy-reading booklet to master the key concepts of feature stores and learn how to use Feast

### Import libraries

In [6]:
# change this according to your local configuration
COMPUTE_LOCAL_WORKING_FOLDER = 'work/fullstackml/experiments/feast-credit-scoring'

from feast import (
    FeatureStore, 
    Entity, 
    Field, 
    FeatureService, 
    FeatureView, 
    FileSource, 
    RepoConfig,
    types,
    ValueType,
    PushSource
    )

from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
from feast.repo_config import RegistryConfig
from feast.data_source import PushMode

import pandas as pd
from datetime import datetime, timedelta
import os

### Setup registry, repository, initialize feature store

In [7]:
# Both SQLite files (registry and online store) live under <working folder>/fs.
registry_db_path = os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'fs', 'registry.db')
online_store_db_path = os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'fs', 'online_store.db')

# Programmatic repository configuration for the "credit_scoring" project:
# local provider, SQLite registry, SQLite online store, file-based offline store.
repo_config = RepoConfig(
    project="credit_scoring",
    registry=RegistryConfig(registry_type='sqlite', path=registry_db_path),
    provider="local",
    entity_key_serialization_version=2,
    online_store=SqliteOnlineStoreConfig(type='sqlite', path=online_store_db_path),
    offline_store=FileOfflineStoreConfig(type='file'),
)

# Feature store handle used by every following cell.
fs = FeatureStore(config=repo_config)

### Define entities, feature views and register them

In [8]:
# --- Entities -----------------------------------------------------------------
zipcode = Entity(name="zipcode", join_keys=["zipcode"])

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    join_keys=["dob_ssn"],
    description="Date of birth and last four digits of social security number",
)

# --- Batch sources (parquet files under <working folder>/data) ----------------
zipcode_source = FileSource(
    path=os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'data', 'zipcode_table.parquet'),
    timestamp_field="event_timestamp",
)
credit_history_source = FileSource(
    path=os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'data', 'credit_history.parquet'),
    timestamp_field="event_timestamp",
)

# --- Feature views ------------------------------------------------------------
# Zipcode-level features with an explicit schema and a ten-year TTL.
zipcode_schema = [
    Field(name="city", dtype=types.String),
    Field(name="state", dtype=types.String),
    Field(name="location_type", dtype=types.String),
    Field(name="tax_returns_filed", dtype=types.Int64),
    Field(name="population", dtype=types.Int64),
    Field(name="total_wages", dtype=types.Int64),
]
zipcode_features = FeatureView(
    name="zipcode_features",
    entities=[zipcode],
    ttl=timedelta(days=3650),
    schema=zipcode_schema,
    source=zipcode_source,
)

# Credit-history features keyed by dob_ssn; no schema is declared for this view
# and the TTL is 90 days.
credit_history = FeatureView(
    name="credit_history",
    entities=[dob_ssn],
    ttl=timedelta(days=90),
    source=credit_history_source,
)

# --- Registration -------------------------------------------------------------
objects_to_register = [credit_history, dob_ssn, zipcode, zipcode_features]
fs.apply(objects_to_register)

  schema = ParquetDataset(path).schema.to_arrow_schema()


In [None]:
# Attach an `event_timestamp` column to `data_df`: one daily timestamp per row,
# with the last row stamped "now".
# NOTE(review): `data_df` is not defined anywhere in the visible notebook; it is
# presumably the raw feature table loaded beforehand — confirm before running.
event_timestamps = pd.date_range(
    end=pd.Timestamp.now(),
    periods=len(data_df),
    freq='D',
)

# `assign` sets the values positionally, so rows stay aligned even when `data_df`
# does not have a default RangeIndex, and re-running the cell overwrites the
# column instead of appending a duplicate `event_timestamp`.
data_df = data_df.assign(event_timestamp=event_timestamps)

### Generate training dataset

In [9]:
# Entity dataframe: the loan table, read from <working folder>/data.
loan_table_path = os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'data', 'loan_table.parquet')
loan_data = pd.read_parquet(loan_table_path)

# Feature names grouped by the feature view that serves them.
zipcode_feature_names = [
    "city",
    "state",
    "location_type",
    "tax_returns_filed",
    "population",
    "total_wages",
]
credit_history_feature_names = [
    "credit_card_due",
    "mortgage_due",
    "student_loan_due",
    "vehicle_loan_due",
    "hard_pulls",
    "missed_payments_2y",
    "missed_payments_1y",
    "missed_payments_6m",
    "bankruptcies",
]

# Fully qualified "<feature_view>:<feature>" references, zipcode features first.
feast_features = (
    [f"zipcode_features:{name}" for name in zipcode_feature_names]
    + [f"credit_history:{name}" for name in credit_history_feature_names]
)

# Retrieval job for the historical features of the loan rows, then its DataFrame form.
training_data = fs.get_historical_features(entity_df=loan_data, features=feast_features)
training_df = training_data.to_df()

### Save the generated dataset for use in the modeling phase

In [11]:
# Persist the retrieval job as a named saved dataset backed by a local parquet file.
saved_dataset_path = os.path.join(
    COMPUTE_LOCAL_WORKING_FOLDER, 'data', 'credit_scoring_dataset.parquet'
)
dataset = fs.create_saved_dataset(
    from_=training_data,
    name="credit_scoring_dataset",
    storage=SavedDatasetFileStorage(saved_dataset_path),
)

### How to retrieve a stored dataset

In [13]:
# Look the saved dataset up by name, then load it into a DataFrame.
saved_dataset = fs.get_saved_dataset(name="credit_scoring_dataset")
training_df = saved_dataset.to_df()



### Materialization

In [14]:
# Materialize the registered feature views into the online store up to the given end date.
# NOTE(review): datetime.now() is a naive local timestamp, while the log output below
# reports the end date with a +00:00 offset — confirm how naive end dates are interpreted.
fs.materialize_incremental(end_date=datetime.now())

Materializing [1m[32m2[0m feature views to [1m[32m2022-11-13 22:09:20+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mcredit_history[0m from [1m[32m2022-08-15 22:09:20+00:00[0m to [1m[32m2022-11-13 22:09:20+00:00[0m:


0it [00:00, ?it/s]


[1m[32mzipcode_features[0m from [1m[32m2012-11-15 22:09:29+00:00[0m to [1m[32m2022-11-13 22:09:20+00:00[0m:


100%|███████████████████████████████████████████████████████| 28845/28845 [00:11<00:00, 2602.80it/s]


In [15]:
# A single made-up loan application; every value is a one-element list so the
# dict can be turned into a one-row DataFrame below.
dummy_loan_request = {
   "zipcode": [76104],
   "dob_ssn": ["19500806_6783"],
   "person_age": [133],
   "person_income": [59000],
   "person_home_ownership": ["RENT"],
   "person_emp_length": [123.0],
   "loan_intent": ["PERSONAL"],
   "loan_amnt": [35000],
   "loan_int_rate": [16.02],
}

# Entity keys of the applicant. These use `customer_`-prefixed names on purpose:
# the name `dob_ssn` is already bound to the Feast Entity defined earlier, and
# rebinding it to a string would break re-running the definition / `fs.apply` cells.
customer_zipcode = dummy_loan_request["zipcode"][0]
customer_dob_ssn = dummy_loan_request["dob_ssn"][0]

# Fetch the online features for this applicant.
feature_vector = fs.get_online_features(
   entity_rows=[{"zipcode": customer_zipcode, "dob_ssn": customer_dob_ssn}],
   features=feast_features,
).to_dict()

# Merge the request fields with the retrieved features into a single DataFrame.
# Keys present in both (the entity keys) take the value returned by the store.
features = dummy_loan_request.copy()
features.update(feature_vector)
features_df = pd.DataFrame.from_dict(data=features)

### Feature Service

In [37]:
# A feature service bundles projections of several feature views under one name.
zipcode_projection = zipcode_features[["city","state"]]
credit_history_projection = credit_history[["mortgage_due"]]

mixedviews_fs = FeatureService(
    name="mixed_views",
    features=[zipcode_projection, credit_history_projection],
)

fs.apply([mixedviews_fs])

# Fetch the registered service back; it can be passed wherever a feature list is
# expected, regardless of how many feature views it spans.
features_to_fetch = fs.get_feature_service("mixed_views")

# One entity row carrying the join keys of both underlying feature views.
entity_rows = [{"zipcode": 76104, "dob_ssn": '19500806_6783'}]

# Online store lookup ...
online_response = fs.get_online_features(
    features=features_to_fetch,
    entity_rows=entity_rows,
)
returned_features = online_response.to_dict()

# ... or the same service against the offline store, using the loan table as entity dataframe.
offline_job = fs.get_historical_features(
    features=features_to_fetch,
    entity_df=loan_data,
)
returned_features_off = offline_job.to_df()

In [38]:
returned_features

{'zipcode': [76104],
 'dob_ssn': ['19500806_6783'],
 'state': ['TX'],
 'city': ['FORT WORTH'],
 'mortgage_due': [None]}

### Push source

In [18]:
# Batch source backing the push source: the same zipcode parquet table used above.
zipcode_batch_source = FileSource(
    path=os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'data', 'zipcode_table.parquet'),
    timestamp_field="event_timestamp",
)

# Push source through which new zipcode rows are sent to the store.
zipcode_push_source = PushSource(
    name="zipcode_push_source",
    batch_source=zipcode_batch_source,
)

# Feature view fed by the push source; no schema is declared and the TTL is ten years.
zipcode_features_push = FeatureView(
    name="zipcode_features_push",
    entities=[zipcode],
    ttl=timedelta(days=3650),
    source=zipcode_push_source,
)

fs.apply([zipcode_features_push])

  schema = ParquetDataset(path).schema.to_arrow_schema()


In [19]:
# One made-up zipcode record to send through the push source.
pushed_data = [
    {
        'zipcode': 1111,
        'city': 'NEW ROME',
        'state': 'NJ',
        'location_type': 'PRIMARY',
        'tax_returns_filed': 13245,
        'population': 24083,
        'total_wages': 1089095041,
        'event_timestamp': pd.Timestamp('2017-01-01 12:00:00+0000', tz='UTC'),
    }
]
pushed_df = pd.DataFrame.from_dict(data=pushed_data)

# Push the record to both the online and the offline store.
fs.push("zipcode_push_source", pushed_df, to=PushMode.ONLINE_AND_OFFLINE)

  schema = ParquetDataset(path).schema.to_arrow_schema()
  schema = ParquetDataset(path).schema.to_arrow_schema()


In [20]:
# Read back the record that was just pushed, via the push-backed feature view.
pushed_feature_names = [
    "city",
    "state",
    "location_type",
    "tax_returns_filed",
    "population",
    "total_wages",
]

fs.get_online_features(
    entity_rows=[{"zipcode": 1111}],
    features=[f"zipcode_features_push:{name}" for name in pushed_feature_names],
).to_dict()

{'zipcode': [1111],
 'state': ['NJ'],
 'total_wages': [1089095041],
 'population': [24083],
 'city': ['NEW ROME'],
 'tax_returns_filed': [13245],
 'location_type': ['PRIMARY']}

### Start the feature transformation server locally

Create a Python file in the project folder and copy the following code into it:

In [None]:
COMPUTE_LOCAL_WORKING_FOLDER = 'work/fullstackml/experiments/feast-credit-scoring'

from feast import FeatureStore, RepoConfig
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.repo_config import RegistryConfig

from datetime import datetime, timedelta
import os

# Same repository configuration as in the notebook: SQLite registry and online
# store under <working folder>/fs, file-based offline store.
registry_db_path = os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'fs', 'registry.db')
online_store_db_path = os.path.join(COMPUTE_LOCAL_WORKING_FOLDER, 'fs', 'online_store.db')

repo_config = RepoConfig(
    project="credit_scoring",
    registry=RegistryConfig(registry_type='sqlite', path=registry_db_path),
    provider="local",
    entity_key_serialization_version=2,
    online_store=SqliteOnlineStoreConfig(type='sqlite', path=online_store_db_path),
    offline_store=FileOfflineStoreConfig(type='file'),
)

fs = FeatureStore(config=repo_config)

# Start the HTTP feature server on localhost:8889 with access and feature logging left on.
# When running inside a container, bind to 0.0.0.0 instead and publish port 8889 to the host.
fs.serve(
    host='127.0.0.1',
    port=8889,
    type_='http',
    no_access_log=False,
    no_feature_log=False,
)

In [None]:
# Shell command (run it in a terminal, not as Python): POST a request to the
# /get-online-features endpoint of the server listening on localhost:8889, asking for
# the zipcode_features_push features of zipcode 1111; the response is piped through jq.
curl -X POST \
  "http://localhost:8889/get-online-features" \
  -d '{
    "entities": {
        "zipcode": [
            1111
        ]
    },
    "features": [
        "zipcode_features_push:city",
        "zipcode_features_push:state",
        "zipcode_features_push:location_type",
        "zipcode_features_push:tax_returns_filed",
        "zipcode_features_push:population",
        "zipcode_features_push:total_wages"
    ]
}' | jq

### Start the UI server locally

Create a Python file in the project folder and copy the following code into it:

In [None]:
# Standalone script: builds the same feature store configuration as the notebook
# and starts the Feast web UI on top of it.

# change this according to your local configuration
COMPUTE_LOCAL_WORKING_FOLDER = 'work/fullstackml/experiments/feast-credit-scoring'

from feast import FeatureStore, RepoConfig
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.repo_config import RegistryConfig
# Function handed to serve_ui as `get_registry_dump`. The original script passed
# the bare name `Callable`, which is never imported (NameError) and is a typing
# construct rather than a function.
from feast.repo_operations import registry_dump

from datetime import datetime, timedelta
import os

# SQLite registry and online store under <working folder>/fs, file-based offline store.
repo_config = RepoConfig(
    project="credit_scoring",
    registry=RegistryConfig(
        registry_type='sqlite',
        path=os.path.join(COMPUTE_LOCAL_WORKING_FOLDER,
        'fs',
        'registry.db')),
    provider="local",
    entity_key_serialization_version=2,
    online_store=SqliteOnlineStoreConfig(
        type='sqlite',
        path=os.path.join(COMPUTE_LOCAL_WORKING_FOLDER,
        'fs',
        'online_store.db')),
    offline_store=FileOfflineStoreConfig(type='file')
    )

fs = FeatureStore(config=repo_config)

# if you're working from inside a container set host to 0.0.0.0 and export port 8889 on host machine
# NOTE(review): this is the same port the feature server script uses; pick a different
# one if both servers have to run at the same time.
fs.serve_ui(
    host='127.0.0.1',
    port=8889,
    get_registry_dump=registry_dump,
    registry_ttl_sec=5,
)
# registry_ttl_sec is number of seconds after which the registry is refreshed (default 5)
# open your browser on http://127.0.0.1:8889

### Erase all configurations, registry, and files generated at the time of the feature repo initialization

In [5]:
# Tear down what was set up for this feature store (see the heading above).
# NOTE(review): this cell's execution count (In [5]) predates the cells above it;
# run it only as the very last step, since the earlier cells rely on the store being in place.
fs.teardown()