# Minimal Ride Hailing Example

![Feast Data Flow](https://raw.githubusercontent.com/feast-dev/feast/master/examples/minimal/images/data-flow.png)

## Introduction

For this quick start, we will:
1. Register two driver feature tables: one for driver statistics, the other for driver trips. Driver statistics are updated on a daily basis, whereas driver trips are updated in real time.
2. Create a driver dataset, then use the Feast SDK to retrieve the features corresponding to these drivers from an offline store.
3. Store the features in an online store (Redis), and retrieve them via the Feast SDK.

## Feature Registry (Feast Core)

### Configuration

Configuration can be provided in three different ways:

In [None]:
# Using environment variables
# import os
# os.environ["FEAST_CORE_URL"] = "core:6565"
# os.environ["FEAST_SERVING_URL"] = "online_serving:6566"

# Provide a map during client initialization
# from feast import Client
# options = {
#     "CORE_URL": "core:6565",
#     "SERVING_URL": "online_serving:6566",
# }
# client = Client(options)

# As keyword arguments, without the `FEAST` prefix
# client = Client(core_url="core:6565", serving_url="online_serving:6566")

If you are following the quick start guide, all configuration required for the remainder of this tutorial should already be set up as environment variables, as shown below. The values may differ depending on your environment. For a full list of configurable values and their explanations, please refer to the user guide.
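The three methods share one naming scheme: the environment variable `FEAST_CORE_URL` corresponds to the `CORE_URL` option and to the `core_url` keyword argument. The sketch below illustrates that convention with a hypothetical helper, `env_to_client_kwargs`, which is not part of the Feast SDK. It only picks up an explicit list of keys, so unrelated `FEAST_*` variables (such as the Kubernetes service variables like `FEAST_RELEASE_FEAST_CORE_PORT`) are ignored.

```python
from typing import Dict, Iterable, Mapping


def env_to_client_kwargs(
    env: Mapping[str, str],
    keys: Iterable[str] = ("core_url", "serving_url"),
) -> Dict[str, str]:
    """Hypothetical helper: map FEAST_<KEY> environment variables to Client keyword arguments."""
    kwargs: Dict[str, str] = {}
    for key in keys:
        # FEAST_CORE_URL -> core_url, FEAST_SERVING_URL -> serving_url
        env_name = "FEAST_" + key.upper()
        if env_name in env:
            kwargs[key] = env[env_name]
    return kwargs


# Usage (assumes `import os` and `from feast import Client`):
# client = Client(**env_to_client_kwargs(os.environ))
```

Restricting the lookup to known keys keeps the result safe to splat into `Client(...)`, since arbitrary `FEAST_*` names would otherwise turn into unexpected keyword arguments.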

In [2]:
import os
from pprint import pprint
pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")})

{'FEAST_CORE_URL': 'feast-release-feast-core:6565',
 'FEAST_HISTORICAL_SERVING_URL': 'feast-release-feast-batch-serving:6566',
 'FEAST_RELEASE_FEAST_CORE_PORT': 'tcp://10.79.249.237:80',
 'FEAST_RELEASE_FEAST_CORE_PORT_6565_TCP': 'tcp://10.79.249.237:6565',
 'FEAST_RELEASE_FEAST_CORE_PORT_6565_TCP_ADDR': '10.79.249.237',
 'FEAST_RELEASE_FEAST_CORE_PORT_6565_TCP_PORT': '6565',
 'FEAST_RELEASE_FEAST_CORE_PORT_6565_TCP_PROTO': 'tcp',
 'FEAST_RELEASE_FEAST_CORE_PORT_80_TCP': 'tcp://10.79.249.237:80',
 'FEAST_RELEASE_FEAST_CORE_PORT_80_TCP_ADDR': '10.79.249.237',
 'FEAST_RELEASE_FEAST_CORE_PORT_80_TCP_PORT': '80',
 'FEAST_RELEASE_FEAST_CORE_PORT_80_TCP_PROTO': 'tcp',
 'FEAST_RELEASE_FEAST_CORE_SERVICE_HOST': '10.79.249.237',
 'FEAST_RELEASE_FEAST_CORE_SERVICE_PORT': '80',
 'FEAST_RELEASE_FEAST_CORE_SERVICE_PORT_GRPC': '6565',
 'FEAST_RELEASE_FEAST_CORE_SERVICE_PORT_HTTP': '80',
 'FEAST_RELEASE_FEAST_JOBSERVICE_PORT': 'tcp://10.79.243.227:80',
 'FEAST_RELEASE_FEAST_JOBSERVICE_PORT_6568_TCP':

### Basic Imports and Feast Client initialization

In [3]:
import os

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [4]:
#client = Client()
client = Client(redis_host=os.environ["FEAST_RELEASE_REDIS_MASTER_SERVICE_HOST"])



### Declare Features and Entities

An entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before the associated feature tables are declared.

In [5]:
driver_id = Entity(name="driver_id", description="Driver identifier", value_type=ValueType.INT64)

In [6]:
# Daily updated features 
acc_rate = Feature("acc_rate", ValueType.FLOAT)
conv_rate = Feature("conv_rate", ValueType.FLOAT)
avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32)

# Real-time updated features
trips_today = Feature("trips_today", ValueType.INT32)

```python
FeatureTable(
    name = "driver_statistics",
    entities = ["driver_id"],
    features = [
        acc_rate,
        conv_rate,
        avg_daily_trips
    ],
    ...
)
```


```python
FeatureTable(
    name = "driver_trips",
    entities = ["driver_id"],
    features = [
        trips_today
    ],
    ...
)
```

![Features Join](./images/features-join.png)

```python
FeatureTable(
    ...,
    batch_source=FileSource(  # Required
        file_format=ParquetFormat(),
        file_url="gs://feast-demo-data-lake",
        ...
    ),
    stream_source=KafkaSource(  # Optional
        bootstrap_servers="...",
        topic="driver_trips",
        ...
    )
)
```

Feature tables group features together and describe how they can be retrieved. The following examples assume that the feature tables are stored on the local file system and are accessible from the Spark cluster. If you have set up a GCP service account, you may use GCS as the file source instead.

`batch_source` defines where the historical features are stored. A feature table can also have an optional `stream_source`, through which feature values are delivered continuously.

For now, we will define only a `batch_source` for both `driver_statistics` and `driver_trips`, and demonstrate the use of `stream_source` later in the tutorial.

In [7]:
# This is the location we're using for the offline feature store.

import os
demo_data_location = os.path.join(os.getenv("FEAST_SPARK_STAGING_LOCATION", "file:///home/jovyan/"), "test_data")


In [8]:
driver_statistics_source_uri = os.path.join(demo_data_location, "driver_statistics")

driver_statistics = FeatureTable(
    name = "driver_statistics",
    entities = ["driver_id"],
    features = [
        acc_rate,
        conv_rate,
        avg_daily_trips
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_statistics_source_uri,
        date_partition_column="date"
    )
)

In [9]:
driver_trips_source_uri = os.path.join(demo_data_location, "driver_trips")


driver_trips = FeatureTable(
    name = "driver_trips",
    entities = ["driver_id"],
    features = [
        trips_today
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_trips_source_uri,
        date_partition_column="date"
    )
)

### Registering entities and feature tables in Feast Core

In [10]:
client.apply(driver_id)
client.apply(driver_statistics)
client.apply(driver_trips)

In [11]:
print(client.get_feature_table("driver_statistics").to_yaml())
print(client.get_feature_table("driver_trips").to_yaml())

spec:
  name: driver_statistics
  entities:
  - driver_id
  features:
  - name: acc_rate
    valueType: FLOAT
  - name: conv_rate
    valueType: FLOAT
  - name: avg_daily_trips
    valueType: INT32
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///home/jovyan/test_data/driver_statistics
meta:
  createdTimestamp: '2021-03-07T22:21:45Z'

spec:
  name: driver_trips
  entities:
  - driver_id
  features:
  - name: trips_today
    valueType: INT32
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///home/jovyan/test_data/driver_trips
meta:
  createdTimestamp: '2021-03-07T22:21:45Z'



### Populating batch source

Feast is agnostic to how the batch source is populated, as long as the data complies with the feature table specification. Therefore, any existing ETL tool can be used for data ingestion. Alternatively, you can use the Feast SDK to ingest a pandas DataFrame into the batch source.
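If an external ETL job writes the batch source directly, a cheap pre-flight check is to confirm that the frame has the columns the `FileSource` definition refers to. The hypothetical helper `prepare_batch_frame` below does that with pandas and derives a `date` column from the event timestamp to serve as `date_partition_column`. The `YYYY-MM-DD` string format of that column is an assumption; compare it with data written by `client.ingest` for your Feast version before relying on it.

```python
from typing import Sequence

import pandas as pd


def prepare_batch_frame(
    df: pd.DataFrame,
    entity_columns: Sequence[str],
    feature_columns: Sequence[str],
    event_timestamp_column: str = "datetime",
    created_timestamp_column: str = "created",
    date_partition_column: str = "date",
) -> pd.DataFrame:
    """Hypothetical pre-ingestion check for a feature table's batch source."""
    required = [*entity_columns, *feature_columns,
                event_timestamp_column, created_timestamp_column]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    # Keep only the columns the feature table declares, in a fixed order.
    out = df[required].copy()
    # Timestamps must be real datetimes, not strings.
    out[event_timestamp_column] = pd.to_datetime(out[event_timestamp_column])
    out[created_timestamp_column] = pd.to_datetime(out[created_timestamp_column])
    # Partition value derived from the event timestamp (assumed YYYY-MM-DD layout).
    out[date_partition_column] = out[event_timestamp_column].dt.strftime("%Y-%m-%d")
    return out


# An ETL job could then write it as Hive-style partitions (requires pyarrow), e.g.:
# out.to_parquet(driver_trips_source_uri, partition_cols=["date"])
```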

In [12]:
import pandas as pd
import numpy as np
from datetime import datetime

In [13]:
def generate_entities():
    return np.random.choice(999999, size=100, replace=False)

In [14]:
def generate_trips(entities):
    df = pd.DataFrame(columns=["driver_id", "trips_today", "datetime", "created"])
    df['driver_id'] = entities
    df['trips_today'] = np.random.randint(0, 1000, size=100).astype(np.int32)
    df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(2020, 10, 10).timestamp(),
                datetime(2020, 10, 20).timestamp(),
                size=100),
        unit="s"
    )
    df['created'] = pd.to_datetime(datetime.now())
    return df
    

In [15]:
def generate_stats(entities):
    df = pd.DataFrame(columns=["driver_id", "conv_rate", "acc_rate", "avg_daily_trips", "datetime", "created"])
    df['driver_id'] = entities
    df['conv_rate'] = np.random.random(size=100).astype(np.float32)
    df['acc_rate'] = np.random.random(size=100).astype(np.float32)
    df['avg_daily_trips'] = np.random.randint(0, 1000, size=100).astype(np.int32)
    df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(2020, 10, 10).timestamp(),
                datetime(2020, 10, 20).timestamp(),
                size=100),
        unit="s"
    )
    df['created'] = pd.to_datetime(datetime.now())
    return df

In [16]:
import time
time.sleep(5)
entities = generate_entities()
stats_df = generate_stats(entities)
trips_df = generate_trips(entities)

In [17]:
time.sleep(5)
client.ingest(driver_statistics, stats_df)
client.ingest(driver_trips, trips_df)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.
Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


## Historical Retrieval For Training

### Point-in-time correctness

![Point In Time](https://raw.githubusercontent.com/feast-dev/feast/master/examples/minimal/images/pit-2.png)

Feast joins the features to the entities based on the following conditions:
1. The entity primary key value(s) match.
2. The feature event timestamp is the closest possible match to the entity event timestamp,
   but must not be more recent than the entity event timestamp. If the feature table specifies
   a maximum age, the difference must also not exceed it.
3. If more than one feature table row satisfies conditions 1 and 2, the row with the
   most recent created timestamp is chosen.
4. If no feature table row satisfies the above conditions, the features will have null values.
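For intuition, the rules above can be approximated in pandas with `pd.merge_asof`. This is a sketch for a single feature table, assuming the column names used in this tutorial (`driver_id`, `event_timestamp`, `datetime`, `created`). It is not how Feast performs the join (Feast runs it as a Spark job), and `point_in_time_join` is a hypothetical helper.

```python
from typing import Optional

import pandas as pd


def point_in_time_join(
    entity_df: pd.DataFrame,
    feature_df: pd.DataFrame,
    max_age: Optional[pd.Timedelta] = None,
) -> pd.DataFrame:
    """Hypothetical helper mimicking the join rules above for one feature table."""
    # Rule 3: for identical (driver_id, datetime) pairs keep the row created last.
    features = (
        feature_df.sort_values("created")
        .drop_duplicates(subset=["driver_id", "datetime"], keep="last")
        .sort_values("datetime")
    )
    # merge_asof requires both frames to be sorted on their time keys.
    entities = entity_df.sort_values("event_timestamp")
    # Rules 1, 2 and 4: match on driver_id, take the latest feature row at or
    # before event_timestamp (within max_age if given); no match -> NaN.
    return pd.merge_asof(
        entities,
        features,
        left_on="event_timestamp",
        right_on="datetime",
        by="driver_id",
        direction="backward",
        tolerance=max_age,
    )


entity_df = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2020-10-18 12:00", "2020-10-19 12:00", "2020-10-18 12:00"]),
})
feature_df = pd.DataFrame({
    "driver_id": [1, 1, 1, 2],
    "trips_today": [5, 7, 9, 3],
    "datetime": pd.to_datetime(
        ["2020-10-18 10:00", "2020-10-18 10:00", "2020-10-19 13:00", "2020-10-18 13:00"]),
    "created": pd.to_datetime(
        ["2020-10-18 10:05", "2020-10-18 11:00", "2020-10-19 13:05", "2020-10-18 13:05"]),
})
result = point_in_time_join(entity_df, feature_df)
```

In this toy data, driver 1 at 2020-10-18 12:00 gets the value 7 (two rows share the closest timestamp, and the later-created one wins), while driver 2 gets NaN because its only feature row is newer than the entity timestamp. Passing `max_age=pd.Timedelta(hours=12)` also nulls driver 1 at 2020-10-19 12:00, whose closest row is 26 hours old.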

In [18]:
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [19]:
def read_parquet(uri):
    parsed_uri = urlparse(uri)
    if parsed_uri.scheme == "file":
        return pd.read_parquet(parsed_uri.path)
    elif parsed_uri.scheme == "gs":
        fs = gcsfs.GCSFileSystem()
        files = ["gs://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    elif parsed_uri.scheme == 's3':
        import s3fs
        fs = s3fs.S3FileSystem()
        files = ["s3://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    elif parsed_uri.scheme == 'wasbs':
        import adlfs
        fs = adlfs.AzureBlobFileSystem(
            account_name=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_NAME'), account_key=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY')
        )
        uripath = parsed_uri.username + parsed_uri.path
        files = fs.glob(uripath + '/part-*')
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    else:
        raise ValueError(f"Unsupported URL scheme {uri}")

In [20]:
time.sleep(10)
entities_with_timestamp = pd.DataFrame(columns=['driver_id', 'event_timestamp'])
entities_with_timestamp['driver_id'] = np.random.choice(entities, 10, replace=False)
entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint(
    datetime(2020, 10, 18).timestamp(),
    datetime(2020, 10, 20).timestamp(),
    size=10), unit='s')
entities_with_timestamp

| | driver_id | event_timestamp |
|---|---|---|
| 0 | 471125 | 2020-10-19 20:01:39 |
| 1 | 369032 | 2020-10-19 20:10:30 |
| 2 | 377665 | 2020-10-18 15:12:26 |
| 3 | 423368 | 2020-10-18 13:31:29 |
| 4 | 157514 | 2020-10-18 19:46:46 |
| 5 | 663063 | 2020-10-19 22:39:19 |
| 6 | 673981 | 2020-10-18 14:12:48 |
| 7 | 722971 | 2020-10-18 11:12:53 |
| 8 | 476369 | 2020-10-18 19:23:05 |
| 9 | 298148 | 2020-10-18 15:49:03 |


In [21]:
# get_historical_features returns immediately once the Spark job has been submitted successfully.
job = client.get_historical_features(
    feature_refs=[
        "driver_statistics:avg_daily_trips",
        "driver_statistics:conv_rate",
        "driver_statistics:acc_rate",
        "driver_trips:trips_today"
    ], 
    entity_source=entities_with_timestamp
)



In [22]:
# get_output_file_uri will block until the Spark job is completed.
output_file_uri = job.get_output_file_uri()

In [23]:
time.sleep(10)
read_parquet(output_file_uri)

| | driver_id | event_timestamp | driver_statistics__acc_rate | driver_statistics__conv_rate | driver_statistics__avg_daily_trips | driver_trips__trips_today |
|---|---|---|---|---|---|---|
| 0 | 471125 | 2020-10-19 20:01:39 | 0.474747 | 0.708197 | 111 | 324.0 |
| 1 | 423368 | 2020-10-18 13:31:29 | 0.582376 | 0.073656 | 636 | 894.0 |
| 2 | 673981 | 2020-10-18 14:12:48 | 0.546367 | 0.896934 | 757 | 525.0 |
| 3 | 722971 | 2020-10-18 11:12:53 | 0.183651 | 0.819915 | 158 | NaN |
| 4 | 476369 | 2020-10-18 19:23:05 | 0.176206 | 0.667049 | 35 | 309.0 |
| 5 | 298148 | 2020-10-18 15:49:03 | 0.453658 | 0.965575 | 72 | 779.0 |
| 6 | 377665 | 2020-10-18 15:12:26 | 0.06929 | 0.098833 | 918 | 635.0 |
| 7 | 369032 | 2020-10-19 20:10:30 | 0.268349 | 0.876938 | 803 | 781.0 |
| 8 | 663063 | 2020-10-19 22:39:19 | 0.814907 | 0.059869 | 338 | 965.0 |
| 9 | 157514 | 2020-10-18 19:46:46 | 0.027994 | 0.31103 | 650 | 112.0 |


The retrieved result can now be used for model training.

## Populating Online Storage with Batch Ingestion

To populate the online store, we can use the Feast SDK to start a Spark batch job that extracts the features from the batch source and then loads them into the online store.
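Conceptually, the job keeps the newest feature values per entity within the requested time range and writes them under the entity key. The pandas sketch below models that behavior with a plain dict standing in for Redis. `latest_values_per_entity` is a hypothetical helper, and both the half-open window `[start, end)` and the tie-break on `created` are assumptions rather than a description of the actual Spark job.

```python
from datetime import datetime
from typing import Dict, Hashable, Sequence

import pandas as pd


def latest_values_per_entity(
    df: pd.DataFrame,
    entity_column: str,
    feature_columns: Sequence[str],
    start: datetime,
    end: datetime,
    event_timestamp_column: str = "datetime",
    created_timestamp_column: str = "created",
) -> Dict[Hashable, dict]:
    """Simplified model of offline-to-online ingestion (hypothetical helper)."""
    # Only rows whose event timestamp falls inside [start, end) are considered.
    ts = df[event_timestamp_column]
    window = df[(ts >= start) & (ts < end)]
    # Sort so that, per entity, the last row is the newest event
    # (ties on the event timestamp are broken by the created timestamp).
    window = window.sort_values([event_timestamp_column, created_timestamp_column])
    latest = window.groupby(entity_column).tail(1)
    # A dict stands in for the online store: entity key -> feature values.
    return {
        row[entity_column]: {f: row[f] for f in feature_columns}
        for row in latest.to_dict("records")
    }
```

Rows outside the window are ignored entirely, so an entity whose only events fall outside the range gets no online entry at all.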

In [24]:
time.sleep(10)
job = client.start_offline_to_online_ingestion(
    driver_statistics,
    datetime(2020, 10, 10),
    datetime(2020, 10, 20)
)



In [25]:
# It will take some time before the Spark Job is completed
time.sleep(15)
job.get_status()

<SparkJobStatus.IN_PROGRESS: 1>

Once the job is completed, the SDK can be used to retrieve the result from the online store.
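The fixed `time.sleep` calls in this notebook only guess how long the Spark job takes. A small polling loop is more robust: `wait_for_status` below is a hypothetical helper that calls any zero-argument status function (such as `job.get_status`) until it returns one of the given terminal states or a timeout expires.

```python
import time
from typing import Callable, Collection, TypeVar

S = TypeVar("S")


def wait_for_status(
    get_status: Callable[[], S],
    terminal_states: Collection[S],
    timeout: float = 600.0,
    interval: float = 5.0,
) -> S:
    """Poll get_status() until it returns a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still in state {status!r} after {timeout}s")
        time.sleep(interval)


# Usage sketch, assuming SparkJobStatus (printed above) also defines COMPLETED and
# FAILED; in Feast 0.9 it is believed to live in feast.pyspark.abc (verify first):
# status = wait_for_status(job.get_status, {SparkJobStatus.COMPLETED, SparkJobStatus.FAILED})
```

Including the failure state in `terminal_states` makes the loop return promptly on errors instead of waiting for the timeout; the caller then decides what to do with the returned status.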

In [26]:
time.sleep(15)
entities_sample = np.random.choice(entities, 10, replace=False)
entities_sample = [{"driver_id": e} for e in entities_sample]
entities_sample

[{'driver_id': 884932},
 {'driver_id': 397135},
 {'driver_id': 471125},
 {'driver_id': 477225},
 {'driver_id': 395171},
 {'driver_id': 637490},
 {'driver_id': 574137},
 {'driver_id': 660456},
 {'driver_id': 616173},
 {'driver_id': 776855}]

In [27]:
time.sleep(15)
features = client.get_online_features(
    feature_refs=["driver_statistics:avg_daily_trips"],
    entity_rows=entities_sample).to_dict()
features

{'driver_id': [884932,
  397135,
  471125,
  477225,
  395171,
  637490,
  574137,
  660456,
  616173,
  776855],
 'driver_statistics:avg_daily_trips': [473,
  90,
  111,
  9,
  670,
  593,
  849,
  566,
  874,
  977]}

In [28]:
time.sleep(15)
pd.DataFrame(features)

| | driver_id | driver_statistics:avg_daily_trips |
|---|---|---|
| 0 | 884932 | 473 |
| 1 | 397135 | 90 |
| 2 | 471125 | 111 |
| 3 | 477225 | 9 |
| 4 | 395171 | 670 |
| 5 | 637490 | 593 |
| 6 | 574137 | 849 |
| 7 | 660456 | 566 |
| 8 | 616173 | 874 |
| 9 | 776855 | 977 |


The features can now be used as an input to the trained model.

## Ingestion from Streaming (real-time) Source

With a streaming source, we can use the Feast SDK to launch a Spark streaming job that continuously updates the online store. First, we will update the `driver_trips` feature table to add a streaming source.

In [29]:
!pip install confluent_kafka





In [30]:
import os
from pprint import pprint
import time
import json
import pytz
import io
import avro.schema
import pandas as pd
import numpy as np
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse
from datetime import datetime
from avro.io import BinaryEncoder, DatumWriter
from confluent_kafka import Producer
from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat


In [31]:
# Change this to any Kafka broker address that is accessible from the Spark cluster
time.sleep(15)
KAFKA_BROKER = os.getenv("DEMO_KAFKA_BROKERS", os.environ["FEAST_RELEASE_KAFKA_PORT_9092_TCP"])
print('starting kafka brokers')
print(KAFKA_BROKER)

starting kafka brokers
tcp://10.79.242.9:9092


In [32]:
avro_schema_json = json.dumps({
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"},
        },
    ],
})
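The schema declares `datetime` as a `long` with the `timestamp-micros` logical type, which the Avro specification defines as microseconds since the Unix epoch (UTC). That is why the timestamps are made timezone-aware with `pytz.utc` before the records are sent; the `avro` writer is expected to perform the conversion itself. The sketch below only makes the arithmetic explicit, and `to_timestamp_micros` / `from_timestamp_micros` are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_micros(dt: datetime) -> int:
    """Encode a timezone-aware datetime as Avro timestamp-micros."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime; attach a timezone (e.g. UTC) first")
    # Integer division by a 1-microsecond timedelta avoids float rounding errors.
    return (dt - EPOCH) // timedelta(microseconds=1)


def from_timestamp_micros(micros: int) -> datetime:
    """Decode an Avro timestamp-micros value back to a UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)
```

Rejecting naive datetimes mirrors the notebook's choice to attach UTC explicitly, since a naive value has no well-defined offset from the epoch.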

In [33]:
time.sleep(30)
driver_trips.stream_source = KafkaSource(
    event_timestamp_column="datetime",
    created_timestamp_column="datetime",
    bootstrap_servers=KAFKA_BROKER,
    topic="driver_trips",
    message_format=AvroFormat(avro_schema_json)
)
print('apply driver_trips')
client.apply(driver_trips)

apply driver_trips


Start the streaming job and send Avro records to Kafka:

In [34]:
time.sleep(30)
print('start_stream_to_online_ingestion')
job = client.start_stream_to_online_ingestion(
    driver_trips
)

start_stream_to_online_ingestion


In [35]:
def send_avro_record_to_kafka(topic, record):
    value_schema = avro.schema.parse(avro_schema_json)
    writer = DatumWriter(value_schema)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(record, encoder)
    
    producer = Producer({
        "bootstrap.servers": KAFKA_BROKER,
    })
    producer.produce(topic=topic, value=bytes_writer.getvalue())
    producer.flush()

In [None]:
# Note: depending on the Kafka configuration you may need to create the Kafka topic first, like below:
print('setup kafka config')
# from confluent_kafka.admin import AdminClient, NewTopic
# admin = AdminClient({'bootstrap.servers': KAFKA_BROKER})
# new_topic = NewTopic('driver_trips', num_partitions=1, replication_factor=3)
# admin.create_topics([new_topic])
time.sleep(15)
for record in trips_df.drop(columns=['created']).to_dict('records'):
    print('sending a record')
    time.sleep(1)
    record["datetime"] = (
        record["datetime"].to_pydatetime().replace(tzinfo=pytz.utc)
    )

    send_avro_record_to_kafka(topic="driver_trips", record=record)

setup kafka config
sending a record
sending a record
sending a record
sending a record
sending a record
sending a record
sending a record
sending a record
sending a record
sending a record
sending a record


### Retrieving joined features from several feature tables

In [None]:
time.sleep(15)
entities_sample = np.random.choice(entities, 10, replace=False)
print('retrieve features entities_sample')
entities_sample = [{"driver_id": e} for e in entities_sample]
entities_sample

In [None]:
print('retrieve features')
features = client.get_online_features(
    feature_refs=["driver_statistics:avg_daily_trips", "driver_trips:trips_today"],
    entity_rows=entities_sample).to_dict()

In [None]:
time.sleep(5)
print('print all features')
pd.DataFrame(features)

In [None]:
# This will stop the streaming job
job.cancel()