# Часть 2: Реализация Feature Store

In [1]:
import pandas as pd
from pandas.testing import assert_frame_equal
from feast import FeatureStore
import numpy as np

## 1. Определение фичей для Offline и Online

В нашей задаче прогнозирования спроса, "сущностью" является не пользователь, а регион (PULocationID). Мы хотим знать характеристики спроса для каждого региона в определенный момент времени.

    Фичи для обучения (Offline):
    Это те признаки, которые мы уже создали для обучения нашей модели. Они рассчитываются на основе исторических данных.
        hour: Час.
        dayofweek: День недели.
        lag_1h: Спрос час назад.
        lag_24h: Спрос 24 часа назад (суточная сезонность).
        lag_168h: Спрос неделю назад (недельная сезонность).
        rolling_mean_24h: Средний спрос за последние 24 часа.

    Фичи для инференса (Online):
    Когда мы хотим сделать прогноз на следующий час в реальном времени (например, в 15:00), нам нужны точно такие же фичи, но рассчитанные на основе самых свежих данных:
        hour: Час.
        dayofweek: День недели.
        lag_1h: Реальный спрос в 14:00.
        lag_24h: Реальный спрос вчера в 15:00.
        lag_168h: Реальный спрос неделю назад в 15:00.
        rolling_mean_24h: Средний спрос с 14:00 сегодня до 15:00 вчера.

    Фичи, критичные для согласованности:
    Все перечисленные фичи являются критичными. Логика их расчета должна быть абсолютно идентичной при обучении (offline) и при реальном использовании (online). Если в offline-расчете lag_24h вы учитывали данные до 14:59:59, а в online — до 15:00:00, это может привести к расхождению (train-serve skew) и ухудшению качества модели в продакшене. Именно эту проблему и решает Feature Store.


## 2. Реализация Feature Store с помощью Feast

Создание файла с фичами в файле `create_features.py`

In [2]:
df_features = pd.read_parquet('nyc_taxi_demand/feature_repo/data/demand_agg_with_ts.parquet')
df_features.head()

Unnamed: 0,PULocationID,pickup_hour,trip_count,hour,dayofweek,lag_1h,lag_24h,lag_168h,rolling_mean_24h
0,1,2019-02-01 01:00:00,1,1,4,0.0,0.0,0.0,0.0
1,1,2019-02-01 06:00:00,2,6,4,1.0,0.0,0.0,0.0
2,1,2019-02-01 07:00:00,1,7,4,2.0,0.0,0.0,0.0
3,1,2019-02-01 08:00:00,3,8,4,1.0,0.0,0.0,0.0
4,1,2019-02-01 09:00:00,1,9,4,3.0,0.0,0.0,0.0


In [4]:
min_date = df_features['pickup_hour'].min()
max_date = df_features['pickup_hour'].max()
date_definition = min_date + (max_date - min_date) * (1 - 0.9)
print(f"Дата разделения: {date_definition}")
print(f"Начало данных: {min_date}, конец данных: {max_date}")

# feast materialize 2019-01-01T00:00:00 2019-12-31T23:00:00
print(f"Команда для материализации feast: feast materialize {min_date} {date_definition}")

Дата разделения: 2019-02-03 19:05:59.999999
Начало данных: 2019-02-01 00:00:00, конец данных: 2019-02-28 23:00:00
Команда для материализации feast: feast materialize 2019-02-01 00:00:00 2019-02-03 19:05:59.999999


In [5]:
date_for_train = min_date + (date_definition - min_date) * (1 - 0.5) * 0.7
print(f"Дата для обучения: {date_for_train}")

Дата для обучения: 2019-02-01 23:29:05.999999


In [10]:
date_middle = min_date + (date_definition - min_date) * (1 - 0.5)
print(f"Дата середины: {date_middle}")

Дата середины: 2019-02-02 09:32:59.999999


## 3. Определение features в features.py

In [17]:
cat ./nyc_taxi_demand/feature_repo/features.py

# This is an example feature definition file

from datetime import timedelta

import pandas as pd

from feast import (
    Entity,
    FeatureService,
    FeatureView,
    Field,
    FileSource,
    Project,
    PushSource,
    RequestSource,
    ValueType
)
from feast.feature_logging import LoggingConfig
from feast.infra.offline_stores.file_source import FileLoggingDestination
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64


project = Project(name="nyc_taxi_demand", description="A project for driver statistics")

location = Entity(name="PULocationID", value_type=ValueType.INT64, description="Pickup Location ID")

taxi_demand_source = FileSource(
    name="taxi_demand_source",
    path="data/demand_agg_with_ts.parquet",
    timestamp_field="pickup_hour"
)

demand_features_fv = FeatureView(
    name="taxi_stats",
    entities=[location],
    schema=[
        Field(name="trip_count", dtype=Int64),
        Field(name="lag_1h

  pid, fd = os.forkpty()


## 4-5. Согласованность и проверка

In [6]:
store = FeatureStore(repo_path="./nyc_taxi_demand/feature_repo")

In [7]:
train_entity_df = df_features[df_features['pickup_hour'] <= date_for_train].copy()
train_entity_df = train_entity_df[['pickup_hour', 'PULocationID']]
train_entity_df['PULocationID'] = train_entity_df['PULocationID'].astype('int64')

print(f"\nПолучаем исторические фичи для {len(train_entity_df)} событий...")

features_to_get = [
    "taxi_stats:trip_count",
    "taxi_stats:lag_1h",
    "taxi_stats:lag_24h",
    "taxi_stats:lag_168h",
    "taxi_stats:rolling_mean_24h",
    "taxi_stats:hour",
    "taxi_stats:dayofweek",
]

train_df = store.get_historical_features(
    entity_df=train_entity_df,
    features=features_to_get,
).to_df()

print("Обучающий датасет из Feast:")
print(train_df.head())


Получаем исторические фичи для 6045 событий...
Using pickup_hour as the event timestamp. To specify a column explicitly, please name it event_timestamp.
Обучающий датасет из Feast:
                pickup_hour  PULocationID  trip_count  lag_1h  lag_24h  \
0 2019-02-01 00:00:00+00:00           124          21     0.0      0.0   
1 2019-02-01 00:00:00+00:00            22          39     0.0      0.0   
2 2019-02-01 00:00:00+00:00           227          31     0.0      0.0   
3 2019-02-01 00:00:00+00:00           180          18     0.0      0.0   
4 2019-02-01 00:00:00+00:00            51          52     0.0      0.0   

   lag_168h  rolling_mean_24h  hour  dayofweek  
0       0.0               0.0     0          4  
1       0.0               0.0     0          4  
2       0.0               0.0     0          4  
3       0.0               0.0     0          4  
4       0.0               0.0     0          4  


In [14]:
test_entity_df = df_features[(df_features['pickup_hour'] > date_for_train) & (df_features['pickup_hour'] <= date_middle)].copy()
test_entity_df = test_entity_df[['pickup_hour', 'PULocationID']]
test_entity_df['PULocationID'] = test_entity_df['PULocationID'].astype('int64')
print(f"\nПолучаем исторические фичи для {len(test_entity_df)} событий...")
test_df = store.get_historical_features(
    entity_df=test_entity_df,
    features=features_to_get,
).to_df()
print("Тестовый датасет из Feast:")
print(test_df.head())


Получаем исторические фичи для 2507 событий...
Using pickup_hour as the event timestamp. To specify a column explicitly, please name it event_timestamp.
Тестовый датасет из Feast:
                pickup_hour  PULocationID  trip_count  lag_1h  lag_24h  \
0 2019-02-02 00:00:00+00:00           115          16    18.0     14.0   
1 2019-02-02 00:00:00+00:00           130         191   224.0    127.0   
2 2019-02-02 00:00:00+00:00           220         135   148.0     73.0   
3 2019-02-02 00:00:00+00:00            29          45    49.0     20.0   
4 2019-02-02 00:00:00+00:00           125         145   190.0    115.0   

   lag_168h  rolling_mean_24h  hour  dayofweek  
0       0.0         18.291667     0          5  
1       0.0        171.250000     0          5  
2       0.0        111.166667     0          5  
3       0.0         56.833333     0          5  
4       0.0        158.750000     0          5  


In [15]:
print("\nПроверяем согласованность фичей...")

# Подготавливаем эталонный DataFrame
direct_features = df_features[df_features['pickup_hour'] <= date_for_train].copy()
direct_features['PULocationID'] = direct_features['PULocationID'].astype('int64')

# Подготавливаем DataFrame из Feast
fs_features = train_df.copy()

# Приводим к единому формату
# Убираем таймзону из данных Feast для корректного сравнения
fs_features['pickup_hour'] = fs_features['pickup_hour'].dt.tz_localize(None)

# Приводим ОБЕ колонки к единой точности (nanoseconds), чтобы типы совпадали
direct_features['pickup_hour'] = direct_features['pickup_hour'].astype('datetime64[ns]')
fs_features['pickup_hour'] = fs_features['pickup_hour'].astype('datetime64[ns]')

# Выравниваем колонки и сортируем
common_columns = [col for col in direct_features.columns if col in fs_features.columns]
direct_features_aligned = direct_features[common_columns].sort_values(by=['pickup_hour', 'PULocationID']).reset_index(drop=True)
fs_features_aligned = fs_features[common_columns].sort_values(by=['pickup_hour', 'PULocationID']).reset_index(drop=True)

# Сравниваем
try:
    assert_frame_equal(direct_features_aligned, fs_features_aligned, atol=1e-6)
    print("✅ Проверка согласованности пройдена! Фичи идентичны.")
except AssertionError as e:
    print("❌ Проверка согласованности провалена!")
    print(e)


Проверяем согласованность фичей...
✅ Проверка согласованности пройдена! Фичи идентичны.


In [16]:
# Симуляция запроса в реальном времени
# Допустим, мы хотим сделать прогноз для нескольких регионов прямо сейчас

online_entities = [
    {"PULocationID": 130},
    {"PULocationID": 220},
    {"PULocationID": 125},
]

print("\nПолучаем online-фичи...")

online_features = store.get_online_features(
    features=features_to_get,  # тот же список фичей
    entity_rows=online_entities
).to_dict()

# Преобразуем в удобный для просмотра DataFrame
online_df = pd.DataFrame.from_dict(online_features)
print("Online-фичи, полученные из Feast:")
print(online_df)


Получаем online-фичи...
Online-фичи, полученные из Feast:
   PULocationID  rolling_mean_24h  trip_count  lag_24h  hour  lag_1h  \
0           130        177.583328         176    163.0    23   240.0   
1           220         96.916664         113     90.0    23    93.0   
2           125        135.166672         220    116.0    23   238.0   

   lag_168h  dayofweek  
0     155.0          3  
1      92.0          3  
2     193.0          3  
