<center><img src=https://raw.githubusercontent.com/feast-dev/feast/master/docs/assets/feast_logo.png width=400/></center>

# Deploying the Feature Store

### Introduction

Feast enables AI/ML teams to serve (and consume) features via feature stores. In this notebook, we will configure the feature stores and feature definitions, and deploy a Feast feature store server. We will also materialize (move) data from the offline store to the online store.

In Feast, offline stores support pulling large amounts of data for model training using tools like Redshift, Snowflake, BigQuery, and Spark. In contrast, Feast online stores focus on serving features for model inference, using tools like Redis, Snowflake, PostgreSQL, and SQLite.

In this notebook, we will set up a file-based (Dask) offline store and a SQLite online store. The online store will be made available through the Feast server.

This notebook assumes that you have prepared the data by running the notebook [01_Credit_Risk_Data_Prep.ipynb](01_Credit_Risk_Data_Prep.ipynb). 

### Setup

*The following code assumes that you have read the example README.md file and that you have set up an environment where the code can be run. Please make sure you have addressed the prerequisites.*

In [3]:
# Imports
import re
import sys
import time
import signal
import sqlite3
import subprocess
import datetime as dt
from feast import FeatureStore

### Feast Feature Store Configuration

For model training, we usually don't need (or want) a constantly running feature server. All we need is the ability to efficiently query and pull all of the training data at training time. In contrast, during model serving we need servers that are always ready to supply feature records in response to application requests. 

This training-serving dichotomy is reflected in Feast using "offline" and "online" stores. Offline stores are configured to work with database technologies typically used for training, while online stores are configured to use storage and streaming technologies that are popular for feature serving.

We need to create a `feature_store.yaml` config file to tell Feast the structure we want in our offline and online feature stores. Below, we write the configuration for a local "Dask" offline store and a local SQLite online store. We give the feature store a project name of `loan_applications` and the provider `local`. The registry is where the feature store keeps track of feature definitions and online store updates; we choose a file location in this case.

See the [feature_store.yaml](https://docs.feast.dev/reference/feature-repository/feature-store-yaml) documentation for further details. 

In [2]:
%%writefile Feature_Store/feature_store.yaml

project: loan_applications
registry: data/registry.db
provider: local
offline_store:
    type: dask
online_store:
    type: sqlite
    path: data/online_store.db
entity_key_serialization_version: 3

Writing Feature_Store/feature_store.yaml


### Feature Definitions

We also need to create feature definitions and other feature constructs in a Python file, which we name `feature_definitions.py`. For our purposes, we define the following:

- Data Source: connections to data storage or data-producing endpoints
- Entity: primary key fields which can be used for joining data
- FeatureView: collections of features from a data source
- FeatureService: a logical group of features from one or more feature views

For more information on these, see the [Concepts](https://docs.feast.dev/getting-started/concepts) section of the Feast documentation.
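
To make the role of the entity join key concrete, here is a plain-Python sketch of combining two groups of features on the `ID` key. It does not use the Feast API: the `join_on_entity` helper and all row values are hypothetical and made up for illustration only.

```python
# Hypothetical rows keyed by the entity join key "ID" (values are made up).
view_a = {
    18: {"duration": 24.0, "purpose": "radio/tv"},
    764: {"duration": 12.0, "purpose": "new car"},
}
view_b = {
    18: {"age": 30.0, "housing": "own"},
    764: {"age": 45.0, "housing": "rent"},
}


def join_on_entity(entity_ids, *views):
    """Merge feature rows from several views, one output row per entity ID."""
    joined = []
    for entity_id in entity_ids:
        row = {"ID": entity_id}
        for view in views:
            # Entities missing from a view simply contribute no features.
            row.update(view.get(entity_id, {}))
        joined.append(row)
    return joined


rows = join_on_entity([18, 764], view_a, view_b)
for row in rows:
    print(row)
```

Each output row carries the features of both groups for one `ID`, which is the shape a model expects at training or inference time.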

In [3]:
%%writefile Feature_Store/feature_definitions.py

# Imports
import os
from pathlib import Path
from feast import (
    FileSource,
    Entity,
    FeatureView,
    Field,
    FeatureService
)
from feast.types import Float32, String
from feast.data_format import ParquetFormat

CURRENT_DIR = os.path.abspath(os.curdir)

# Data Sources
# A data source tells Feast where the data lives
data_a = FileSource(
    file_format=ParquetFormat(),
    path=Path(CURRENT_DIR,"data/data_a.parquet").as_uri()
)
data_b = FileSource(
    file_format=ParquetFormat(),
    path=Path(CURRENT_DIR,"data/data_b.parquet").as_uri()
)

# Entity
# An entity tells Feast the column it can use to join tables
loan_id = Entity(
    name = "loan_id",
    join_keys = ["ID"]
)

# Feature views
# A feature view is how Feast groups features
features_a = FeatureView(
    name="data_a",
    entities=[loan_id],
    schema=[
        Field(name="checking_status", dtype=String),
        Field(name="duration", dtype=Float32),
        Field(name="credit_history", dtype=String),
        Field(name="purpose", dtype=String),
        Field(name="credit_amount", dtype=Float32),
        Field(name="savings_status", dtype=String),
        Field(name="employment", dtype=String),
        Field(name="installment_commitment", dtype=Float32),
        Field(name="personal_status", dtype=String),
        Field(name="other_parties", dtype=String),
    ],
    source=data_a
)
features_b = FeatureView(
    name="data_b",
    entities=[loan_id],
    schema=[
        Field(name="residence_since", dtype=Float32),
        Field(name="property_magnitude", dtype=String),
        Field(name="age", dtype=Float32),
        Field(name="other_payment_plans", dtype=String),
        Field(name="housing", dtype=String),
        Field(name="existing_credits", dtype=Float32),
        Field(name="job", dtype=String),
        Field(name="num_dependents", dtype=Float32),
        Field(name="own_telephone", dtype=String),
        Field(name="foreign_worker", dtype=String),
    ],
    source=data_b
)

# Feature Service
# a feature service in Feast represents a logical group of features
loan_fs = FeatureService(
    name="loan_fs",
    features=[features_a, features_b]
)

Writing Feature_Store/feature_definitions.py


### Applying the Configuration and Definitions

Now that we have our feature store configuration (`feature_store.yaml`) and feature definitions (`feature_definitions.py`), we are ready to "apply" them. The `feast apply` command creates a registry file (`Feature_Store/data/registry.db`) and sets up data connections; in this case, it creates a SQLite database (`Feature_Store/data/online_store.db`).

In [10]:
# Run 'feast apply' in the Feature_Store directory
!feast --chdir ./Feature_Store apply

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Users\qpdbc\.local\bin\.venv\Scripts\feast.exe\__main__.py", line 10, in <module>
  File "C:\Users\qpdbc\.local\bin\.venv\Lib\site-packages\click\core.py", line 1485, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\qpdbc\.local\bin\.venv\Lib\site-packages\click\core.py", line 1406, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "C:\Users\qpdbc\.local\bin\.venv\Lib\site-packages\click\core.py", line 1873, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\qpdbc\.local\bin\.venv\Lib\site-packages\click\core.py", line 1269, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\qpdbc\.local\bin\.ven

In [1]:
# List the Feature_Store/data/ directory to see newly created files
!ls -nlh Feature_Store/data/

'ls' is not recognized as an internal or external command, operable program or batch file.
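
The `ls` command is not available in a default Windows shell. A portable alternative, sketched here with the standard library only, is to list the directory from Python. The helper name `list_dir` is hypothetical.

```python
from pathlib import Path


def list_dir(path):
    """Return (name, size_in_bytes) for each file directly inside `path`, sorted by name."""
    directory = Path(path)
    if not directory.is_dir():
        # A missing directory yields an empty listing instead of an error.
        return []
    return sorted((p.name, p.stat().st_size) for p in directory.iterdir() if p.is_file())


# After a successful `feast apply`, this should include the registry and
# online store files mentioned above.
for name, size in list_dir("Feature_Store/data"):
    print(f"{size:>12,d}  {name}")
```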


Note that while `feast apply` set up the `sqlite` online database, `online_store.db`, no data has been added to it yet. We can verify this by connecting with the `sqlite3` library.

In [4]:
# Connect to sqlite database
conn = sqlite3.connect("Feature_Store/data/online_store.db")
cursor = conn.cursor()
# List the tables, then query the contents of each feature view table
print(
    "Online Store Tables:           ",
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()
)
print(
    "loan_applications_data_a data: ",
    cursor.execute("SELECT * FROM loan_applications_data_a").fetchall()
)
print(
    "loan_applications_data_b data: ",
    cursor.execute("SELECT * FROM loan_applications_data_b").fetchall()
)
conn.close()

Online Store Tables:            [('loan_applications_data_a',), ('loan_applications_data_b',)]
loan_applications_data_a data:  []
loan_applications_data_b data:  []
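
The same check can be written generically, so that it does not depend on knowing the table names in advance. The sketch below counts rows in every table of a SQLite database; `table_row_counts` is a hypothetical helper, and the demo uses an in-memory database with made-up table names rather than the real online store.

```python
import sqlite3


def table_row_counts(conn):
    """Return {table_name: row_count} for every table in a SQLite database."""
    names = [
        row[0]
        for row in conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        )
    ]
    # Names come from sqlite_master itself, so double-quoting them is sufficient.
    return {
        name: conn.execute(f'SELECT COUNT(*) FROM "{name}"').fetchone()[0]
        for name in names
    }


# Demo on an in-memory database (table names and rows are made up).
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE demo_view_a (entity_key BLOB, feature_name TEXT, value BLOB)")
demo.execute("CREATE TABLE demo_view_b (entity_key BLOB, feature_name TEXT, value BLOB)")
demo.execute("INSERT INTO demo_view_a VALUES (?, ?, ?)", (b"\x01", "duration", b"\x00"))
counts = table_row_counts(demo)
print(counts)
demo.close()
```

To inspect the real store, pass `sqlite3.connect("Feature_Store/data/online_store.db")` instead; before materialization every count should be zero.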


Since we have used `feast apply` to create the registry, we can now use the Feast Python SDK to interact with our new feature store. For other available commands, see the [Feast Python SDK documentation](https://rtd.feast.dev/en/master/).

In [5]:
# Get feature store config
store = FeatureStore(repo_path="./Feature_Store")
store.config

RepoConfig(project='loan_applications', project_description=None, provider='local', registry_config='data/registry.db', online_config={'type': 'sqlite', 'path': 'data/online_store.db'}, auth={'type': 'no_auth'}, offline_config={'type': 'dask'}, batch_engine_config='local', feature_server=None, flags=None, repo_path=WindowsPath('Feature_Store'), entity_key_serialization_version=3, coerce_tz_aware=True, materialization_config=MaterializationConfig(pull_latest_features=False))

In [6]:
# List feature views
feature_views = store.list_batch_feature_views()
for fv in feature_views:
    print(f"Feature view: {fv.name}  |  Features: {fv.features}")

Feature view: data_a  |  Features: [Field(
    name='checking_status',
    dtype=<PrimitiveFeastType.STRING: 2>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='duration',
    dtype=<PrimitiveFeastType.FLOAT32: 6>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='credit_history',
    dtype=<PrimitiveFeastType.STRING: 2>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='purpose',
    dtype=<PrimitiveFeastType.STRING: 2>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='credit_amount',
    dtype=<PrimitiveFeastType.FLOAT32: 6>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='savings_status',
    dtype=<PrimitiveFeastType.S

### Deploying the Feature Store Servers

If you wish to share a feature store with your team, Feast provides feature servers. To spin up an offline feature server process, we can use the `feast serve_offline` command, while to spin up a Feast online feature server, we use the `feast serve` command.

Let's spin up an offline and an online server that we can use in the subsequent notebooks to get features during model training and model serving. We will run both servers as background processes that the other notebooks can communicate with.

First, we write a helper function to extract the first few printed log lines (so we can print them in the notebook cell output).

In [13]:
# # TimeoutError class
# class TimeoutError(Exception):
#     pass

# # TimeoutError raise function
# def timeout():
#     raise TimeoutError("timeout")

# # Get first few log lines function
# def print_first_proc_lines(proc, wait):
#     '''Given a process, `proc`, read and print output lines until they stop 
#     coming (waiting up to `wait` seconds for new lines to appear)'''
#     lines = ""
#     while True:
#         signal.signal(signal.SIGALRM, timeout)
#         signal.alarm(wait)
#         try:
#             lines += proc.stderr.readline()
#         except:
#             break
#     if lines:
#         print(lines, file=sys.stderr)


import sys
import os
import threading
import queue
from subprocess import Popen, PIPE
import select

def _reader_thread(pipe, q):
    """Reads from a pipe and puts the read data into a queue."""
    try:
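        # Read raw bytes from the pipe's file descriptor until it is closed.
        # os.read() returns bytes even if the pipe was opened with text=True,
        # so the b'' sentinel below reliably marks the end of the stream.
        with pipe:
            for byte_chunk in iter(lambda: os.read(pipe.fileno(), 4096), b''):
                q.put(byte_chunk)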
    finally:
        # Signal that we are done.
        q.put(None)

def print_proc_output_with_timeout(proc: Popen, wait: int):
    """
    Given a subprocess `proc`, read and print its stderr output with a timeout.

    This function is cross-platform. It uses select() on Unix-like systems
    and a threading approach on Windows to handle I/O with a timeout.
    """
    output_parts = []

    # --- Platform-specific implementation ---
    if os.name == 'nt':  # This checks if the OS is Windows ('nt' for New Technology)
        # --- Windows implementation (using threads) ---
        q = queue.Queue()
        thread = threading.Thread(target=_reader_thread, args=[proc.stderr, q])
        thread.daemon = True  # Allows main program to exit even if thread is running
        thread.start()

        try:
            # Wait for data from the queue with a timeout
            while True:
                chunk = q.get(timeout=wait)
                if chunk is None: # End of stream signal
                    break
                output_parts.append(chunk)
        except queue.Empty:
            # This means q.get() timed out, which is our desired timeout behavior
            pass

    else:
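        # --- Unix-like implementation (using select) ---
        # Read from the raw file descriptor so that each read returns only the
        # bytes currently available instead of blocking until end of stream.
        fd = proc.stderr.fileno()
        while True:
            ready_to_read, _, _ = select.select([fd], [], [], wait)
            if not ready_to_read:
                break  # No new output within `wait` seconds
            data = os.read(fd, 4096)
            if not data:
                break  # End of stream
            output_parts.append(data)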

    # --- Common output handling ---
    if output_parts:
        # Decode the collected bytes into a single string
        full_output = b''.join(output_parts).decode('utf-8', errors='ignore')
        print("--- Subprocess Output ---", file=sys.stderr)
        print(full_output, file=sys.stderr, end='')
        print("-------------------------", file=sys.stderr)
    else:
        print(f"No output from subprocess within the {wait}-second timeout.", file=sys.stderr)




Launch the offline server with the command `feast --chdir ./Feature_Store serve_offline`.

In [14]:
# Feast offline server process
offline_server_proc = subprocess.Popen(
    "feast --chdir ./Feature_Store serve_offline 2>&2 & echo $! > server_proc.txt",
    shell=True,
    text=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    bufsize=0
)
print_proc_output_with_timeout(offline_server_proc, wait=2)
# print_first_proc_lines(offline_server_proc, 2)

KeyboardInterrupt: 

The tail end of the command above, `2>&2 & echo $! > server_proc.txt`, captures log messages (in the offline case there are none), and writes the process PID to the file `server_proc.txt` (we will use this in the cleanup notebook, [05_Credit_Risk_Cleanup.ipynb](05_Credit_Risk_Cleanup.ipynb)).
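
The `& echo $!` idiom relies on a POSIX shell, so it does not work in a default Windows shell. A portable alternative, sketched below under stated assumptions, is to start the process without a shell and record `proc.pid` from Python. The `start_background` helper and the demo PID file name are hypothetical, and the demo launches a short-lived Python child instead of a Feast server so that it runs anywhere.

```python
import os
import subprocess
import sys
import tempfile


def start_background(cmd, pid_file):
    """Start `cmd` (a list of arguments) without a shell and append its PID to `pid_file`."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    with open(pid_file, "a", encoding="utf-8") as f:
        f.write(f"{proc.pid}\n")
    return proc


# Demo with a short-lived child process. For the Feast offline server the
# argument list would be something like:
#   ["feast", "--chdir", "./Feature_Store", "serve_offline"]
pid_path = os.path.join(tempfile.gettempdir(), "demo_server_proc.txt")
demo_proc = start_background([sys.executable, "-c", "print('ready')"], pid_path)
out, _ = demo_proc.communicate(timeout=30)
print(demo_proc.pid, out.decode().strip())
```

Because no shell sits in between, the recorded PID belongs to the launched program itself, which simplifies stopping it later.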

Next, launch the online server with the command `feast --chdir ./Feature_Store serve`.

In [15]:
# Feast online server (master and worker) processes
online_server_proc = subprocess.Popen(
    "feast --chdir ./Feature_Store serve 2>&2 & echo $! >> server_proc.txt",
    shell=True,
    text=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    bufsize=0
)
print_proc_output_with_timeout(online_server_proc, 3)

KeyboardInterrupt: 

Note that the output helpfully lets us know that the online server is "Listening at: http://127.0.0.1:6566" (the default host:port).

List the running processes to verify they are up.

In [16]:
# List running Feast processes (paths redacted)
running_procs = !ps -ef | grep feast | grep serve

for line in running_procs:
    redacted = re.sub(r'/*[^\s]*(?P<cmd>(python )|(feast ))', r'**/\g<cmd>', line)
    print(redacted)

'ps' is not recognized as an internal or external command, operable program or batch file.
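
Where `ps` is not available, one platform-independent way to check that the online server is up is to test whether its port accepts TCP connections. This is a minimal sketch using the standard library; `is_listening` is a hypothetical helper, and the host and port are the defaults mentioned above.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


print("online server up:", is_listening("127.0.0.1", 6566))
```

A successful connection only shows that something is listening on the port, not that it is the Feast server, so treat it as a quick sanity check.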


Note that there are two processes for the online server (master and worker).

### Materialize Features to the Online Store

At this point, there is no data in the online store yet. Let's use the SDK feature store object (that we created above) to "materialize" data; this is Feast lingo for moving/updating data from the offline store to the online store.

In [17]:
# Materialize
# Recall that we mocked the outcome data to have timestamps from
# 2023-09-24 12:00:00 out to 2023-10-09 12:00:00.
# The loan outcome timestamps were then lagged by 30-90 days,
# so the latest possible timestamp is 2024-01-07 12:00:00.
res = store.materialize(
    start_date=dt.datetime(2023,9,24,12,0,0),
    end_date=dt.datetime(2024,1,7,12,0,0)
)

Materializing 2 feature views from 2023-09-24 12:00:00+00:00 to 2024-01-07 12:00:00+00:00 into the sqlite online store.

data_a:
data_b:
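
The end of the materialization window follows from simple date arithmetic: the latest mocked timestamp plus the longest lag of 90 days. The variable names below are illustrative only.

```python
import datetime as dt

first_ts = dt.datetime(2023, 9, 24, 12, 0, 0)  # earliest mocked timestamp
last_ts = dt.datetime(2023, 10, 9, 12, 0, 0)   # latest mocked timestamp
max_lag = dt.timedelta(days=90)                # longest outcome lag

window_start = first_ts
window_end = last_ts + max_lag
print(window_start, "->", window_end)
# prints: 2023-09-24 12:00:00 -> 2024-01-07 12:00:00
```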


Now, we can query the SQLite database again and see data in the response!

In [18]:
# Query the online store database to verify materialized data
conn = sqlite3.connect("Feature_Store/data/online_store.db")
cursor = conn.cursor()
print(
    "loan_applications_data_a data: ",
    cursor.execute("SELECT * FROM loan_applications_data_a LIMIT 2").fetchall()
)
print(
    "loan_applications_data_b data: ",
    cursor.execute("SELECT * FROM loan_applications_data_b LIMIT 2").fetchall()
)
conn.close()

loan_applications_data_a data:  [(b'\x01\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00ID\x04\x00\x00\x00\x08\x00\x00\x00\xbb\x00\x00\x00\x00\x00\x00\x00', 'checking_status', b'\x12\x080<=X<200', None, 1696822931, None), (b'\x01\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00ID\x04\x00\x00\x00\x08\x00\x00\x00\xbb\x00\x00\x00\x00\x00\x00\x00', 'duration', b'5\x00\x00\x80A', None, 1696822931, None)]
loan_applications_data_b data:  [(b'\x01\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00ID\x04\x00\x00\x00\x08\x00\x00\x00\xbb\x00\x00\x00\x00\x00\x00\x00', 'residence_since', b'5\x00\x00@@', None, 1696822931, None), (b'\x01\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00ID\x04\x00\x00\x00\x08\x00\x00\x00\xbb\x00\x00\x00\x00\x00\x00\x00', 'property_magnitude', b'\x12\x03car', None, 1696822931, None)]


Note that the data is stored as binary strings, which is part of Feast's optimization for online queries. To get human-readable data, use the `get-online-features` REST API endpoint, which returns a JSON response.
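
As a rough illustration of what the value bytes contain, the sketch below decodes them by hand. It assumes each value is a protobuf message with a single field, which is what the bytes printed above look like: a tag byte, then either a length-prefixed string or a 4-byte little-endian float. `decode_simple_value` is a hypothetical helper that handles only these two cases; real code should rely on Feast's own deserialization rather than this sketch.

```python
import struct


def decode_simple_value(raw: bytes):
    """Decode a one-field protobuf message holding a short string or a 32-bit float.

    Returns (field_number, value). Only the two wire types that appear in the
    rows printed above are handled.
    """
    tag = raw[0]
    field_number, wire_type = tag >> 3, tag & 0x07
    if wire_type == 2:
        # Length-delimited field: one length byte, then the UTF-8 payload.
        length = raw[1]
        if length >= 0x80:
            raise ValueError("multi-byte lengths are not handled in this sketch")
        return field_number, raw[2:2 + length].decode("utf-8")
    if wire_type == 5:
        # Fixed 32-bit field, assumed here to be a little-endian IEEE 754 float.
        return field_number, struct.unpack("<f", raw[1:5])[0]
    raise ValueError(f"unsupported wire type {wire_type}")


# Value bytes copied from the query output above.
for raw in (b"\x12\x080<=X<200", b"5\x00\x00\x80A", b"\x12\x03car", b"5\x00\x00@@"):
    print(decode_simple_value(raw))
```

The field numbers that come out (2 for strings, 6 for floats) match the `PrimitiveFeastType.STRING: 2` and `PrimitiveFeastType.FLOAT32: 6` values shown in the feature view listing earlier.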

In [19]:
# curl command to online server to get data from the online store
cmd = """http://localhost:6566/get-online-features \
    -d '{ 
            "feature_service": "loan_fs",
            "entities": {"ID": [18, 764]}
        }'
"""

response = !curl -X POST {cmd}

In [20]:
response

['  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current',
 '                                 Dload  Upload   Total   Spent    Left  Speed',
 '',
 '  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0',
 '  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0',
 '  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0',
 '  0     0    0     0    0     0      0      0 --:--:--  0:00:02 --:--:--     0',
 'curl: (7) Failed to connect to localhost port 6566 after 2215 ms: Could not connect to server']
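
The same request can be sketched in Python with the standard library, which also makes it easy to handle an unreachable server like the one in the output above. The helper names `build_request` and `get_online_features` are hypothetical; the URL, feature service name, and entity IDs are taken from the `curl` command. Sending the body as `application/json` is an assumption of this sketch.

```python
import json
import urllib.request


def build_request(feature_service, entity_ids, base_url="http://localhost:6566"):
    """Build the POST request for the get-online-features endpoint."""
    payload = {"feature_service": feature_service, "entities": {"ID": entity_ids}}
    return urllib.request.Request(
        f"{base_url}/get-online-features",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def get_online_features(feature_service, entity_ids, timeout=5):
    """Send the request; return the parsed JSON, or None if the server is unreachable."""
    request = build_request(feature_service, entity_ids)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            return json.load(resp)
    except OSError as err:  # URLError and socket timeouts are OSError subclasses
        print(f"Could not reach the feature server: {err}")
        return None


# Inspect the request without sending it.
req = build_request("loan_fs", [18, 764])
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

With the online server running, `get_online_features("loan_fs", [18, 764])` would return the decoded JSON response; with no server, it prints the connection error and returns `None`.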

The `curl` command gave us a quick validation. In the [04_Credit_Risk_Model_Serving.ipynb](04_Credit_Risk_Model_Serving.ipynb) notebook, we'll use the Python `requests` library to handle the query better.

Now that the feature stores and their respective servers have been configured and deployed, we can proceed to train an AI model in [03_Credit_Risk_Model_Training.ipynb](03_Credit_Risk_Model_Training.ipynb).