# Validating Historical Features with Great Expectations

![Validating Historical Features with Great Expectations](./dqm-diagram-v2.jpg)

In this tutorial, we will use the public dataset of Chicago taxi trips to present the data validation capabilities of Feast. The original dataset is stored in BigQuery and consists of raw data for each taxi trip (one row per trip) since 2013. We will generate several training datasets (aka historical features in Feast) for different periods and evaluate expectations made on one dataset against another. Our features will represent aggregations of raw data with daily intervals (e.g., trips per day, average fare or speed for a specific day). We will craft some features using SQL while pulling data from BigQuery (like total trip time or total miles travelled). Another chunk of features will be implemented using Feast's on-demand transformations: features calculated on the fly when requested.
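The daily features are aggregations of per-trip rows. In the tutorial this aggregation is done in SQL against BigQuery. The plain-Python sketch below is only an illustration of what one "row per taxi per day" means.

It makes several assumptions:
- The raw field names `trip_start_timestamp`, `trip_miles`, `trip_seconds` and `fare` are assumed.
- It assumes `total_earned` is the sum of fares, which the original SQL may define differently.
- The trips themselves are toy data.

```python
from collections import defaultdict
from datetime import datetime, timezone


def aggregate_daily(trips):
    """Aggregate per-trip records into one row per (taxi_id, day)."""
    totals = defaultdict(lambda: {"total_miles_travelled": 0.0, "total_trip_seconds": 0,
                                  "total_earned": 0.0, "trip_count": 0})
    for trip in trips:
        # Truncate the start time to midnight in the timestamp's own timezone (UTC here).
        day = trip["trip_start_timestamp"].replace(hour=0, minute=0, second=0, microsecond=0)
        row = totals[(trip["taxi_id"], day)]
        row["total_miles_travelled"] += trip["trip_miles"]
        row["total_trip_seconds"] += trip["trip_seconds"]
        row["total_earned"] += trip["fare"]  # assumption: earnings = sum of fares
        row["trip_count"] += 1
    # Sort by (taxi_id, day) so the output order is deterministic.
    return [{"taxi_id": taxi_id, "day": day, **row}
            for (taxi_id, day), row in sorted(totals.items())]


def utc(*args):
    return datetime(*args, tzinfo=timezone.utc)


# Toy trips with hypothetical field names.
raw_trips = [
    {"taxi_id": "a", "trip_start_timestamp": utc(2019, 6, 1, 8, 15),
     "trip_miles": 3.5, "trip_seconds": 900, "fare": 12.25},
    {"taxi_id": "a", "trip_start_timestamp": utc(2019, 6, 1, 17, 40),
     "trip_miles": 2.0, "trip_seconds": 600, "fare": 8.5},
    {"taxi_id": "a", "trip_start_timestamp": utc(2019, 6, 2, 9, 0),
     "trip_miles": 10.0, "trip_seconds": 1800, "fare": 30.0},
    {"taxi_id": "b", "trip_start_timestamp": utc(2019, 6, 1, 11, 30),
     "trip_miles": 1.2, "trip_seconds": 540, "fare": 10.25},
]

for daily_row in aggregate_daily(raw_trips):
    print(daily_row)
```

Taxi `a` has two trips on June 1, so they collapse into a single row with `trip_count` equal to 2.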

Our plan:

1. Declare & apply features and feature views in Feast
2. Generate a reference dataset
3. Develop & test a profiler function
4. Run validation on a different dataset using the reference dataset & profiler

```python
import pyarrow.parquet
import pandas as pd
from datetime import timedelta
from IPython.display import display  # built into Jupyter; imported explicitly for scripts
from feast import Entity, FeatureStore, FeatureView, Field
from feast.types import Float64
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
```

## 1. Declaring features

```python
df = pd.read_parquet("trips_stats.parquet")
display(df)
```

| | taxi_id | day | total_miles_travelled | total_trip_seconds | total_earned | trip_count |
|---|---|---|---|---|---|---|
| 0 | 9b2a23587b25fe43998076b3761b979ee91f9f5ac8bdac... | 2019-04-30 00:00:00+00:00 | 31.04 | 15523 | 207.00 | 31 |
| 1 | 26211b46f6f6c6987bee5e2062934ba8692bced4811d1a... | 2019-04-05 00:00:00+00:00 | 88.21 | 16757 | 331.50 | 33 |
| 2 | 018adeb3e0e0f1b8c78f37c4b99735005ea7acfe800ab8... | 2020-02-05 00:00:00+00:00 | 39.69 | 14331 | 233.00 | 36 |
| 3 | c8549ac4a2842a038f2a5b91ecf894ca7cb04d60558be3... | 2020-02-26 00:00:00+00:00 | 46.74 | 15535 | 235.25 | 30 |
| 4 | 2780ead18beaa862cc67315ddabd9d1acaadcd6da82eba... | 2020-02-20 00:00:00+00:00 | 86.40 | 21749 | 353.25 | 38 |
| ... | ... | ... | ... | ... | ... | ... |
| 1647129 | 381b18e55254dd5635fefcf1b2a7956671bd47be71a38e... | 2020-03-12 00:00:00+00:00 | 60.21 | 15396 | 254.00 | 28 |
| 1647130 | 848a1ee7cd6c9d5c24658a35523ec72ae374eef3f562bc... | 2020-03-04 00:00:00+00:00 | 52.85 | 17958 | 251.00 | 28 |
| 1647131 | 8da9e1d18757022c6a6a614fc2d38483e38aae441feff5... | 2020-03-02 00:00:00+00:00 | 75.59 | 14840 | 283.50 | 28 |
| 1647132 | c797f1560410b9db343567ea7c8e4095f66ceb65800fa4... | 2020-09-25 00:00:00+00:00 | 73.73 | 17125 | 284.25 | 28 |


```python
id = 'd13c5aaa066f94b4927779ed24cd313b0c686f03407095cf5daa4c227d411eb5049a5b1ce8ff07157f9641e0e863be971c9d24bc3ef3f89857d7788fef397242'
df[df.taxi_id == id].sort_values(by="day", ascending=False)
```

| | taxi_id | day | total_miles_travelled | total_trip_seconds | total_earned | trip_count |
|---|---|---|---|---|---|---|
| 1149410 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2020-03-13 00:00:00+00:00 | 12.9 | 4320 | 75.50 | 12 |
| 1068987 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2020-03-12 00:00:00+00:00 | 23.2 | 6420 | 99.50 | 11 |
| 1008498 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2020-03-11 00:00:00+00:00 | 14.9 | 4800 | 76.50 | 10 |
| 1118291 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2020-03-10 00:00:00+00:00 | 21.8 | 6360 | 101.00 | 12 |
| 922475 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2020-03-09 00:00:00+00:00 | 21.8 | 6300 | 89.75 | 9 |
| ... | ... | ... | ... | ... | ... | ... |
| 984363 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-01-07 00:00:00+00:00 | 11.4 | 3780 | 66.75 | 10 |
| 109519 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-01-05 00:00:00+00:00 | 1.2 | 540 | 10.25 | 2 |
| 877317 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-01-04 00:00:00+00:00 | 32.4 | 6540 | 116.00 | 9 |
| 1484460 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-01-03 00:00:00+00:00 | 25.3 | 7860 | 132.50 | 19 |


```python
batch_source = FileSource(
    timestamp_field="day",
    path="trips_stats.parquet",  # the parquet file loaded above
    file_format=ParquetFormat(),
)

taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])

trips_stats_fv = FeatureView(
    name='trip_stats',
    entities=[taxi_entity],  # pass the Entity object; string entity names are deprecated in newer Feast releases
    schema=[
        Field(name="total_miles_travelled", dtype=Float64),
        Field(name="total_trip_seconds", dtype=Float64),
        Field(name="total_earned", dtype=Float64),
        Field(name="trip_count", dtype=Float64),
    ],
    ttl=timedelta(seconds=86400),  # 86400 s = 1 day
    source=batch_source,
)
```

*Read more about feature views in [Feast docs](https://docs.feast.dev/getting-started/concepts/feature-view)*
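The `ttl` above bounds how stale a feature row may be when it is joined to an entity timestamp. The sketch below is a simplified, single-entity model of that point-in-time lookup. For each entity timestamp it takes the latest feature row that is no newer than the timestamp and no older than the timestamp minus the TTL.

Treating both window edges as inclusive is an assumption about Feast's exact boundary handling. The data is hypothetical.

```python
from datetime import datetime, timedelta


def point_in_time_lookup(feature_rows, entity_ts, ttl):
    """Latest feature values with entity_ts - ttl <= ts <= entity_ts, or None.

    feature_rows: list of (timestamp, values) pairs for a single entity.
    """
    in_window = [(ts, values) for ts, values in feature_rows
                 if entity_ts - ttl <= ts <= entity_ts]
    if not in_window:
        return None  # no row is fresh enough, so the features come back as nulls
    # Pick the most recent row inside the window.
    return max(in_window, key=lambda row: row[0])[1]


# Hypothetical daily rows for one taxi; note there is no row for 2019-06-28.
rows = [
    (datetime(2019, 6, 26), {"trip_count": 33}),
    (datetime(2019, 6, 27), {"trip_count": 2}),
]
ttl = timedelta(seconds=86400)  # same TTL as trips_stats_fv

print(point_in_time_lookup(rows, datetime(2019, 6, 28), ttl))
print(point_in_time_lookup(rows, datetime(2019, 6, 29), ttl))
```

Under this model, a taxi with no trips on a given day can inherit the previous day's values for one day. That may explain the identical rows for consecutive days in the retrieved dataset shown later.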

```python
@on_demand_feature_view(
    schema=[
        Field(name="avg_fare", dtype=Float64),
        Field(name="avg_speed", dtype=Float64),
        Field(name="avg_trip_seconds", dtype=Float64),
        Field(name="earned_per_hour", dtype=Float64),
    ],
    sources=[trips_stats_fv],
)
def on_demand_stats(inp):
    # `inp` is a DataFrame holding the trip_stats features of the requested rows
    out = pd.DataFrame()
    out["avg_fare"] = inp["total_earned"] / inp["trip_count"]  # earnings per trip
    out["avg_speed"] = 3600 * inp["total_miles_travelled"] / inp["total_trip_seconds"]  # miles per hour
    out["avg_trip_seconds"] = inp["total_trip_seconds"] / inp["trip_count"]
    out["earned_per_hour"] = 3600 * inp["total_earned"] / inp["total_trip_seconds"]
    return out
```

*Read more about on-demand feature views [here](https://docs.feast.dev/reference/alpha-on-demand-feature-view)*
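As a sanity check on the arithmetic, here is the same transformation written as a plain function over one row, with no Feast involved. The factor 3600 converts seconds to hours, so `avg_speed` is in miles per hour.

The inputs are the first row of the training dataset retrieved later in this tutorial. The helper name `on_demand_stats_row` is hypothetical. Unlike pandas, plain Python raises `ZeroDivisionError` when `trip_count` or `total_trip_seconds` is 0, and this sketch does not guard against that.

```python
def on_demand_stats_row(total_miles_travelled, total_trip_seconds, total_earned, trip_count):
    """Plain-Python mirror of on_demand_stats for a single row (hypothetical helper)."""
    return {
        "avg_fare": total_earned / trip_count,
        "avg_speed": 3600 * total_miles_travelled / total_trip_seconds,  # miles per hour
        "avg_trip_seconds": total_trip_seconds / trip_count,
        "earned_per_hour": 3600 * total_earned / total_trip_seconds,
    }


# First row of the retrieved training dataset: 69.50 miles, 16080 s, 203.50 earned, 8 trips.
stats = on_demand_stats_row(69.50, 16080, 203.50, 8)
for name, value in stats.items():
    print(f"{name}: {value:.6f}")
```

The printed values (25.437500, 15.559701, 2010.000000, 45.559701) match the on-demand columns of that row.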

```python
store = FeatureStore(".")  # uses the feature_store.yaml stored in the same directory
store.apply([taxi_entity, trips_stats_fv, on_demand_stats])  # writes the definitions to the registry
```

## 2. Generating training (reference) dataset

```python
# Read a table with all taxi IDs
taxi_ids = pyarrow.parquet.read_table("entities.parquet").to_pandas()
display(taxi_ids)
```

| | taxi_id |
|---|---|
| 0 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... |
| 1 | 9d5308a039a77963ed44f4b4435abd45f28eb7c4eb7669... |
| 2 | 2d411799ad37a21a76e73cab9a4ec254f551ba45761f4e... |
| 3 | a36483657911ff455df8757d429c4c7cc1d720b6ef8f4a... |
| 4 | e97ff21e0c8f655aa03ec62fbccdf33578a21516535ce7... |
| ... | ... |
| 5059 | 91d35a6982a6a0287931022e745957a1d3284b0ed36b91... |
| 5060 | b0372cb54559e5569b3fe82dec65d7ac25d8276d59e9b6... |
| 5061 | d63ae52c1bdf4ff2edf7fd88db73bfa5b5ea1c045ea2bc... |
| 5062 | 468715c4e6b0967d57ec45e4ee960045525de2b01a3229... |


```python
# Generate a timestamp table with one row per day for one month (both endpoints included)
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2019-06-01", "2019-07-01", freq='D')
display(timestamps)
```

| | event_timestamp |
|---|---|
| 0 | 2019-06-01 |
| 1 | 2019-06-02 |
| 2 | 2019-06-03 |
| 3 | 2019-06-04 |
| 4 | 2019-06-05 |
| 5 | 2019-06-06 |
| 6 | 2019-06-07 |
| 7 | 2019-06-08 |
| 8 | 2019-06-09 |
| 9 | 2019-06-10 |


A cross merge (aka Cartesian product) produces an entity dataframe in which each taxi_id is repeated for every timestamp:

```python
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
display(entity_df)
```

| | taxi_id | event_timestamp |
|---|---|---|
| 0 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2019-06-01 |
| 1 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2019-06-02 |
| 2 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2019-06-03 |
| 3 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2019-06-04 |
| 4 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2019-06-05 |
| ... | ... | ... |
| 156979 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2019-06-27 |
| 156980 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2019-06-28 |
| 156981 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2019-06-29 |
| 156982 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2019-06-30 |
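The two steps above (a daily date range, then a cross merge) can be modelled in plain Python. `pd.date_range(start, end, freq='D')` includes both endpoints, so June 1 to July 1 yields 31 days rather than 30. A cross merge pairs every taxi with every day, so the entity dataframe grows as the number of taxis times the number of days.

This is an illustrative sketch using `datetime.date` and `itertools.product` in place of pandas timestamps. The taxi IDs are placeholders.

```python
from datetime import date, timedelta
from itertools import product


def daily_range(start, end):
    """Inclusive list of days, like pd.date_range(start, end, freq='D')."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def cross_join(taxi_ids, days):
    """Cartesian product: every taxi_id paired with every day, taxi-major order."""
    return [{"taxi_id": t, "event_timestamp": d} for t, d in product(taxi_ids, days)]


days = daily_range(date(2019, 6, 1), date(2019, 7, 1))
entity_rows = cross_join(["taxi_a", "taxi_b", "taxi_c"], days)
print(len(days), len(entity_rows))
```

With 3 placeholder taxis and 31 days this produces 93 entity rows, ordered with all days of the first taxi before the second, as in the table above.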


Retrieving historical features for the resulting entity dataframe and persisting the output as a saved dataset:

```python
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)

store.create_saved_dataset(
    from_=job,
    name='my_training_ds',
    storage=SavedDatasetFileStorage(path='my_training_ds.parquet')
)
```

```
<SavedDataset(name = my_training_ds, features = ['trip_stats:total_miles_travelled', 'trip_stats:total_trip_seconds', 'trip_stats:total_earned', 'trip_stats:trip_count', 'on_demand_stats:avg_fare', 'on_demand_stats:avg_trip_seconds', 'on_demand_stats:avg_speed', 'on_demand_stats:earned_per_hour'], join_keys = ['taxi_id'], storage = <feast.infra.offline_stores.file_source.SavedDatasetFileStorage object at 0x7fb520be35b0>, full_feature_names = False, tags = {}, feature_service_name = None, _retrieval_job = <feast.infra.offline_stores.file.FileRetrievalJob object at 0x7fb53059f7f0>, min_event_timestamp = 2019-06-01 00:00:00, max_event_timestamp = 2019-07-01 00:00:00, created_timestamp = 2022-06-24 13:05:23.270938, last_updated_timestamp = 2022-06-24 13:05:23.270938)>
```

```python
# Load the retrieved table into a DataFrame; it includes the on-demand features
trips_table_with_on_demand_feats = job.to_df()
display(trips_table_with_on_demand_feats)
```

| | taxi_id | event_timestamp | total_miles_travelled | total_trip_seconds | total_earned | trip_count | avg_fare | avg_speed | avg_trip_seconds | earned_per_hour |
|---|---|---|---|---|---|---|---|---|---|---|
| 41641152 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-01 00:00:00+00:00 | 69.50 | 16080 | 203.50 | 8 | 25.437500 | 15.559701 | 2010.000000 | 45.559701 |
| 34157776 | 4f9128df57e0c64c1e98f9bfa053b2b01d5d3c21833371... | 2019-06-01 00:00:00+00:00 | 75.20 | 14700 | 231.50 | 11 | 21.045455 | 18.416327 | 1336.363636 | 56.693878 |
| 34147094 | 3b8ea67bd1771560c29ac478e78e61a7cb9c3eea351b43... | 2019-06-01 00:00:00+00:00 | 26.10 | 5280 | 72.25 | 2 | 36.125000 | 17.795455 | 2640.000000 | 49.261364 |
| 23784708 | d3966c10c10e63be58eee01d3a6637d69f227f686ab885... | 2019-06-01 00:00:00+00:00 | 124.80 | 23880 | 378.75 | 20 | 18.937500 | 18.814070 | 1194.000000 | 57.097990 |
| 46562308 | d5c183f45a01c86c3cb0457bbd209c012db32a36a29c51... | 2019-06-01 00:00:00+00:00 | 93.81 | 22584 | 315.50 | 20 | 15.775000 | 14.953773 | 1129.200000 | 50.292242 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 33930298 | ac488e1f03251055f4b4eba3ca51de23709ac4182721e0... | 2019-07-01 00:00:00+00:00 | 21.04 | 5935 | 90.25 | 9 | 10.027778 | 12.762258 | 659.444444 | 54.743050 |
| 19882826 | 5c5ebf6ea48279cddb3c608433c1e83d31bc1e699b6768... | 2019-07-01 00:00:00+00:00 | 13.17 | 4408 | 63.50 | 8 | 7.937500 | 10.755898 | 551.000000 | 51.860254 |
| 2196191 | 17e5ec0902050b4cb63ab2f7b82cd36b81c289767b4f4f... | 2019-07-01 00:00:00+00:00 | 27.00 | 7650 | 122.25 | 14 | 8.732143 | 12.705882 | 546.428571 | 57.529412 |
| 33910777 | 7dc01f4be54a4058ffb81098be25f52c9f1249afc88e3e... | 2019-07-01 00:00:00+00:00 | 55.30 | 10020 | 178.50 | 11 | 16.227273 | 19.868263 | 910.909091 | 64.131737 |


```python
# Display features for a specific taxi_id
trips_table_with_on_demand_feats[trips_table_with_on_demand_feats.taxi_id == id].sort_values(by="event_timestamp", ascending=False)
```

| | taxi_id | event_timestamp | total_miles_travelled | total_trip_seconds | total_earned | trip_count | avg_fare | avg_speed | avg_trip_seconds | earned_per_hour |
|---|---|---|---|---|---|---|---|---|---|---|
| 41649284 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-28 00:00:00+00:00 | 7.6 | 1260 | 25.25 | 2 | 12.625 | 21.714286 | 630.0 | 72.142857 |
| 41648979 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-27 00:00:00+00:00 | 7.6 | 1260 | 25.25 | 2 | 12.625 | 21.714286 | 630.0 | 72.142857 |
| 41648654 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-26 00:00:00+00:00 | 84.5 | 19140 | 334.75 | 33 | 10.143939 | 15.893417 | 580.0 | 62.962382 |
| 41648639 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-25 00:00:00+00:00 | 30.1 | 11340 | 166.0 | 21 | 7.904762 | 9.555556 | 540.0 | 52.698413 |
| 41647893 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-23 00:00:00+00:00 | 31.4 | 6060 | 113.0 | 10 | 11.3 | 18.653465 | 606.0 | 67.128713 |
| 41647588 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-22 00:00:00+00:00 | 31.4 | 6060 | 113.0 | 10 | 11.3 | 18.653465 | 606.0 | 67.128713 |
| 41647335 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-21 00:00:00+00:00 | 59.1 | 13020 | 199.0 | 13 | 15.307692 | 16.341014 | 1001.538462 | 55.023041 |
| 41647011 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-20 00:00:00+00:00 | 23.4 | 7080 | 107.5 | 12 | 8.958333 | 11.898305 | 590.0 | 54.661017 |
| 41646569 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-19 00:00:00+00:00 | 29.2 | 2760 | 80.5 | 4 | 20.125 | 38.086957 | 690.0 | 105.0 |
| 41646298 | d13c5aaa066f94b4927779ed24cd313b0c686f03407095... | 2019-06-18 00:00:00+00:00 | 7.8 | 1980 | 37.5 | 5 | 7.5 | 14.181818 | 396.0 | 68.181818 |


## 3. Developing the dataset profiler

A dataset profiler is a function that accepts a dataset and generates a set of its characteristics. These characteristics are then used to evaluate (validate) subsequent datasets.

**Important: datasets are not compared to each other! Feast uses a reference dataset and a profiler function to generate a reference profile. This profile is then used during validation of the tested dataset.**

```python
import numpy as np

from feast.dqm.profilers.ge_profiler import ge_profiler
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset
```



First, load the saved dataset:

```python
ds = store.get_saved_dataset('my_training_ds')
```

Feast uses [Great Expectations](https://docs.greatexpectations.io/docs/) as its validation engine and an [ExpectationSuite](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/core/expectation_suite/index.html#great_expectations.core.expectation_suite.ExpectationSuite) as a dataset's profile. Hence, we need to develop a function that generates an ExpectationSuite. This function receives an instance of [PandasDataset](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/dataset/index.html?highlight=pandasdataset#great_expectations.dataset.PandasDataset) (a wrapper around pandas.DataFrame), so we can use both the pandas DataFrame API and the helper functions of PandasDataset during profiling.

```python
DELTA = 0.1  # allowed deviation from the observed value, as a fraction in [0, 1]

@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    # simple checks on data consistency
    ds.expect_column_values_to_be_between(
        "avg_speed",
        min_value=0,
        max_value=60,
        mostly=0.99  # allow some outliers
    )

    ds.expect_column_values_to_be_between(
        "total_miles_travelled",
        min_value=0,
        max_value=500,
        mostly=0.99  # allow some outliers
    )

    # expectations on means, based on the observed values
    observed_mean = ds.trip_count.mean()
    ds.expect_column_mean_to_be_between("trip_count",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    observed_mean = ds.earned_per_hour.mean()
    ds.expect_column_mean_to_be_between("earned_per_hour",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    # expectations on quantiles: only an upper bound per quantile (lower bound is None)
    qs = [0.5, 0.75, 0.9, 0.95]
    observed_quantiles = ds.avg_fare.quantile(qs)

    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": qs,
            "value_ranges": [[None, max_value] for max_value in observed_quantiles]
        })

    return ds.get_expectation_suite()
```
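Two of the expectation types above have semantics worth spelling out:
- `mostly=0.99` means the check passes if at least 99% of the values fall in the range.
- The quantile expectation with `[None, max_value]` ranges only caps each quantile from above.

The plain-Python sketch below restates both ideas. It ignores null handling. It uses linear interpolation for quantiles, which is pandas' default for `Series.quantile` (what the profiler uses to set the bounds). Great Expectations may compute the observed quantiles differently, so treat this as an approximation. The data is toy data.

```python
def values_between_mostly(values, min_value, max_value, mostly):
    """Pass if at least a `mostly` fraction of values lies in [min_value, max_value]."""
    in_range = sum(min_value <= v <= max_value for v in values)
    return in_range / len(values) >= mostly


def linear_quantile(sorted_values, q):
    """Quantile with linear interpolation (pandas' default) on already-sorted data."""
    pos = q * (len(sorted_values) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(sorted_values) - 1)
    return sorted_values[lo] + (sorted_values[hi] - sorted_values[lo]) * (pos - lo)


def quantiles_at_most(values, qs, upper_bounds):
    """Pass if each observed quantile is <= its upper bound (no lower bound)."""
    ordered = sorted(values)
    return all(linear_quantile(ordered, q) <= ub for q, ub in zip(qs, upper_bounds))


# Toy data, not taken from the tutorial's dataset.
speeds = [12.0, 15.5, 18.8, 21.7, 75.0]  # one value above 60
print(values_between_mostly(speeds, 0, 60, mostly=0.99))  # only 4/5 = 0.8 in range
print(values_between_mostly(speeds, 0, 60, mostly=0.8))

reference_fares = [1.0, 2.0, 3.0, 4.0, 5.0]
qs = [0.5, 0.75, 0.9, 0.95]
bounds = [linear_quantile(sorted(reference_fares), q) for q in qs]  # set from the reference
print(quantiles_at_most(reference_fares, qs, bounds))
print(quantiles_at_most([f + 1 for f in reference_fares], qs, bounds))  # every fare shifted up
```

Because the quantile bounds are only upper limits, a tested dataset with uniformly higher fares fails every quantile. A dataset with lower fares would still pass.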

Testing our profiler function:

```python
ds.get_profile(profiler=stats_profiler)
```

```
<GEProfile with expectations: [
  {
    "meta": {},
    "kwargs": {
      "column": "avg_speed",
      "min_value": 0,
      "max_value": 60,
      "mostly": 0.99
    },
    "expectation_type": "expect_column_values_to_be_between"
  },
  {
    "meta": {},
    "kwargs": {
      "column": "total_miles_travelled",
      "min_value": 0,
      "max_value": 500,
      "mostly": 0.99
    },
    "expectation_type": "expect_column_values_to_be_between"
  },
  {
    "meta": {},
    "kwargs": {
      "column": "trip_count",
      "min_value": 10.387244591346153,
      "max_value": 12.695521167200855
    },
    "expectation_type": "expect_column_mean_to_be_between"
  },
  {
    "meta": {},
    "kwargs": {
      "column": "earned_per_hour",
      "min_value": 52.32062497564023,
      "max_value": 63.9474305257825
    },
    "expectation_type": "expect_column_mean_to_be_between"
  },
  {
    "meta": {},
    "kwargs": {
      "column": "avg_fare",
      "quantile_ranges": {
        "quantiles": [
```

**Verify that every expectation coded in the profiler is present here. If an expectation is missing, it failed on the reference dataset and was dropped; Great Expectations does this silently by default.**
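This check can be automated by listing the `(expectation_type, column)` pairs the profiler codes and comparing them with the pairs present in the generated suite.

This is a minimal sketch that works on plain dicts shaped like the JSON printed above. How to obtain that list from the profile object depends on your Feast and Great Expectations versions, and is not shown. `missing_expectations` is a hypothetical helper. The suite below is a made-up example in which the `avg_fare` expectation was dropped.

```python
def missing_expectations(expected, suite):
    """Return the (expectation_type, column) pairs from `expected` absent in `suite`."""
    present = {(e["expectation_type"], e["kwargs"].get("column")) for e in suite}
    return [pair for pair in expected if pair not in present]


# One pair per expectation call in stats_profiler.
EXPECTED = [
    ("expect_column_values_to_be_between", "avg_speed"),
    ("expect_column_values_to_be_between", "total_miles_travelled"),
    ("expect_column_mean_to_be_between", "trip_count"),
    ("expect_column_mean_to_be_between", "earned_per_hour"),
    ("expect_column_quantile_values_to_be_between", "avg_fare"),
]

# Hypothetical suite, shaped like the printed profile, missing the avg_fare entry.
suite = [
    {"kwargs": {"column": "avg_speed"}, "expectation_type": "expect_column_values_to_be_between"},
    {"kwargs": {"column": "total_miles_travelled"}, "expectation_type": "expect_column_values_to_be_between"},
    {"kwargs": {"column": "trip_count"}, "expectation_type": "expect_column_mean_to_be_between"},
    {"kwargs": {"column": "earned_per_hour"}, "expectation_type": "expect_column_mean_to_be_between"},
]

print(missing_expectations(EXPECTED, suite))
```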

Now we can create a validation reference from the dataset and the profiler function:

```python
validation_reference = ds.as_reference(profiler=stats_profiler)
```

and test it against the existing retrieval job:

```python
_ = job.to_df(validation_reference=validation_reference)
```

Validation passed successfully, since no exception was raised.

## 4. Validating a new historical retrieval

Creating new timestamps for the first week of December 2020:

```python
from feast.dqm.errors import ValidationFailed
```

```python
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2020-12-01", "2020-12-07", freq='D')
```

```python
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df
```

| | taxi_id | event_timestamp |
|---|---|---|
| 0 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2020-12-01 |
| 1 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2020-12-02 |
| 2 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2020-12-03 |
| 3 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2020-12-04 |
| 4 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 2020-12-05 |
| ... | ... | ... |
| 35443 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2020-12-03 |
| 35444 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2020-12-04 |
| 35445 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2020-12-05 |
| 35446 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | 2020-12-06 |


```python
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)
```

Execute the retrieval job with the validation reference:

```python
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    print(exc.validation_report)
```

```
[
  {
    "result": {
      "observed_value": 6.692920555429092,
      "element_count": 4393,
      "missing_count": null,
      "missing_percent": null
    },
    "expectation_config": {
      "meta": {},
      "kwargs": {
        "column": "trip_count",
        "min_value": 10.387244591346153,
        "max_value": 12.695521167200855,
        "result_format": "COMPLETE"
      },
      "expectation_type": "expect_column_mean_to_be_between"
    },
    "meta": {},
    "exception_info": {
      "raised_exception": false,
      "exception_message": null,
      "exception_traceback": null
    },
    "success": false
  },
  {
    "result": {
      "observed_value": 68.99268345164135,
      "element_count": 4393,
      "missing_count": null,
      "missing_percent": null
    },
    "expectation_config": {
      "meta": {},
      "kwargs": {
        "column": "earned_per_hour",
        "min_value": 52.32062497564023,
        "max_value": 63.9474305257825,
        "result_format": "COMPLETE"
```

Validation failed because several expectations did not pass:
* Trip count (mean) decreased by more than 10% (which is expected when comparing December 2020 with June 2019)
* Average fare increased: all quantiles are higher than expected
* Earned per hour (mean) increased by more than 10% (most probably due to the increased fare)
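The size of each drift can be read off the report. The profiler built each mean window as `mean * (1 - DELTA)` to `mean * (1 + DELTA)`, so the reference mean can be recovered as `min_value / (1 - DELTA)` and compared with the observed value.

This sketch walks the report as plain dicts. The two entries are copied (abridged) from the report printed above. The helper names are hypothetical, and the sketch assumes `DELTA` matches the value used in `stats_profiler`.

```python
DELTA = 0.1  # must match the DELTA used by stats_profiler


def failed_expectations(report):
    """Yield (expectation_type, kwargs, observed_value) for every failed result."""
    for result in report:
        if not result["success"]:
            config = result["expectation_config"]
            yield config["expectation_type"], config["kwargs"], result["result"].get("observed_value")


def relative_change_of_mean(kwargs, observed, delta=DELTA):
    """Relative change vs. the reference mean implied by min_value = mean * (1 - delta)."""
    reference_mean = kwargs["min_value"] / (1 - delta)
    return observed / reference_mean - 1


# Two entries from the validation report above, abridged to the fields used here.
report = [
    {"success": False,
     "result": {"observed_value": 6.692920555429092},
     "expectation_config": {"expectation_type": "expect_column_mean_to_be_between",
                            "kwargs": {"column": "trip_count",
                                       "min_value": 10.387244591346153,
                                       "max_value": 12.695521167200855}}},
    {"success": False,
     "result": {"observed_value": 68.99268345164135},
     "expectation_config": {"expectation_type": "expect_column_mean_to_be_between",
                            "kwargs": {"column": "earned_per_hour",
                                       "min_value": 52.32062497564023,
                                       "max_value": 63.9474305257825}}},
]

for exp_type, kwargs, observed in failed_expectations(report):
    if exp_type == "expect_column_mean_to_be_between":
        change = relative_change_of_mean(kwargs, observed)
        print(f"{kwargs['column']}: mean changed by {change:+.1%}")
```

For these two entries the reference means work out to about 11.54 trips and 58.13 per hour. That corresponds to roughly a 42% drop in mean trip count and a 19% rise in mean earnings per hour, both well beyond the 10% window.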