In [1]:
# Importing dependencies
import pandas as pd
from feast import FeatureStore
from feast.dqm.profilers.ge_profiler import ge_profiler
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

In [2]:
# Location of the Feast repository and the name of the saved dataset that serves
# as the reference data the validation profile is built from.
REPO_PATH = "feature_repo/"
REFERENCE_DATASET_NAME = "driver_stats"

# Connect to the feature store defined in REPO_PATH
store = FeatureStore(repo_path=REPO_PATH)

# Load the saved reference dataset from that store
dataset = store.get_saved_dataset(name=REFERENCE_DATASET_NAME)

In [3]:
# Relative tolerance for the expected mean (0.1 -> observed mean +/- 10%)
DELTA = 0.1


@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    """Derive data-quality expectations for ``avg_daily_trips`` from ``ds``.

    Two expectations are built from the values observed in ``ds``:

    * ``expect_column_values_to_be_between``: values lie within the observed
      [min, max] range, with ``mostly=0.99`` tolerating up to 1% of rows
      outside it;
    * ``expect_column_mean_to_be_between``: the column mean stays within
      ``DELTA`` (relative) of the observed mean.

    Args:
        ds: the dataset the profile is computed from, wrapped as a Great
            Expectations ``PandasDataset``.

    Returns:
        The expectation suite. ``discard_failed_expectations=False`` keeps every
        expectation in the suite even if it does not hold on ``ds`` itself.
    """
    column = "avg_daily_trips"
    values = ds[column]

    # Expected value range: bounds are the extremes seen in the reference data.
    ds.expect_column_values_to_be_between(
        column=column,
        mostly=0.99,
        min_value=values.min(),
        max_value=values.max(),
    )

    # Expected mean: observed mean scaled by (1 - DELTA) and (1 + DELTA).
    # sorted() keeps min_value <= max_value even if the mean were negative;
    # for a non-negative mean the bounds are exactly the unsorted products.
    observed_mean = values.mean()
    lower, upper = sorted(
        (observed_mean * (1 - DELTA), observed_mean * (1 + DELTA))
    )
    ds.expect_column_mean_to_be_between(
        column=column,
        min_value=lower,
        max_value=upper,
    )

    return ds.get_expectation_suite(discard_failed_expectations=False)

In [4]:
# Preview the expectations stats_profiler derives from the reference dataset
# (value range and mean range for avg_daily_trips) before freezing them into a
# validation reference. The cell's last expression is displayed as the
# GEProfile repr shown below.
dataset.get_profile(profiler=stats_profiler)

<GEProfile with expectations: [
  {
    "expectation_type": "expect_column_values_to_be_between",
    "meta": {},
    "kwargs": {
      "column": "avg_daily_trips",
      "mostly": 0.99,
      "min_value": 2,
      "max_value": 998
    }
  },
  {
    "expectation_type": "expect_column_mean_to_be_between",
    "meta": {},
    "kwargs": {
      "column": "avg_daily_trips",
      "min_value": 435.62050632911394,
      "max_value": 532.4250632911393
    }
  }
]>

In [6]:
# Turn the reference dataset plus its profiler into a named validation
# reference; retrieval jobs are checked against it via
# `to_df(validation_reference=...)`.
validation_reference = dataset.as_reference(
    name="driver_stats_validation_reference",
    profiler=stats_profiler,
)

In [7]:
# Hourly event timestamps for 2021-09-05 .. 2021-09-06 (both ends inclusive,
# 25 rows). "h" is the hourly alias; uppercase "H" is deprecated in pandas 2.2+.
timestamps = pd.date_range(
    start="2021-09-05",
    end="2021-09-06",
    freq="h",
).to_frame(name="event_timestamp", index=False)

# Driver IDs (the entity key) to retrieve features for
driver_ids = pd.DataFrame(data=[1001, 1002, 1003, 1004, 1005], columns=["driver_id"])

# Cartesian product: every timestamp paired with every driver ID
entity_df = timestamps.merge(right=driver_ids, how="cross")

# Join three driver_stats_fv features onto every (event_timestamp, driver_id)
# row of entity_df. The call returns a RetrievalJob, which is converted to a
# DataFrame (and validated) by a later .to_df() call.
historical_features = store.get_historical_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:acc_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_df=entity_df,
)

In [8]:
# Convert the RetrievalJob to a DataFrame while validating it against the
# reference profile. For this 1-day window validation fails: 2 of 125
# avg_daily_trips values (0 and 1) fall outside [2, 998], more than the 1%
# allowed by mostly=0.99, so Feast raises ValidationFailed instead of returning.
# The result is bound to a named variable rather than `_`: assigning `_` in
# IPython stops the kernel from updating its `_`/`__`/`___` output history.
features_1day = historical_features.to_df(validation_reference=validation_reference)

ValidationFailed: [
  {
    "exception_info": {
      "raised_exception": false,
      "exception_message": null,
      "exception_traceback": null
    },
    "meta": {},
    "expectation_config": {
      "expectation_type": "expect_column_values_to_be_between",
      "meta": {},
      "kwargs": {
        "column": "avg_daily_trips",
        "mostly": 0.99,
        "min_value": 2,
        "max_value": 998,
        "result_format": "COMPLETE"
      }
    },
    "result": {
      "element_count": 125,
      "missing_count": 0,
      "missing_percent": 0.0,
      "unexpected_count": 2,
      "unexpected_percent": 1.6,
      "unexpected_percent_total": 1.6,
      "unexpected_percent_nonmissing": 1.6,
      "partial_unexpected_list": [
        0,
        1
      ],
      "partial_unexpected_index_list": [
        60,
        71
      ],
      "partial_unexpected_counts": [
        {
          "value": 0,
          "count": 1
        },
        {
          "value": 1,
          "count": 1
        }
      ],
      "unexpected_list": [
        0,
        1
      ],
      "unexpected_index_list": [
        60,
        71
      ]
    },
    "success": false
  }
]

In [9]:
# Hourly event timestamps for a wider window, 2021-09-05 .. 2021-09-15 (both
# ends inclusive). "h" is the hourly alias; uppercase "H" is deprecated in
# pandas 2.2+.
timestamps = pd.date_range(
    start="2021-09-05",
    end="2021-09-15",
    freq="h",
).to_frame(name="event_timestamp", index=False)

# Cartesian product of the timestamps with the driver IDs defined in an earlier cell
entity_df = timestamps.merge(right=driver_ids, how="cross")

# Join three driver_stats_fv features onto every (event_timestamp, driver_id)
# row of the 10-day entity_df. The call returns a RetrievalJob, which is
# converted to a DataFrame (and validated) by a later .to_df() call.
historical_features = store.get_historical_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:acc_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_df=entity_df,
)

In [10]:
# Convert the 10-day RetrievalJob to a DataFrame, validating it against the
# reference profile. No output/exception is shown for this cell, i.e.
# presumably the wider window satisfies the expectations (the `mostly`
# threshold is computed over many more rows).
# Bound to a named variable rather than `_` so IPython's `_` output history
# keeps working.
features_10days = historical_features.to_df(validation_reference=validation_reference)

In [11]:
# Load the saved dataset "driver_stats_1001" (presumably only driver 1001's
# rows -- confirm against how it was saved) and display the expectations
# stats_profiler derives from it, for comparison with the all-driver profile.
dataset_1001 = store.get_saved_dataset(name="driver_stats_1001")
dataset_1001.get_profile(profiler=stats_profiler)

<GEProfile with expectations: [
  {
    "expectation_type": "expect_column_values_to_be_between",
    "meta": {},
    "kwargs": {
      "column": "avg_daily_trips",
      "mostly": 0.99,
      "min_value": 11,
      "max_value": 982
    }
  },
  {
    "expectation_type": "expect_column_mean_to_be_between",
    "meta": {},
    "kwargs": {
      "column": "avg_daily_trips",
      "min_value": 401.83291139240504,
      "max_value": 491.12911392405067
    }
  }
]>

In [12]:
# Hourly event timestamps for 2021-09-05 .. 2021-09-15 (both ends inclusive).
# "h" is the hourly alias; uppercase "H" is deprecated in pandas 2.2+.
timestamps = pd.date_range(
    start="2021-09-05",
    end="2021-09-15",
    freq="h",
).to_frame(name="event_timestamp", index=False)

# A single driver ID (the entity key) to retrieve features for
driver_id = pd.DataFrame([1001], columns=["driver_id"])

# Cartesian product: every timestamp paired with driver 1001
entity_df = timestamps.merge(right=driver_id, how="cross")

# Join three driver_stats_fv features onto every (event_timestamp, driver_id)
# row of the driver-1001 entity_df. The call returns a RetrievalJob, which is
# converted to a DataFrame (and validated) by a later .to_df() call.
historical_features = store.get_historical_features(
    features=[
        "driver_stats_fv:conv_rate",
        "driver_stats_fv:acc_rate",
        "driver_stats_fv:avg_daily_trips",
    ],
    entity_df=entity_df,
)

In [13]:
# Convert driver 1001's RetrievalJob to a DataFrame, validating it against the
# reference profile.
# NOTE(review): validation_reference was built from the all-driver
# "driver_stats" dataset, not from dataset_1001 (which is only profiled, never
# turned into a reference). Confirm that checking driver 1001 against the
# all-driver reference is the intended comparison.
# Bound to a named variable rather than `_` so IPython's `_` output history
# keeps working.
features_1001 = historical_features.to_df(validation_reference=validation_reference)