# Module 1: Serving fresh online features with Feast, Kafka, Redis

## 1. Overview
In this notebook, we explore why it's valuable for both data scientists and engineers to build and register streaming features in Feast.

We then use Spark to build streaming features from events in Kafka and register them in Feast. We also show how Feast combines these streaming features with batch data sources in the online store (Redis). Users can then retrieve features from Redis at low latency through Feast.

If you haven't already, look at the [README](../README.md) for setup instructions prior to starting this notebook.

<img src="../architecture.png" width="750"/>

## 2. Set up Spark Structured Streaming to read the Kafka topic
We first read and parse events from the `drivers` Kafka topic, then run some transformations with Spark and push the results to Feast with `foreachBatch`.

In [17]:
from pyspark.sql import SparkSession
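# Explicit imports; note that `sum` is Spark's aggregate function and shadows Python's built-in `sum`
from pyspark.sql.functions import from_json, sum, window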
from pyspark.sql.types import StructType, IntegerType, DoubleType, TimestampType

import pandas as pd
import os
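# The Kafka connector must be on the classpath before the SparkSession is created.
# Its version (3.0.0 here) should match your installed Spark version (see pyspark.__version__).
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"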

In [18]:
spark = SparkSession.builder.master("local").appName("feast-spark").getOrCreate()
# Reduce partitions since default is 200 which will be slow on a local machine
spark.conf.set("spark.sql.shuffle.partitions", 5)

schema = (
    StructType()
        .add('driver_id', IntegerType(), False)
        .add('miles_driven', DoubleType(), False)
        .add('event_timestamp', TimestampType(), False)
        .add('conv_rate', DoubleType(), False)
        .add('acc_rate', DoubleType(), False)
)

# Subscribe to the `drivers` topic and parse each JSON message value into columns using `schema`
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "drivers")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr('CAST(value AS STRING)')
    .select(from_json('value', schema).alias("temp"))
    .select("temp.*")
)
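Each Kafka message value is expected to be a JSON object with the five fields declared in `schema`. To make that contract concrete, the stdlib sketch below decodes one message roughly the way `CAST(value AS STRING)`, `from_json(...)`, and `select("temp.*")` flatten it into columns. The sample message and its ISO 8601 timestamp format are assumptions about the producer, not taken from this workshop. Unlike Spark's `from_json`, which returns nulls for input it can't parse, this sketch raises an error.

```python
import json
from datetime import datetime

# Field names and Python converters mirroring the Spark `schema` above
# (IntegerType -> int, DoubleType -> float, TimestampType -> datetime)
DRIVER_EVENT_FIELDS = {
    "driver_id": int,
    "miles_driven": float,
    "event_timestamp": datetime.fromisoformat,
    "conv_rate": float,
    "acc_rate": float,
}


def parse_driver_event(raw_value: bytes) -> dict:
    """Decode a Kafka message value (bytes) into a flat dict of typed fields.

    Raises KeyError for missing fields and ValueError for malformed values.
    """
    payload = json.loads(raw_value.decode("utf-8"))  # like CAST(value AS STRING) + JSON parsing
    return {name: convert(payload[name]) for name, convert in DRIVER_EVENT_FIELDS.items()}


# Hypothetical message, assuming the producer writes ISO 8601 timestamps
sample = (
    b'{"driver_id": 1001, "miles_driven": 12.5, '
    b'"event_timestamp": "2021-04-12T10:59:42+00:00", '
    b'"conv_rate": 0.52, "acc_rate": 0.75}'
)
event = parse_driver_event(sample)
print(event)
```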

## 3. Set up the feature store

### Apply the feature repository
We first run `feast apply` to register the data sources and features, and to set up Redis.

In [3]:
!feast apply

No project found in the repository. Using project name feast_demo_local defined in feature_store.yaml
Applying changes for project feast_demo_local
Deploying infrastructure for driver_hourly_stats
Deploying infrastructure for driver_daily_features




Next, we instantiate a Feast `FeatureStore` object, which we use to push data to Feast and to read features back.

In [7]:
from feast import FeatureStore
from datetime import datetime

store = FeatureStore(repo_path=".")



### Fetch training data from the offline store
To verify that the features exist in the batch sources, we fetch point-in-time correct training data.

In [6]:
entity_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001, 1002, 1003, 1004, 1001],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
            datetime(2021, 4, 12, 15, 1, 12),
            datetime.now()
        ]
    }
)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_daily_features:daily_miles_driven"
    ],
).to_df()
print(training_df.head())

   driver_id                  event_timestamp  conv_rate  acc_rate  \
0       1001        2021-04-12 10:59:42+00:00   0.521149  0.751659   
1       1002        2021-04-12 08:12:10+00:00   0.089014  0.212637   
2       1003        2021-04-12 16:40:26+00:00   0.188855  0.344736   
3       1004        2021-04-12 15:01:12+00:00   0.296492  0.935305   
4       1001 2024-12-08 18:53:07.087475+00:00   0.404588  0.407571   

   daily_miles_driven  
0           18.926695  
1           12.005569  
2           23.490234  
3           19.204191  
4          350.650257  
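`get_historical_features` performs a point-in-time join: for each row of `entity_df`, it looks up the most recent feature values whose timestamp is at or before that row's `event_timestamp`. Conceptually this resembles pandas `merge_asof` with `direction="backward"`. The sketch below uses made-up toy data and only illustrates the idea; Feast's offline stores implement the join themselves and also handle TTLs, created timestamps, and multiple feature views.

```python
import pandas as pd

# Toy entity rows: which driver, and "as of" which time we want features
toy_entity_df = pd.DataFrame({
    "driver_id": [1001, 1002, 1001],
    "event_timestamp": pd.to_datetime(
        ["2021-04-12 10:00", "2021-04-12 10:00", "2021-04-12 12:00"], utc=True
    ),
})

# Toy feature rows: conv_rate values observed at various times (made-up numbers)
toy_feature_df = pd.DataFrame({
    "driver_id": [1001, 1001, 1002],
    "event_timestamp": pd.to_datetime(
        ["2021-04-12 09:00", "2021-04-12 11:00", "2021-04-12 08:00"], utc=True
    ),
    "conv_rate": [0.1, 0.2, 0.7],
})


def point_in_time_join(entities, features, ttl=None):
    """Attach to each entity row the latest feature row at or before its timestamp.

    `ttl` (a pd.Timedelta) optionally ignores feature rows older than that,
    loosely mimicking a feature view TTL.
    """
    # merge_asof requires both frames to be sorted on the join key
    left = entities.sort_values("event_timestamp")
    right = features.sort_values("event_timestamp")
    return pd.merge_asof(
        left,
        right,
        on="event_timestamp",
        by="driver_id",
        direction="backward",  # only consider feature rows at or before each entity timestamp
        tolerance=ttl,
    )


print(point_in_time_join(toy_entity_df, toy_feature_df))
```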


## 4. Materialize batch features and fetch online features from Redis
First, we materialize features into the online store (Redis). Materialization loads the latest value for each entity key from the batch sources and writes it to Redis.

In [10]:
!feast materialize-incremental $(date +"%Y-%m-%d")


Usage: feast materialize-incremental [OPTIONS] END_TS
Try 'feast materialize-incremental --help' for help.

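Error: Got unexpected extra argument (+%Y-%m-%d))

`$(date ...)` is POSIX shell command substitution, so this cell only works when the notebook runs shell commands through a shell like bash or zsh (Linux, macOS, WSL). On Windows, the unexpanded `$(date ...)` pieces reach Feast as literal arguments, which causes the error above. Portable alternatives are to compute the timestamp in Python and call the CLI through `subprocess`, or to call `store.materialize_incremental(end_date=datetime.now())` from the SDK.
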


In [17]:
import subprocess
from datetime import datetime, timezone

# Current UTC time as an ISO 8601 timestamp (e.g. 2024-12-08T18:01:02+0000)
end_ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S%z')

# Run the CLI command and capture its output
command = f"feast materialize-incremental {end_ts}"
try:
    result = subprocess.run(command, shell=True, check=True, text=True, capture_output=True)
    print(result.stdout)
except subprocess.CalledProcessError as e:
    # On failure, show both streams to help diagnose the problem
    print(f"Error occurred: {e}")
    print(f"Standard Output:\n{e.stdout}")
    print(f"Standard Error:\n{e.stderr}")


Error occurred: Command 'feast materialize-incremental 2024-12-08T18:01:02+0000' returned non-zero exit status 1.
Standard Output:
Materializing 2 feature views to 2024-12-08 19:01:02+01:00 into the redis online store.


Standard Error:
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "C:\Users\jofwf\PycharmProjects\feast-workshop\venv\Scripts\feast.exe\__main__.py", line 7, in <module>
  File "C:\Users\jofwf\PycharmProjects\feast-workshop\venv\Lib\site-packages\click\core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\jofwf\PycharmProjects\feast-workshop\venv\Lib\site-packages\click\core.py", line 1078, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "C:\Users\jofwf\PycharmProjects\feast-workshop\venv\Lib\site-packages\click\core.py", line 1688, in invoke
   

Feast records in the registry which time intervals have already been materialized. So if you schedule materialization to run every hour, each `feast materialize-incremental` run only processes data since the previous run, because Feast knows the earlier hours were already processed.
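Conceptually, incremental materialization only needs to remember, per feature view, where the last materialized interval ended; the next run then covers the range from that point up to the new end timestamp. The class below is a hypothetical, in-memory illustration of that bookkeeping, not Feast's implementation (Feast persists this state in its registry). How the very first interval starts is an assumption here: it begins a fixed lookback (for example, the feature view's TTL) before the end timestamp.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Tuple


class MaterializationTracker:
    """Hypothetical in-memory tracker of materialized intervals per feature view."""

    def __init__(self, default_lookback: timedelta):
        # Assumed lookback for a feature view's very first run (e.g. its TTL)
        self.default_lookback = default_lookback
        self.last_end: Dict[str, datetime] = {}

    def next_interval(self, feature_view: str, end: datetime) -> Tuple[datetime, datetime]:
        """Return the (start, end) range that still needs to be materialized."""
        start = self.last_end.get(feature_view, end - self.default_lookback)
        if start >= end:
            raise ValueError(f"{feature_view} is already materialized up to {start}")
        return start, end

    def mark_done(self, feature_view: str, end: datetime) -> None:
        """Record a successful run so the next one resumes from `end`."""
        self.last_end[feature_view] = end


# Simulate two hourly runs for one feature view
tracker = MaterializationTracker(default_lookback=timedelta(days=1))
t0 = datetime(2024, 12, 8, 18, 0, tzinfo=timezone.utc)
first = tracker.next_interval("driver_hourly_stats", t0)
tracker.mark_done("driver_hourly_stats", t0)
second = tracker.next_interval("driver_hourly_stats", t0 + timedelta(hours=1))
print(first)
print(second)
```

The second run only covers the hour since the first one, which is the behavior the paragraph above describes for hourly scheduling.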

### SDK-based online retrieval
Now we can retrieve these materialized features from Redis directly with the SDK. This is one of the most popular ways to retrieve features with Feast, since it lets you integrate feature retrieval into an existing service (e.g. a Flask app) that also handles model inference or pre/post-processing.

In [12]:


features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_daily_features:daily_miles_driven",
    ],
    entity_rows=[
        {
            "driver_id": 1001,
        }
    ],
).to_dict()

def print_online_features(features):
    for key, value in sorted(features.items()):
        print(key, " : ", value)

print_online_features(features)

acc_rate  :  [None]
conv_rate  :  [None]
daily_miles_driven  :  [None]
driver_id  :  [1001]
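As the output above shows, `to_dict()` returns a column-oriented mapping: one key per feature (plus the join key), each holding a list with one value per entity row. If downstream inference code expects one record per entity, a small helper can pivot it. `rows_from_online_features` is a hypothetical helper, not part of the Feast API, and the sample input uses the same shape with two made-up rows.

```python
from typing import Any, Dict, List


def rows_from_online_features(features: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Pivot a column-oriented {name: [values...]} dict into a list of per-entity dicts."""
    lengths = {len(values) for values in features.values()}
    if len(lengths) > 1:
        raise ValueError(f"Columns have different lengths: {sorted(lengths)}")
    n_rows = lengths.pop() if lengths else 0
    return [{name: values[i] for name, values in features.items()} for i in range(n_rows)]


# Same shape as store.get_online_features(...).to_dict(), with made-up values
sample = {
    "driver_id": [1001, 1002],
    "conv_rate": [0.40, None],  # None: no value was found for that entity
    "acc_rate": [0.41, None],
}
for row in rows_from_online_features(sample):
    print(row)
```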


### HTTP-based online retrieval
We can also retrieve features from a deployed feature server. We deployed one earlier with Docker Compose (see [docker-compose.yml](../docker-compose.yml)).

This can be preferable for several reasons. If you want an in-memory cache, a cache on a central feature server is shared by all of its clients, so it can serve requests across teams more effectively. A central server also lets you manage rate limiting and access control in one place, upgrade Feast independently of client services, etc.

In [14]:
import requests
import json

online_request = {
  "features": [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate"
  ],
  "entities": {
    "driver_id": [1001]
  }
}
r = requests.post('http://localhost:6566/get-online-features', data=json.dumps(online_request))
print(json.dumps(r.json(), indent=4, sort_keys=True))

{
    "metadata": {
        "feature_names": [
            "driver_id",
            "acc_rate",
            "conv_rate"
        ]
    },
    "results": [
        {
            "event_timestamps": [
                "1970-01-01T00:00:00Z"
            ],
            "statuses": [
                "PRESENT"
            ],
            "values": [
                1001
            ]
        },
        {
            "event_timestamps": [
                "1970-01-01T00:00:00Z"
            ],
            "statuses": [
                "PRESENT"
            ],
            "values": [
                null
            ]
        },
        {
            "event_timestamps": [
                "1970-01-01T00:00:00Z"
            ],
            "statuses": [
                "PRESENT"
            ],
            "values": [
                null
            ]
        }
    ]
}
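The HTTP response is column-oriented too: `metadata.feature_names` lists the names, and `results` holds one entry per name in the same order, each with a `values` list plus parallel `statuses` and `event_timestamps`. A minimal sketch that zips names and values back together, using the response printed above (in practice you would pass `r.json()`); `parse_online_response` is a hypothetical helper.

```python
import json
from typing import Any, Dict, List


def parse_online_response(body: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Map each feature name to its list of values (one per requested entity)."""
    names = body["metadata"]["feature_names"]
    results = body["results"]
    if len(names) != len(results):
        raise ValueError("feature_names and results have different lengths")
    return {name: result["values"] for name, result in zip(names, results)}


# The response printed above, abbreviated to the fields this helper reads
response_text = """
{
  "metadata": {"feature_names": ["driver_id", "acc_rate", "conv_rate"]},
  "results": [
    {"values": [1001], "statuses": ["PRESENT"]},
    {"values": [null], "statuses": ["PRESENT"]},
    {"values": [null], "statuses": ["PRESENT"]}
  ]
}
"""
print(parse_online_response(json.loads(response_text)))
```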


## 5. Generating fresher features via stream transformations

### 5a. Building streaming features with Kafka + Spark Structured Streaming
Now we push streaming features into Feast by ingesting events from Kafka and processing with Spark Structured Streaming.
- These features can then be post-processed further and combined with other features or request data in on-demand transformations.
- For example, you might push the last 5 transactions and compute their average in an on-demand transformation.
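The last-5-transactions example can be sketched as a pandas transformation: the pushed feature holds a list of recent amounts per entity, and the transform derives their average at request time. The column names (`user_id`, `last_5_amounts`, `avg_last_5_amount`) and the input shape are hypothetical. In Feast, a function like this would typically live inside an on-demand feature view (the `@on_demand_feature_view` decorator); check the Feast documentation for the exact signature in your version.

```python
import statistics

import pandas as pd


def avg_last_transactions(inputs: pd.DataFrame) -> pd.DataFrame:
    """Average each row's pushed list of recent transaction amounts.

    Rows whose list is empty or missing get NaN.
    """
    def mean_or_nan(amounts):
        if isinstance(amounts, list) and amounts:
            return statistics.fmean(amounts)
        return float("nan")

    out = pd.DataFrame(index=inputs.index)
    out["avg_last_5_amount"] = inputs["last_5_amounts"].apply(mean_or_nan)
    return out


# Hypothetical pushed feature: up to 5 recent transaction amounts per user
pushed = pd.DataFrame({
    "user_id": [1, 2, 3],
    "last_5_amounts": [[10.0, 20.0, 30.0, 40.0, 50.0], [5.0, 15.0], []],
})
print(pd.concat([pushed, avg_last_transactions(pushed)], axis=1))
```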

Feast manages both batch and streaming sources for you: you can run `feast materialize-incremental` and also ingest streaming features into the same online store.

**What the code below does:**
- We use Spark to compute a sliding-window aggregate feature, `daily_miles_driven`: the sum of the `miles_driven` column over a 1-day window that slides every hour.
- The query triggers a micro-batch every 30 seconds.
    - Because we're reading historical data from `driver_stats.parquet`, one micro-batch can contain multiple windows for the same driver. The code therefore filters for the latest (`driver_id`, `window`) row per driver.
    - With a larger watermark, late events can also update several windows in the same micro-batch, so you would want to keep only the latest window in that case too.
- We rename `end` to `event_timestamp`; otherwise Feast throws a validation error because the DataFrame doesn't match the schema of the `FeatureView`.
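To see why one micro-batch can contain many windows per driver: with `windowDuration="1 day"` and `slideDuration="1 hour"`, every event falls into 24 overlapping windows, and `sum("miles_driven")` is accumulated in each of them. The stdlib sketch below reproduces that window assignment and aggregation on made-up events. It assumes windows are aligned to the Unix epoch in UTC (Spark's default when no `startTime` is given) and ignores watermarks and late data.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(ts, window=timedelta(days=1), slide=timedelta(hours=1)):
    """Return (start, end) for every sliding window with start <= ts < end.

    Window starts are multiples of `slide` counted from the Unix epoch (UTC),
    listed from the latest start to the earliest.
    """
    # Latest window start at or before ts
    start = ts - (ts - EPOCH) % slide
    windows = []
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return windows


def sum_miles_per_window(events):
    """Sum miles_driven per (driver_id, window end), like the Spark groupBy/agg."""
    totals = defaultdict(float)
    for driver_id, ts, miles in events:
        for _, end in sliding_windows(ts):
            totals[(driver_id, end)] += miles
    return dict(totals)


utc = timezone.utc
events = [  # (driver_id, event_timestamp, miles_driven): made-up sample events
    (1001, datetime(2021, 4, 12, 10, 59, 42, tzinfo=utc), 10.0),
    (1001, datetime(2021, 4, 12, 12, 5, 0, tzinfo=utc), 5.0),
]
totals = sum_miles_per_window(events)
print(f"{len(totals)} (driver_id, window end) aggregates")
```

Two events about an hour apart already produce 26 distinct window aggregates for a single driver, which is why the streaming code keeps only one window per driver before pushing.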

In [19]:
def send_to_feast(df, epoch):
    pandas_df: pd.DataFrame = df.toPandas()
    if pandas_df.empty:
        return
    
    if "end" in pandas_df:
        print("processing window")
        # Filter out only for the latest window for the driver id
        pandas_df = pandas_df.sort_values(by=["driver_id","end"], ascending=False).groupby("driver_id").nth(-1)
        pandas_df = pandas_df.rename(columns = {"end": "event_timestamp"})
        pandas_df['created'] = pd.to_datetime('now', utc=True)
        store.push("driver_stats_push_source", pandas_df)
    pandas_df.sort_values(by="driver_id", inplace=True)
    print(pandas_df.head(20))
    print(f"Num rows: {len(pandas_df.index)}")

daily_miles_driven = (
        df.withWatermark("event_timestamp", "1 second")
        .groupBy("driver_id", window(timeColumn="event_timestamp", windowDuration="1 day", slideDuration="1 hour"))
        .agg(sum("miles_driven").alias("daily_miles_driven"))
        .select("driver_id", "window.end", "daily_miles_driven")
)

query_1 = daily_miles_driven \
    .writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/feast-workshop/q1/") \
    .trigger(processingTime="30 seconds") \
    .foreachBatch(send_to_feast) \
    .start()

query_1.awaitTermination(timeout=30)
query_1.stop()

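StreamingQueryException: [STREAM_FAILED] Query [id = 333ec1ad-c0bf-4003-ab92-65d3c8efc95f, runId = 9a5aa5c3-b6ae-4469-9ecc-2c7331d9068a] terminated with exception: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'

This failure is specific to running Spark on Windows: the `NativeIO$Windows.access0` error typically means Hadoop's native Windows binaries (`winutils.exe` and `hadoop.dll`) are missing or don't match the Hadoop version Spark uses. Running the notebook on Linux, macOS, or WSL, or installing matching binaries and pointing `HADOOP_HOME` at them, usually resolves it.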

### 5b. Verify fresh features
Now we can verify that the `daily_miles_driven` feature has indeed changed from the original materialized features.

In [8]:
features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_daily_features:daily_miles_driven",
    ],
    entity_rows=[
        {
            "driver_id": 1001,
        }
    ],
).to_dict()
print_online_features(features)

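NameError: name 'print_online_features' is not defined

This error appears when this cell runs in a kernel where the section 4 cell that defines `print_online_features` hasn't been executed (for example, after a kernel restart). Run that cell first.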

### 5c (optional): Push features through the push server
Instead of using a `FeatureStore` object, we could also push features with an HTTP request to the push server (`requests.post("http://localhost:6567/push", data=json.dumps(push_data))`). That way, the Spark job doesn't need a `feature_store.yaml`.

We already deployed a push server with Docker Compose at port 6567. Separating writes (pushes) from reads (feature serving) can be helpful for independent scaling + resource isolation.
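The HTTP variant has to turn each pandas batch into JSON: `json.dumps` cannot serialize pandas `Timestamp` objects, so timestamp columns are converted to strings first, and the frame is sent column-wise (`orient="list"`). A minimal sketch of just that conversion on a made-up one-row batch; `build_push_payload` is a hypothetical helper, while the `push_source_name` and `df` keys follow the request used in this section.

```python
import json

import pandas as pd


def build_push_payload(batch: pd.DataFrame, push_source_name: str) -> str:
    """Serialize a pandas batch into a JSON body for the push server's /push endpoint."""
    batch = batch.copy()
    # JSON has no timestamp type: render naive and tz-aware datetime columns as strings
    for col in batch.select_dtypes(include=["datetime", "datetimetz"]).columns:
        batch[col] = batch[col].astype(str)
    return json.dumps({
        "push_source_name": push_source_name,
        "df": batch.to_dict(orient="list"),  # {column: [values...]}
    })


# Made-up batch with the same columns as the streaming output
batch = pd.DataFrame({
    "driver_id": [1001],
    "event_timestamp": pd.to_datetime(["2023-03-08 13:00"]),
    "daily_miles_driven": [596.3],
    "created": pd.to_datetime(["2022-05-18 16:23:10"], utc=True),
})
body = build_push_payload(batch, "driver_stats_push_source")
print(body)
# The body could then be sent with: requests.post("http://localhost:6567/push", data=body)
```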

In [14]:
import requests
import json

def send_to_feast(df, epoch):
    pandas_df: pd.DataFrame = df.toPandas()
    if pandas_df.empty:
        return
    
    if "end" in pandas_df:
        print("processing window")
        # Filter out only for the latest window for the driver id
        pandas_df = pandas_df.sort_values(by=["driver_id","end"], ascending=False).groupby("driver_id").nth(-1)
        pandas_df = pandas_df.rename(columns = {"end": "event_timestamp"})
        pandas_df['created'] = pd.to_datetime('now', utc=True)
        pandas_df.reset_index(inplace=True)

        # Push the event via HTTP (we need to map timestamps to strings since timestamps aren't serializable)
        pandas_df['created'] = pandas_df['created'].astype(str)
        pandas_df['event_timestamp'] = pandas_df['event_timestamp'].astype(str)
        pandas_dict = pandas_df.to_dict(orient="list")
        push_data = {
            "push_source_name":"driver_stats_push_source",
            "df":pandas_dict
        }
        requests.post("http://localhost:6567/push", data=json.dumps(push_data))
    pandas_df.sort_values(by="driver_id", inplace=True)
    print(pandas_df.head(20))
    print(f"Num rows: {len(pandas_df.index)}")

query_2 = daily_miles_driven \
    .writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/feast-workshop/q2/") \
    .trigger(processingTime="30 seconds") \
    .foreachBatch(send_to_feast) \
    .start()

query_2.awaitTermination(timeout=20)
query_2.stop()

22/05/18 12:23:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/05/18 12:23:06 WARN StreamingQueryManager: Stopping existing streaming query [id=c1245bc6-365d-4770-9172-1fb403ccb4f6, runId=61ed0b3f-2948-4170-8187-5ffd4e577502], as a new run is being started.
22/05/18 12:23:10 WARN HDFSBackedStateStoreProvider: The state for version 2 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
22/05/18 12:23:10 WARN HDFSBackedStateStoreProvider: The state for version 2 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
22/05/18 12:23:10 WARN HDFSBackedStateStoreProvider: The state for version 2 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query

processing window
   driver_id      event_timestamp  daily_miles_driven  \
0       1001  2023-03-08 13:00:00          596.305352   
1       1002  2023-03-08 14:00:00          644.051849   
2       1003  2023-03-08 14:00:00          547.179926   
3       1004  2023-03-08 14:00:00          695.276622   
4       1005  2023-03-08 14:00:00          640.746719   

                            created  
0  2022-05-18 16:23:10.699456+00:00  
1  2022-05-18 16:23:10.699456+00:00  
2  2022-05-18 16:23:10.699456+00:00  
3  2022-05-18 16:23:10.699456+00:00  
4  2022-05-18 16:23:10.699456+00:00  
Num rows: 5


Now we can verify again that the features are fresher:

In [15]:
features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_daily_features:daily_miles_driven",
    ],
    entity_rows=[
        {
            "driver_id": 1001,
        }
    ],
).to_dict()
print_online_features(features)

acc_rate  :  [0.4075707495212555]
conv_rate  :  [0.4045884609222412]
daily_miles_driven  :  [596.3053588867188]
driver_id  :  [1001]


### Cleanup
Finally, let's clean up Spark's checkpoint directories.

In [16]:
import shutil

dir_path = '/tmp/feast-workshop/'

try:
    shutil.rmtree(dir_path)
except OSError as e:
    print("Error: %s : %s" % (dir_path, e.strerror))