In [None]:


# Install Feast together with its "ge" extra; later cells import feast.dqm and great_expectations.
# NOTE(review): no version is pinned here — consider pinning so the notebook stays reproducible.
get_ipython().run_line_magic('pip', "install 'feast[ge]'")

In [1]:


import pyarrow.parquet
import pandas as pd

from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
from datetime import timedelta


# ## Declare features

In [10]:


# Offline batch source: a Parquet file whose "day" column is the timestamp field.
batch_source = FileSource(
    path="trips_stats.parquet",  # Parquet file included with the repo
    file_format=ParquetFormat(),
    timestamp_field="day",
)

In [96]:
# Load the raw trip statistics into pandas for exploration (the same file batch_source points at).
trip_stats_ds = pd.read_parquet("trips_stats.parquet")

In [98]:
# Inspect column names, dtypes and non-null counts of the raw data.
trip_stats_ds.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1647134 entries, 0 to 1647133
Data columns (total 6 columns):
 #   Column                 Non-Null Count    Dtype              
---  ------                 --------------    -----              
 0   taxi_id                1647134 non-null  object             
 1   day                    1647134 non-null  datetime64[ns, UTC]
 2   total_miles_travelled  1647134 non-null  float64            
 3   total_trip_seconds     1647134 non-null  int64              
 4   total_earned           1647134 non-null  float64            
 5   trip_count             1647134 non-null  int64              
dtypes: datetime64[ns, UTC](1), float64(2), int64(2), object(1)
memory usage: 75.4+ MB


In [32]:
# Sort chronologically by "day"; sort_values returns a new frame, trip_stats_ds is left as loaded.
# Observed after sorting — head: 2019-01-01, tail: 2020-12-31
# columns: Index(['taxi_id', 'day', 'total_miles_travelled', 'total_trip_seconds', 'total_earned', 'trip_count'], dtype='object')
sorted_trip_stats = trip_stats_ds.sort_values(by="day")

In [104]:
# Peek at the five earliest timestamps after sorting.
sorted_trip_stats["day"].head(5).array

<DatetimeArray>
['2019-01-01 00:00:00+00:00', '2019-01-01 00:00:00+00:00',
 '2019-01-01 00:00:00+00:00', '2019-01-01 00:00:00+00:00',
 '2019-01-01 00:00:00+00:00']
Length: 5, dtype: datetime64[ns, UTC]

In [40]:
# Number of distinct taxi ids in the data.
len(sorted_trip_stats.taxi_id.unique())

5159

In [50]:
# Distinct per-row trip counts, used to choose the bucket boundaries below.
trip_counts = sorted_trip_stats.trip_count.unique()

In [53]:
# Display the distinct trip counts.
trip_counts

array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        67,  78,  81,  87,  88,  99, 109, 114, 139])

In [56]:
# Flag rows with at most 10 trips (lowest trip-count bucket; no lower bound is applied).
# A vectorized comparison replaces the row-wise apply(axis=1): it yields the same int64 0/1
# column without a Python-level call per row, and also works on an empty frame.
sorted_trip_stats["trip_count_1_to_10"] = (sorted_trip_stats["trip_count"] <= 10).astype("int64")

In [58]:
# One-hot flags for the remaining trip-count buckets. Bounds are inclusive on both ends
# (Series.between defaults to inclusive bounds); the last bucket is open-ended.
# Vectorized comparisons replace the row-wise apply(axis=1) calls and yield the same int64 0/1 columns.
sorted_trip_stats["trip_count_11_to_25"] = sorted_trip_stats["trip_count"].between(11, 25).astype("int64")
sorted_trip_stats["trip_count_26_to_50"] = sorted_trip_stats["trip_count"].between(26, 50).astype("int64")
sorted_trip_stats["trip_count_51_to_100"] = sorted_trip_stats["trip_count"].between(51, 100).astype("int64")
sorted_trip_stats["trip_count_101_or_higher"] = (sorted_trip_stats["trip_count"] >= 101).astype("int64")

In [60]:
# Drop the raw trip_count column now that the bucket flags exist; drop() returns a new frame.
trip_count_featurized = sorted_trip_stats.drop(columns=["trip_count"])

In [62]:
# Preview the frame with the trip-count bucket flags.
trip_count_featurized.head()

Unnamed: 0,taxi_id,day,total_miles_travelled,total_trip_seconds,total_earned,trip_count_1_to_10,trip_count_11_to_25,trip_count_26_to_50,trip_count_51_to_100,trip_count_101_or_higher
1536869,c0a827928965347f9f399784b2a984cfb62eeca5e4ff39...,2019-01-01 00:00:00+00:00,58.6,14368,230.0,0,1,0,0,0
321620,8edce7ce70dba32066bbc7eed40b32a7602db0ceb6bb1d...,2019-01-01 00:00:00+00:00,64.6,6480,161.0,1,0,0,0,0
1470487,64375e2b26ec87f4da6e0c3b8887d949b8e91e9072a428...,2019-01-01 00:00:00+00:00,96.4,14220,297.25,0,1,0,0,0
100618,b695f0a6aeeb0c364c6ee6ca5c16f6621e00c50fd6e7d4...,2019-01-01 00:00:00+00:00,14.33,1387,36.0,1,0,0,0,0
1470486,ca2e49fc5f32316d5c59cabe4993d8f7b27ee126bc5058...,2019-01-01 00:00:00+00:00,35.3,8760,157.5,0,1,0,0,0


In [68]:
# Explore the range of total_miles_travelled to choose the mileage buckets below.
total_miles_travelled = trip_count_featurized.total_miles_travelled.unique()
total_miles_travelled.sort()  # sorts the array of distinct values in place
total_miles_travelled.min()
# Observed values:
# max = 1854.6
# min = 0.01

0.01

In [71]:
# One-hot encode total_miles_travelled into half-open buckets [low, high). The first bucket has
# no lower bound and the last one is open-ended. Vectorized comparisons replace the row-wise
# apply(axis=1) calls: same int64 0/1 columns, without a Python-level call per row.
trip_count_featurized["total_miles_0_to_10"] = (
    trip_count_featurized["total_miles_travelled"] < 10.0
).astype("int64")
trip_count_featurized["total_miles_10_to_25"] = (
    (trip_count_featurized["total_miles_travelled"] >= 10.0)
    & (trip_count_featurized["total_miles_travelled"] < 25.0)
).astype("int64")
trip_count_featurized["total_miles_25_to_50"] = (
    (trip_count_featurized["total_miles_travelled"] >= 25.0)
    & (trip_count_featurized["total_miles_travelled"] < 50.0)
).astype("int64")
trip_count_featurized["total_miles_50_to_100"] = (
    (trip_count_featurized["total_miles_travelled"] >= 50.0)
    & (trip_count_featurized["total_miles_travelled"] < 100.0)
).astype("int64")
trip_count_featurized["total_miles_100_to_250"] = (
    (trip_count_featurized["total_miles_travelled"] >= 100.0)
    & (trip_count_featurized["total_miles_travelled"] < 250.0)
).astype("int64")
trip_count_featurized["total_miles_250_to_500"] = (
    (trip_count_featurized["total_miles_travelled"] >= 250.0)
    & (trip_count_featurized["total_miles_travelled"] < 500.0)
).astype("int64")
trip_count_featurized["total_miles_500_to_1000"] = (
    (trip_count_featurized["total_miles_travelled"] >= 500.0)
    & (trip_count_featurized["total_miles_travelled"] < 1000.0)
).astype("int64")
trip_count_featurized["total_miles_1000_or_higher"] = (
    trip_count_featurized["total_miles_travelled"] >= 1000.0
).astype("int64")
# Drop the raw mileage column now that the flags exist; drop() returns a new frame.
tc_tmt_featurized = trip_count_featurized.drop(columns=["total_miles_travelled"])

In [72]:
# Preview the frame with trip-count and mileage bucket flags.
tc_tmt_featurized.head()

Unnamed: 0,taxi_id,day,total_trip_seconds,total_earned,trip_count_1_to_10,trip_count_11_to_25,trip_count_26_to_50,trip_count_51_to_100,trip_count_101_or_higher,total_miles_0_to_10,total_miles_10_to_25,total_miles_25_to_50,total_miles_50_to_100,total_miles_100_to_250,total_miles_250_to_500,total_miles_500_to_1000,total_miles_1000_or_higher
1536869,c0a827928965347f9f399784b2a984cfb62eeca5e4ff39...,2019-01-01 00:00:00+00:00,14368,230.0,0,1,0,0,0,0,0,0,1,0,0,0,0
321620,8edce7ce70dba32066bbc7eed40b32a7602db0ceb6bb1d...,2019-01-01 00:00:00+00:00,6480,161.0,1,0,0,0,0,0,0,0,1,0,0,0,0
1470487,64375e2b26ec87f4da6e0c3b8887d949b8e91e9072a428...,2019-01-01 00:00:00+00:00,14220,297.25,0,1,0,0,0,0,0,0,1,0,0,0,0
100618,b695f0a6aeeb0c364c6ee6ca5c16f6621e00c50fd6e7d4...,2019-01-01 00:00:00+00:00,1387,36.0,1,0,0,0,0,0,1,0,0,0,0,0,0
1470486,ca2e49fc5f32316d5c59cabe4993d8f7b27ee126bc5058...,2019-01-01 00:00:00+00:00,8760,157.5,0,1,0,0,0,0,0,1,0,0,0,0,0


In [79]:
# Explore the range of total_trip_seconds to choose the duration buckets below.
trip_seconds = tc_tmt_featurized.total_trip_seconds.unique()
trip_seconds.sort()  # sorts the array of distinct values in place
trip_seconds.max()

527252

In [80]:
# One-hot encode total_trip_seconds into half-open buckets [low, high) with boundaries at
# 1h, 2h, 4h, 8h and 12h expressed in seconds. The first bucket has no lower bound and the last
# one is open-ended. Vectorized comparisons replace the row-wise apply(axis=1) calls:
# same int64 0/1 columns, without a Python-level call per row.
tc_tmt_featurized["seconds_0_3600"] = (
    tc_tmt_featurized["total_trip_seconds"] < 3600
).astype("int64")
tc_tmt_featurized["seconds_3600_to_7200"] = (
    (tc_tmt_featurized["total_trip_seconds"] >= 3600)
    & (tc_tmt_featurized["total_trip_seconds"] < 7200)
).astype("int64")
tc_tmt_featurized["seconds_7200_to_14400"] = (
    (tc_tmt_featurized["total_trip_seconds"] >= 7200)
    & (tc_tmt_featurized["total_trip_seconds"] < 14400)
).astype("int64")
tc_tmt_featurized["seconds_14400_to_28800"] = (
    (tc_tmt_featurized["total_trip_seconds"] >= 14400)
    & (tc_tmt_featurized["total_trip_seconds"] < 28800)
).astype("int64")
tc_tmt_featurized["seconds_28800_to_43200"] = (
    (tc_tmt_featurized["total_trip_seconds"] >= 28800)
    & (tc_tmt_featurized["total_trip_seconds"] < 43200)
).astype("int64")
tc_tmt_featurized["seconds_43200_or_higher"] = (
    tc_tmt_featurized["total_trip_seconds"] >= 43200
).astype("int64")
# Drop the raw duration column now that the flags exist; drop() returns a new frame.
tc_tmt_tts_featurized = tc_tmt_featurized.drop(columns=["total_trip_seconds"])

In [81]:
# Preview the first 25 rows with trip-count, mileage and duration bucket flags.
tc_tmt_tts_featurized.head(25)

Unnamed: 0,taxi_id,day,total_earned,trip_count_1_to_10,trip_count_11_to_25,trip_count_26_to_50,trip_count_51_to_100,trip_count_101_or_higher,total_miles_0_to_10,total_miles_10_to_25,...,total_miles_100_to_250,total_miles_250_to_500,total_miles_500_to_1000,total_miles_1000_or_higher,seconds_0_3600,seconds_3600_to_7200,seconds_7200_to_14400,seconds_14400_to_28800,seconds_28800_to_43200,seconds_43200_or_higher
1536869,c0a827928965347f9f399784b2a984cfb62eeca5e4ff39...,2019-01-01 00:00:00+00:00,230.0,0,1,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
321620,8edce7ce70dba32066bbc7eed40b32a7602db0ceb6bb1d...,2019-01-01 00:00:00+00:00,161.0,1,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
1470487,64375e2b26ec87f4da6e0c3b8887d949b8e91e9072a428...,2019-01-01 00:00:00+00:00,297.25,0,1,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
100618,b695f0a6aeeb0c364c6ee6ca5c16f6621e00c50fd6e7d4...,2019-01-01 00:00:00+00:00,36.0,1,0,0,0,0,0,1,...,0,0,0,0,1,0,0,0,0,0
1470486,ca2e49fc5f32316d5c59cabe4993d8f7b27ee126bc5058...,2019-01-01 00:00:00+00:00,157.5,0,1,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
208868,78893d83a12762723e5a8ef770b3fb541e9b2cd8d4316d...,2019-01-01 00:00:00+00:00,48.5,1,0,0,0,0,0,1,...,0,0,0,0,1,0,0,0,0,0
208869,e765f58220125689ac5ca6ff4ec6d5641fb056dc419b47...,2019-01-01 00:00:00+00:00,76.5,1,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
208873,821fbe209557c784dcfc8435cd37ca5777c46f71219dac...,2019-01-01 00:00:00+00:00,52.75,1,0,0,0,0,0,1,...,0,0,0,0,1,0,0,0,0,0
158262,c38539ae509222f22b542332a6de4d8d7c9be56021f004...,2019-01-01 00:00:00+00:00,31.25,1,0,0,0,0,1,0,...,0,0,0,0,1,0,0,0,0,0
100614,4a65c34a8b9ffd4b5c1a693f6094296a97c83b39b5a38c...,2019-01-01 00:00:00+00:00,27.25,1,0,0,0,0,1,0,...,0,0,0,0,1,0,0,0,0,0


In [74]:
# Scratch check: seconds in one day (the same 86400 used as the feature view's ttl).
24 * 60 * 60

86400

In [87]:
# Explore the distinct total_earned values to choose the earnings buckets below.
earned = tc_tmt_tts_featurized.total_earned.unique()
earned.sort()  # sorts the array of distinct values in place
earned
# Observed values:
# min: 0.00
# max: 4091.45
# mean: 225 (author's note; the mean is not computed in this cell)

array([0.00000e+00, 1.00000e-02, 2.00000e-02, ..., 1.64450e+03,
       3.26883e+03, 4.09145e+03])

In [91]:
# One-hot encode total_earned into half-open buckets [low, high). The first bucket has no lower
# bound and the last one is open-ended. Vectorized comparisons replace the row-wise
# apply(axis=1) calls: same int64 0/1 columns, without a Python-level call per row.
tc_tmt_tts_featurized["earned_0_to_50"] = (
    tc_tmt_tts_featurized["total_earned"] < 50.00
).astype("int64")
tc_tmt_tts_featurized["earned_50_to_100"] = (
    (tc_tmt_tts_featurized["total_earned"] >= 50.00)
    & (tc_tmt_tts_featurized["total_earned"] < 100.00)
).astype("int64")
tc_tmt_tts_featurized["earned_100_to_250"] = (
    (tc_tmt_tts_featurized["total_earned"] >= 100.00)
    & (tc_tmt_tts_featurized["total_earned"] < 250.00)
).astype("int64")
tc_tmt_tts_featurized["earned_250_to_500"] = (
    (tc_tmt_tts_featurized["total_earned"] >= 250.00)
    & (tc_tmt_tts_featurized["total_earned"] < 500.00)
).astype("int64")
tc_tmt_tts_featurized["earned_500_to_1000"] = (
    (tc_tmt_tts_featurized["total_earned"] >= 500.00)
    & (tc_tmt_tts_featurized["total_earned"] < 1000.00)
).astype("int64")
tc_tmt_tts_featurized["earned_1000_or_higher"] = (
    tc_tmt_tts_featurized["total_earned"] >= 1000.00
).astype("int64")
# Drop the raw earnings column; the result holds only taxi_id, day and the bucket flags.
featurized_data = tc_tmt_tts_featurized.drop(columns=["total_earned"])
featurized_data

Unnamed: 0,taxi_id,day,trip_count_1_to_10,trip_count_11_to_25,trip_count_26_to_50,trip_count_51_to_100,trip_count_101_or_higher,total_miles_0_to_10,total_miles_10_to_25,total_miles_25_to_50,...,seconds_7200_to_14400,seconds_14400_to_28800,seconds_28800_to_43200,seconds_43200_or_higher,earned_0_to_50,earned_50_to_100,earned_100_to_250,earned_250_to_500,earned_500_to_1000,earned_1000_or_higher
1536869,c0a827928965347f9f399784b2a984cfb62eeca5e4ff39...,2019-01-01 00:00:00+00:00,0,1,0,0,0,0,0,0,...,1,0,0,0,0,0,1,0,0,0
321620,8edce7ce70dba32066bbc7eed40b32a7602db0ceb6bb1d...,2019-01-01 00:00:00+00:00,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
1470487,64375e2b26ec87f4da6e0c3b8887d949b8e91e9072a428...,2019-01-01 00:00:00+00:00,0,1,0,0,0,0,0,0,...,1,0,0,0,0,0,0,1,0,0
100618,b695f0a6aeeb0c364c6ee6ca5c16f6621e00c50fd6e7d4...,2019-01-01 00:00:00+00:00,1,0,0,0,0,0,1,0,...,0,0,0,0,1,0,0,0,0,0
1470486,ca2e49fc5f32316d5c59cabe4993d8f7b27ee126bc5058...,2019-01-01 00:00:00+00:00,0,1,0,0,0,0,0,1,...,1,0,0,0,0,0,1,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49908,88a493149cafbde5ddde0280c77cc90c26a015c260df78...,2020-12-31 00:00:00+00:00,1,0,0,0,0,0,1,0,...,0,0,0,0,1,0,0,0,0,0
72973,87a017eedd299c9b5df098aa1d777cdefaf5d93c3c5e7c...,2020-12-31 00:00:00+00:00,1,0,0,0,0,0,1,0,...,0,0,0,0,1,0,0,0,0,0
101446,1f041942ef7c61fea34c62c89aea8231e4277ca158d54e...,2020-12-31 00:00:00+00:00,1,0,0,0,0,1,0,0,...,0,0,0,0,1,0,0,0,0,0
75603,31261f6e7fc645eff98c7964c7ea71a0ea7e387a6bd7f8...,2020-12-31 00:00:00+00:00,1,0,0,0,0,1,0,0,...,0,0,0,0,1,0,0,0,0,0


In [92]:
# Persist the one-hot encoded frame; index=False keeps the pandas index out of the file.
# NOTE(review): none of the visible cells reads this file back, and batch_source still points at
# trips_stats.parquet — confirm where trip_stats_featurized.parquet is meant to be consumed.
featurized_data.to_parquet(path="trip_stats_featurized.parquet",index=False)

In [95]:
# List the final column set of the featurized frame.
featurized_data.columns

Index(['taxi_id', 'day', 'trip_count_1_to_10', 'trip_count_11_to_25',
       'trip_count_26_to_50', 'trip_count_51_to_100',
       'trip_count_101_or_higher', 'total_miles_0_to_10',
       'total_miles_10_to_25', 'total_miles_25_to_50', 'total_miles_50_to_100',
       'total_miles_100_to_250', 'total_miles_250_to_500',
       'total_miles_500_to_1000', 'total_miles_1000_or_higher',
       'seconds_0_3600', 'seconds_3600_to_7200', 'seconds_7200_to_14400',
       'seconds_14400_to_28800', 'seconds_28800_to_43200',
       'seconds_43200_or_higher', 'earned_0_to_50', 'earned_50_to_100',
       'earned_100_to_250', 'earned_250_to_500', 'earned_500_to_1000',
       'earned_1000_or_higher'],
      dtype='object')

In [11]:


# Create the taxi entity; taxi_id is declared as its join key.
taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])

In [12]:


# Batch feature view exposing the four raw statistics of batch_source for each taxi entity.
# NOTE(review): total_trip_seconds is declared Float64 here, while trip_stats_ds.info() reports
# the parquet column as int64 — confirm the declared type is intended.
trips_stats_fv = BatchFeatureView(
    name='trip_stats',
    source=batch_source,
    entities=[taxi_entity],
    ttl=timedelta(days=1),  # identical to timedelta(seconds=86400)
    schema=[
        Field(name="total_miles_travelled", dtype=Float64),
        Field(name="total_trip_seconds", dtype=Float64),
        Field(name="total_earned", dtype=Float64),
        Field(name="trip_count", dtype=Int64),
    ],
)

In [13]:


# On-demand feature view deriving ratio features from the trip_stats feature view.
# NOTE(review): the author recalled on-demand feature views still being alpha and not scaling
# well — confirm against the Feast version in use.
@on_demand_feature_view(
    sources=[trips_stats_fv],
    schema=[
        Field(name="avg_fare", dtype=Float64),
        Field(name="avg_speed", dtype=Float64),
        Field(name="avg_trip_seconds", dtype=Float64),
        Field(name="earned_per_hour", dtype=Float64),
    ],
)
def on_demand_stats(inp: pd.DataFrame) -> pd.DataFrame:
    """Derive per-trip and per-hour ratios from the raw trip statistics.

    Args:
        inp: frame providing the columns total_earned, trip_count,
            total_miles_travelled and total_trip_seconds.

    Returns:
        A new frame with the columns avg_fare, avg_speed, avg_trip_seconds and
        earned_per_hour, in that order. Zero denominators are not guarded against.
    """
    earned = inp["total_earned"]
    trips = inp["trip_count"]
    miles = inp["total_miles_travelled"]
    seconds = inp["total_trip_seconds"]

    derived = pd.DataFrame()
    derived["avg_fare"] = earned / trips
    # The factor 3600 converts per-second rates into per-hour rates.
    derived["avg_speed"] = 3600 * miles / seconds
    derived["avg_trip_seconds"] = seconds / trips
    derived["earned_per_hour"] = 3600 * earned / seconds
    return derived

In [2]:


# Open the feature store configured by feature_store.yaml and display its project name.
store = FeatureStore(fs_yaml_file="feature_store.yaml")  # feature_store.yaml is stored in the same directory
store.project

'dqm_tutorial'

In [7]:


# Register the entity, the batch feature view and the on-demand feature view with the store.
store.apply([taxi_entity, trips_stats_fv, on_demand_stats])  # writing to the registry


# ## Generate training reference dataset

  schema = ParquetDataset(path).schema
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [14]:


# Load the entity rows from entities.parquet into a pandas DataFrame.
taxi_ids = pyarrow.parquet.read_table("entities.parquet").to_pandas()

In [15]:


# One row per calendar day from 2019-06-01 through 2019-07-01 (both ends included).
timestamps = pd.DataFrame(
    {"event_timestamp": pd.date_range(start="2019-06-01", end="2019-07-01", freq="D")}
)

In [16]:


# Cross merge (Cartesian product) pairs every row of taxi_ids with every timestamp,
# producing an entity dataframe in which each taxi_id is repeated once per day.
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df

Unnamed: 0,taxi_id,event_timestamp
0,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-01
1,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-02
2,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-03
3,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-04
4,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-05
...,...,...
156979,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-06-27
156980,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-06-28
156981,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-06-29
156982,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-06-30


In [17]:


# Build a retrieval job for the raw and on-demand features of every row in entity_df.
# The output is persisted as a saved dataset in the next cell, not here.
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)

In [None]:
# Persist the retrieval job's output as a saved dataset named 'my_training_ds',
# backed by the file my_training_ds.parquet. Later cells load it by that name.
store.create_saved_dataset(
    from_=job,
    name='my_training_ds',
    storage=SavedDatasetFileStorage(path='my_training_ds.parquet')
)

In [12]:


# Read the saved dataset's Parquet file directly to inspect what was written.
saved_dataset = pyarrow.parquet.read_table("my_training_ds.parquet").to_pandas()

In [13]:


# Display the saved training dataset.
saved_dataset


# ## Develop dataset profiler
# 
# The dataset profiler is a function that accepts a dataset and produces a set of characteristics describing it. These characteristics are then used to evaluate (validate) other datasets.
# 
# Important: datasets are not compared to each other! Feast uses a reference dataset and a profiler function to generate a reference profile. This profile will then be used during validation of the tested dataset.

Unnamed: 0,taxi_id,event_timestamp,total_miles_travelled,total_trip_seconds,total_earned,trip_count,avg_fare,avg_speed,avg_trip_seconds,earned_per_hour
0,d13c5aaa066f94b4927779ed24cd313b0c686f03407095...,2019-06-01 00:00:00+00:00,69.50,16080,203.50,8,25.437500,15.559701,2010.000000,45.559701
1,4f9128df57e0c64c1e98f9bfa053b2b01d5d3c21833371...,2019-06-01 00:00:00+00:00,75.20,14700,231.50,11,21.045455,18.416327,1336.363636,56.693878
2,3b8ea67bd1771560c29ac478e78e61a7cb9c3eea351b43...,2019-06-01 00:00:00+00:00,26.10,5280,72.25,2,36.125000,17.795455,2640.000000,49.261364
3,d3966c10c10e63be58eee01d3a6637d69f227f686ab885...,2019-06-01 00:00:00+00:00,124.80,23880,378.75,20,18.937500,18.814070,1194.000000,57.097990
4,d5c183f45a01c86c3cb0457bbd209c012db32a36a29c51...,2019-06-01 00:00:00+00:00,93.81,22584,315.50,20,15.775000,14.953773,1129.200000,50.292242
...,...,...,...,...,...,...,...,...,...,...
119803,ac488e1f03251055f4b4eba3ca51de23709ac4182721e0...,2019-07-01 00:00:00+00:00,21.04,5935,90.25,9,10.027778,12.762258,659.444444,54.743050
119804,5c5ebf6ea48279cddb3c608433c1e83d31bc1e699b6768...,2019-07-01 00:00:00+00:00,13.17,4408,63.50,8,7.937500,10.755898,551.000000,51.860254
119805,17e5ec0902050b4cb63ab2f7b82cd36b81c289767b4f4f...,2019-07-01 00:00:00+00:00,27.00,7650,122.25,14,8.732143,12.705882,546.428571,57.529412
119806,7dc01f4be54a4058ffb81098be25f52c9f1249afc88e3e...,2019-07-01 00:00:00+00:00,55.30,10020,178.50,11,16.227273,19.868263,910.909091,64.131737


In [3]:


import numpy as np

from feast.dqm.profilers.ge_profiler import ge_profiler

from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

In [4]:


# Load the saved dataset registered earlier under the name 'my_training_ds' and show its contents.
ds = store.get_saved_dataset('my_training_ds')
ds.to_df()


# Feast uses Great Expectations as a validation engine and ExpectationSuite as a dataset's profile. Hence, we need to develop a function that will generate an ExpectationSuite.
# 
# This function will receive an instance of PandasDataset (a wrapper around pandas.DataFrame) so we can utilize both the Pandas DataFrame API and some helper functions from PandasDataset during profiling.

Unnamed: 0,avg_trip_seconds,avg_fare,total_earned,total_miles_travelled,taxi_id,total_trip_seconds,event_timestamp,trip_count,avg_speed,earned_per_hour
0,2010.000000,25.437500,203.50,69.50,d13c5aaa066f94b4927779ed24cd313b0c686f03407095...,16080,2019-06-01 00:00:00+00:00,8,15.559701,45.559701
1,1476.000000,14.850000,74.25,15.80,33164e16dd29b1c58cd15cce31df4bfcb75d9903cb66de...,7380,2019-06-01 00:00:00+00:00,5,7.707317,36.219512
2,1270.000000,19.125000,114.75,38.50,226fe0b00be42932bdff81bc0b318b883bfbf15dd48093...,7620,2019-06-01 00:00:00+00:00,6,18.188976,54.212598
3,1415.000000,17.687500,70.75,20.22,5a5bed1b5ced617d0594007d591f10bbbca354d50b19ca...,5660,2019-06-01 00:00:00+00:00,4,12.860777,45.000000
4,1395.600000,20.850000,104.25,34.49,b7f7dbb452c0fb980a0f2050a146147c1006fe5f34e3b0...,6978,2019-06-01 00:00:00+00:00,5,17.793637,53.783319
...,...,...,...,...,...,...,...,...,...,...
119803,1205.000000,25.562500,102.25,36.98,961263722c1beadafef2355412d672acac35e4054f6aaa...,4820,2019-07-01 00:00:00+00:00,4,27.619917,76.369295
119804,692.727273,10.136364,111.50,29.00,8b07f9156e568a37d362463c84dbd1118b4eeb753bae50...,7620,2019-07-01 00:00:00+00:00,11,13.700787,52.677165
119805,588.750000,8.937500,143.00,31.00,a112879f10892d5c698ce150af17aa28615b6d005ca749...,9420,2019-07-01 00:00:00+00:00,16,11.847134,54.649682
119806,1647.000000,33.750000,101.25,37.86,68fe14b9fc2d53de5ac349d47f80f43fea895e201a31e3...,4941,2019-07-01 00:00:00+00:00,3,27.584699,73.770492


In [5]:


DELTA = 0.1  # allowed relative deviation from an observed mean, as a fraction in [0, 1]


@ge_profiler  # Feast decorator, imported from feast.dqm.profilers.ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    """Build the expectation suite that serves as the profile of ``ds``.

    The suite contains, in this order:
      * fixed value ranges for avg_speed and total_miles_travelled (99% of rows must comply),
      * mean windows of +/- DELTA around the means observed in ``ds`` for trip_count
        and earned_per_hour,
      * upper bounds for the avg_fare quantiles, taken from the quantiles observed in ``ds``.

    Args:
        ds: dataset to profile.

    Returns:
        The expectation suite accumulated on ``ds``.
    """
    # Simple consistency checks: values must fall inside fixed ranges.
    for column, upper_bound in (("avg_speed", 60), ("total_miles_travelled", 500)):
        ds.expect_column_values_to_be_between(
            column,
            min_value=0,
            max_value=upper_bound,
            mostly=0.99,  # allow some outliers
        )

    # Means are expected to stay within +/- DELTA of the values observed in this dataset.
    for column in ("trip_count", "earned_per_hour"):
        reference_mean = ds[column].mean()
        ds.expect_column_mean_to_be_between(
            column,
            min_value=reference_mean * (1 - DELTA),
            max_value=reference_mean * (1 + DELTA),
        )

    # Quantiles of avg_fare must not exceed the ones observed here (no lower bound is set).
    quantile_levels = [0.5, 0.75, 0.9, 0.95]
    upper_limits = ds["avg_fare"].quantile(quantile_levels)
    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": quantile_levels,
            "value_ranges": [[None, limit] for limit in upper_limits],
        },
    )

    return ds.get_expectation_suite()

In [23]:


# Generate the saved dataset's profile with stats_profiler and display it.
profile = ds.get_profile(profiler=stats_profiler)
profile

<GEProfile with expectations: [
  {
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {
      "column": "avg_speed",
      "min_value": 0,
      "max_value": 60,
      "mostly": 0.99
    },
    "meta": {}
  },
  {
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {
      "column": "total_miles_travelled",
      "min_value": 0,
      "max_value": 500,
      "mostly": 0.99
    },
    "meta": {}
  },
  {
    "expectation_type": "expect_column_mean_to_be_between",
    "kwargs": {
      "column": "trip_count",
      "min_value": 10.387244591346153,
      "max_value": 12.695521167200855
    },
    "meta": {}
  },
  {
    "expectation_type": "expect_column_mean_to_be_between",
    "kwargs": {
      "column": "earned_per_hour",
      "min_value": 52.32062497564023,
      "max_value": 63.9474305257825
    },
    "meta": {}
  },
  {
    "expectation_type": "expect_column_quantile_values_to_be_between",
    "kwargs": {
      "column": "avg_fare"

In [7]:


# Create a validation reference from the training dataset and the profiler function.
# It is passed to to_df(validation_reference=...) in the cells below.
validation_reference = ds.as_reference(name="validation_reference_dataset", profiler=stats_profiler)

In [18]:


# Validate the existing retrieval job (the one that pulled the training dataset) against the
# reference built from that same dataset. The returned frame is discarded on purpose.
_ = job.to_df(validation_reference=validation_reference)
# Validation passed if no exception was raised.


# ## Validate New Historical Retrieval

In [19]:


from feast.dqm.errors import ValidationFailed

In [20]:


# New daily timestamps for the first week of Dec 2020 (2020-12-01 through 2020-12-07, inclusive).
timestamps = pd.DataFrame(
    {"event_timestamp": pd.date_range(start="2020-12-01", end="2020-12-07", freq="D")}
)
# Rebuild the entity dataframe for the new dates: every taxi id paired with every timestamp.
# NOTE: this rebinds entity_df, replacing the June 2019 entity dataframe.
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df

Unnamed: 0,taxi_id,event_timestamp
0,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-01
1,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-02
2,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-03
3,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-04
4,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-05
...,...,...
35443,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-03
35444,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-04
35445,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-05
35446,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-06


In [21]:


# Build a retrieval job for the same features over the Dec 2020 entity dataframe.
# NOTE: this rebinds `job`, replacing the retrieval job that produced the training dataset.
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)

In [22]:


# Convert the Dec 2020 retrieval job to a dataframe AND validate it against the reference profile.
# ValidationFailed is caught deliberately: printing the report is the point of this demonstration.
# NOTE: `df` is only bound when validation succeeds.
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    print(exc.validation_report)


# The validation failed since several expectations didn't pass:
# - Trip count (mean) decreased more than 10% (which is expected when comparing Dec 2020 vs June 2019)
# - Average Fare increased - all quantiles are higher than expected
# - Earned per hour (mean) increased by more than 10% (most probably due to the increased fares)

# Note that an exception was raised, which is an easy way to stop a pipeline!
# 
# The exception also exposes (via `.validation_report`) a JSON list describing the failed expectations. This JSON can easily be saved to a file, turned into a Slack message, or passed to any other tooling.

[
  {
    "success": false,
    "expectation_config": {
      "expectation_type": "expect_column_mean_to_be_between",
      "kwargs": {
        "column": "trip_count",
        "min_value": 10.387244591346153,
        "max_value": 12.695521167200855,
        "result_format": "COMPLETE"
      },
      "meta": {}
    },
    "result": {
      "observed_value": 6.692920555429092,
      "element_count": 4393,
      "missing_count": null,
      "missing_percent": null
    },
    "meta": {},
    "exception_info": {
      "raised_exception": false,
      "exception_message": null,
      "exception_traceback": null
    }
  },
  {
    "success": false,
    "expectation_config": {
      "expectation_type": "expect_column_mean_to_be_between",
      "kwargs": {
        "column": "earned_per_hour",
        "min_value": 52.32062497564023,
        "max_value": 63.9474305257825,
        "result_format": "COMPLETE"
      },
      "meta": {}
    },
    "result": {
      "observed_value": 68.99268345164135