In [6]:
import os
from pprint import pprint
# Show only the environment variables whose names start with "FEAST_".
pprint(
    {
        name: setting
        for name, setting in os.environ.items()
        if name.startswith("FEAST_")
    }
)

{'FEAST_CORE_URL': 'kf-feast-feast-core:6565',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION': 'file:///home/jovyan/historical_feature_output',
 'FEAST_HISTORICAL_SERVING_URL': 'kf-feast-feast-batch-serving:6566',
 'FEAST_REDIS_HOST': 'kf-feast-redis-master',
 'FEAST_SERVING_URL': 'kf-feast-feast-serving:6566',
 'FEAST_SPARK_HOME': '/usr/local/spark',
 'FEAST_SPARK_LAUNCHER': 'standalone',
 'FEAST_SPARK_STAGING_LOCATION': 'file:///home/jovyan/spark_staging_location',
 'FEAST_SPARK_STANDALONE_MASTER': 'local[*]'}


In [7]:
from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource
from feast.data_format import ParquetFormat
import pandas as pd

In [8]:
# Create the Feast client used by every later cell (apply, ingestion, online reads).
# No options are passed explicitly; presumably the client picks its configuration
# up from the FEAST_* environment variables printed earlier -- TODO confirm against
# the Feast SDK documentation.
client = Client()

In [9]:
# Entity definition: drivers are identified by a 64-bit integer id.
driver_id = Entity(
    name="driver_id",
    description="Driver identifier",
    value_type=ValueType.INT64,
)

# Features that are updated daily.
acc_rate = Feature(name="acc_rate", dtype=ValueType.DOUBLE)
conv_rate = Feature(name="conv_rate", dtype=ValueType.DOUBLE)
avg_daily_trips = Feature(name="avg_daily_trips", dtype=ValueType.INT64)

# Feature that is updated in real time.
trips_today = Feature(name="trips_today", dtype=ValueType.INT32)


In [10]:
# URI of the Parquet file we use as the offline feature store.
driver_stats_location = os.path.join("file:///home/jovyan/", "driver_stats.parquet")

# Batch source: the Parquet file above, with "datetime" as the event timestamp
# column and "created" as the created timestamp column.
driver_stats_source = FileSource(
    file_url=driver_stats_location,
    file_format=ParquetFormat(),
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

# Feature table grouping the daily driver features, keyed by the driver_id entity.
driver_statistics = FeatureTable(
    name="driver_statistics",
    entities=["driver_id"],
    features=[acc_rate, conv_rate, avg_daily_trips],
    batch_source=driver_stats_source,
)

In [11]:
# Register the definitions with Feast, entity first and then the feature table
# that references it by name.
for definition in (driver_id, driver_statistics):
    client.apply(definition)

In [12]:
# Offline-to-online ingestion: start a job that loads driver_statistics rows
# from the given time window into the online store.
from datetime import datetime

ingestion_start = datetime(2020, 10, 10)
ingestion_end = datetime(2022, 10, 20)

job = client.start_offline_to_online_ingestion(
    driver_statistics, ingestion_start, ingestion_end
)

In [13]:
# Check the current state of the ingestion job started in the previous cell.
# NOTE(review): the recorded output shows the job still STARTING here; the online
# lookup in the next cell presumably needs the job to have finished -- re-run this
# cell until the status indicates completion before reading features (confirm).
job.get_status()

<SparkJobStatus.STARTING: 0>

In [14]:
# Test: read the ingested features back from the online store for a few drivers.
driver_ids = [{"driver_id": driver} for driver in (1001, 1002, 1003, 1004)]

online_response = client.get_online_features(
    feature_refs=[
        "driver_statistics:conv_rate",
        "driver_statistics:acc_rate",
        "driver_statistics:avg_daily_trips",
    ],
    entity_rows=driver_ids,
)
features = online_response.to_dict()

# Render the response as a table (one row per requested driver).
pd.DataFrame(features)

Unnamed: 0,driver_id,driver_statistics:acc_rate,driver_statistics:avg_daily_trips,driver_statistics:conv_rate
0,1001,0.389149,56,0.987733
1,1002,0.640081,588,0.55018
2,1003,0.75559,6,0.746147
3,1004,0.8033,719,0.269049
