
# 11 - Vertex AI > Features - Feature Store

This is a demonstration of [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore/overview). A feature store is a central repository for organizing, storing, and retrieving features. Vertex AI Feature Store is a fully managed service that scales the underlying compute and storage resources. It becomes the central place to retrieve features for training and to serve them with low latency for online prediction. Because it stores feature values at points in time, it supports:

-  Point-in-time lookups when retrieving features for model training: only feature values recorded before the prediction time are retrieved, which prevents data leakage.
-  Managing training-serving skew, because the same stored feature values are used for both training and serving.
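To make the point-in-time idea concrete, here is a minimal pure-Python sketch. It assumes each feature's history is a list of `(timestamp, value)` pairs sorted by time and returns the latest value recorded at or before the requested time, the kind of lookup that keeps later values out of training data. The function name and data layout are hypothetical illustrations, not the Feature Store API.

```python
from bisect import bisect_right
from datetime import datetime, timedelta, timezone

def value_as_of(history, as_of):
    """Return the latest value with timestamp <= as_of, or None if none exists.

    history: list of (timestamp, value) pairs sorted by timestamp (hypothetical layout).
    """
    timestamps = [ts for ts, _ in history]
    # index of the first entry strictly after as_of
    i = bisect_right(timestamps, as_of)
    return history[i - 1][1] if i > 0 else None

# usage: a feature that was updated three times
t0 = datetime(2021, 12, 1, tzinfo=timezone.utc)
history = [(t0, 1.0), (t0 + timedelta(days=2), 2.0), (t0 + timedelta(days=5), 3.0)]

print(value_as_of(history, t0 + timedelta(days=3)))  # 2.0: the day-5 value is still in the future
print(value_as_of(history, t0 - timedelta(days=1)))  # None: no value existed yet
```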

### Prerequisites:
-  01 - BigQuery - Table Data Source
-  Any of notebooks 02-05 that deploy a model to an endpoint
   -  Used to demonstrate online predictions with feature store serving features

### Resources:
-  [Python Client for Vertex AI](https://googleapis.dev/python/aiplatform/latest/aiplatform.html)
   -  Currently using the [v1beta1 services](https://googleapis.dev/python/aiplatform/latest/aiplatform_v1beta1/services.html)
-  [Feature Store Overview](https://cloud.google.com/vertex-ai/docs/featurestore/overview)
-  [Data Model and Concepts](https://cloud.google.com/vertex-ai/docs/featurestore/concepts)
-  [Best Practices](https://cloud.google.com/vertex-ai/docs/featurestore/best-practices) including info on composite entity types

### Conceptual Flow & Workflow
<p align="center">
  <img alt="Conceptual Flow" src="./architectures/slides/11_arch.png" width="45%">
&nbsp; &nbsp; &nbsp; &nbsp;
  <img alt="Workflow" src="./architectures/slides/11_console.png" width="45%">
</p>

---
## Setup

inputs:

In [12]:
REGION = 'us-central1'
PROJECT_ID='statmike-demo2'
DATANAME = 'fraud'
NOTEBOOK = '11'

ENTITYTYPE_ID = 'transaction'

# Model Training
VAR_TARGET = 'Class'
VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

packages:

In [13]:
#from google.cloud.aiplatform_v1beta1 import (FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient, types)
from google.cloud import aiplatform

from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.field_mask_pb2 import FieldMask

from google.cloud import bigquery
#from google.cloud.aiplatform_v1beta1 import (PredictionServiceClient, EndpointServiceClient)
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

clients:

In [14]:
client_options = {"api_endpoint": f"{REGION}-aiplatform.googleapis.com"}

clients = {}
clients['fs'] = aiplatform.gapic.FeaturestoreServiceClient(client_options = client_options)
clients['fs_olserve'] = aiplatform.gapic.FeaturestoreOnlineServingServiceClient(client_options = client_options)

clients['bq'] = bigquery.Client()

aiplatform.init(project=PROJECT_ID, location=REGION)

parameters:

In [15]:
PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"
DIR = f"temp/{NOTEBOOK}"

environment:

In [16]:
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Feature Store Data model
Feature Store organizes data with three hierarchical concepts:

Featurestore -> EntityType -> Feature

- **Featurestore**: the place to store your features
    - **EntityType**: under a Featurestore, an EntityType describes a real or virtual object to be modeled.
        - **Feature**: under an EntityType, a feature describes an attribute of the EntityType

For the fraud data used in this example, the feature store is named `fraud` (`FEATURESTORE_ID = DATANAME`). The store has one entity type, `transaction`, whose entity IDs come from the `transaction_id` column. The features are the table's columns, excluding the target (`Class`), the `splits` column, and the omitted `transaction_id`.
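The hierarchy appears directly in the resource names the API returns, for example `projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction/features/amount` in the search output further down. Below is a minimal sketch of composing such names with plain string formatting. The client's `featurestore_path` and `entity_type_path` helpers used in this notebook build the first two levels; this `resource_name` function is hypothetical and only for illustration.

```python
def resource_name(project, region, featurestore_id, entity_type_id=None, feature_id=None):
    """Compose a Feature Store resource name (hypothetical helper).

    Each level of the hierarchy appends one collection/id pair.
    """
    name = f"projects/{project}/locations/{region}/featurestores/{featurestore_id}"
    if entity_type_id is not None:
        name += f"/entityTypes/{entity_type_id}"
        if feature_id is not None:
            name += f"/features/{feature_id}"
    return name

print(resource_name("764015827198", "us-central1", "fraud", "transaction", "amount"))
```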

---
## Create Feature Store

In [17]:
FEATURESTORE_ID = DATANAME

In [19]:
featurestore_lro = clients['fs'].create_featurestore(
    request = aiplatform.gapic.CreateFeaturestoreRequest(
        parent = PARENT,
        featurestore_id = FEATURESTORE_ID,
        featurestore = aiplatform.gapic.Featurestore(
            #description/display_name = f"Notebook {NOTEBOOK} demonstration of Vertex AI Features (feature store) using {DATANAME} data",
            labels = {'notebook':f'{NOTEBOOK}'},
            online_serving_config = aiplatform.gapic.Featurestore.OnlineServingConfig(
                fixed_node_count = 2
            ),
        ),
    )
)

In [20]:
featurestore_lro.result()

name: "projects/764015827198/locations/us-central1/featurestores/fraud"

Use `get_featurestore` to see the details of the specified feature store:

In [21]:
clients['fs'].get_featurestore(name=clients['fs'].featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID))

name: "projects/764015827198/locations/us-central1/featurestores/fraud"
create_time {
  seconds: 1638982792
  nanos: 491139000
}
update_time {
  seconds: 1638982792
  nanos: 585025000
}
etag: "AMEw9yNxcAN7A5S1IctiEi4ldC9nDVUCM-STwZ8Yw8ZhzGohPCV0ITJdTcV7copy_NJ5"
labels {
  key: "notebook"
  value: "07"
}
online_serving_config {
  fixed_node_count: 2
}
state: STABLE

Use `list_featurestores` to see details of all feature stores:

In [22]:
clients['fs'].list_featurestores(parent=PARENT)

ListFeaturestoresPager<featurestores {
  name: "projects/764015827198/locations/us-central1/featurestores/fraud"
  create_time {
    seconds: 1638982792
    nanos: 491139000
  }
  update_time {
    seconds: 1638982792
    nanos: 585025000
  }
  etag: "AMEw9yNpYLNnqe_d4QsDtt-CjUyVQGPXI8FdzC5iIlBHYu3Yb29KZJ2iePvEJlC0sjpx"
  labels {
    key: "notebook"
    value: "07"
  }
  online_serving_config {
    fixed_node_count: 2
  }
  state: STABLE
}
>

---
## Create Entity Type

In [26]:
entitytype_lro = clients['fs'].create_entity_type(
    request = aiplatform.gapic.CreateEntityTypeRequest(
        parent = clients['fs'].featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
        entity_type_id = ENTITYTYPE_ID,
        entity_type = aiplatform.gapic.EntityType(
            description = f"Entity: {ENTITYTYPE_ID}, for data: {DATANAME}"
        ),
    )
)

In [27]:
entitytype_lro.result()

name: "projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction"

Use `list_entity_types` to see details of all entity types:

In [28]:
clients['fs'].list_entity_types(parent = f"{PARENT}/featurestores/{FEATURESTORE_ID}")

ListEntityTypesPager<entity_types {
  name: "projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction"
  description: "Entity: transaction, for data: fraud"
  create_time {
    seconds: 1638988806
    nanos: 949907000
  }
  update_time {
    seconds: 1638988806
    nanos: 949907000
  }
  etag: "AMEw9yN5AepXX82rfwPCQKeYhfvAKa7nNCGwOnDMFILFTJ1JuUVj5KcaiHocYcFJn-Tv"
}
>

---
## Create Features

Get the schema of the data source for new features:

In [29]:
schema = clients['bq'].query(query = f"SELECT * FROM {DATANAME}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{DATANAME}_prepped'").to_dataframe()

In [30]:
schema

| | table_catalog | table_schema | table_name | column_name | ordinal_position | is_nullable | data_type | is_generated | generation_expression | is_stored | is_hidden | is_updatable | is_system_defined | is_partitioning_column | clustering_ordinal_position |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | statmike-demo2 | fraud | fraud_prepped | Time | 1 | YES | INT64 | NEVER | | | NO | | NO | NO | |
| 1 | statmike-demo2 | fraud | fraud_prepped | V1 | 2 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 2 | statmike-demo2 | fraud | fraud_prepped | V2 | 3 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 3 | statmike-demo2 | fraud | fraud_prepped | V3 | 4 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 4 | statmike-demo2 | fraud | fraud_prepped | V4 | 5 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 5 | statmike-demo2 | fraud | fraud_prepped | V5 | 6 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 6 | statmike-demo2 | fraud | fraud_prepped | V6 | 7 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 7 | statmike-demo2 | fraud | fraud_prepped | V7 | 8 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 8 | statmike-demo2 | fraud | fraud_prepped | V8 | 9 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |
| 9 | statmike-demo2 | fraud | fraud_prepped | V9 | 10 | YES | FLOAT64 | NEVER | | | NO | | NO | NO | |


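Prepare the requests for `batch_create_features`: one `CreateFeatureRequest` per column, specifying the feature ID (the lowercased column name), the value type mapped from the BigQuery data type, and a description. The target (`VAR_TARGET`), the `splits` column, and the columns in `VAR_OMIT` are skipped.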

In [31]:
# map BigQuery data types to Feature Store value types
VALUE_TYPES = {
    'STRING': aiplatform.gapic.Feature.ValueType.STRING,
    'INT64': aiplatform.gapic.Feature.ValueType.INT64,
    'FLOAT64': aiplatform.gapic.Feature.ValueType.DOUBLE,
}

REQUESTS = []
for i in range(schema.shape[0]):
    column = schema['column_name'][i]
    data_type = schema['data_type'][i]
    
    # skip the target, the splits column, and omitted columns such as the entity ID
    if column in [VAR_TARGET, 'splits'] + VAR_OMIT.split():
        continue
    
    # fail loudly instead of silently reusing the previous column's value type
    if data_type not in VALUE_TYPES:
        raise ValueError(f"Unsupported BigQuery data type {data_type} for column {column}")
    
    REQUESTS.append(
        aiplatform.gapic.CreateFeatureRequest(
            feature = aiplatform.gapic.Feature(
                value_type = VALUE_TYPES[data_type],
                description = f"Column named {column} from BQ Table {PROJECT_ID}.{DATANAME}.{DATANAME}_prepped",
                # optional, monitoring_config here as override, otherwise it inherits from entity_type
            ),
            feature_id = column.lower(),
        )
    )

In [32]:
batchfeatures = clients['fs'].batch_create_features(
    parent = clients['fs'].entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, ENTITYTYPE_ID),
    requests = REQUESTS,
)

In [62]:
#list(item.name for item in batchfeatures.result().features)

---
## Search Features
`search_features` searches across all feature stores and entity types in the location and accepts an optional query. To list all features of a single entity type, use `list_features` instead.

In [33]:
# return the first feature:
list(clients['fs'].search_features(location = PARENT))[0]

name: "projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction/features/amount"
description: "Column named Amount from BQ Table statmike-demo2.fraud.fraud_prepped"
value_type: DOUBLE
create_time {
  seconds: 1638990157
  nanos: 457386000
}
update_time {
  seconds: 1638990157
  nanos: 457386000
}

In [35]:
# find all features with INT64 value type
list(clients['fs'].search_features(aiplatform.gapic.SearchFeaturesRequest(location = PARENT, query = "value_type=INT64")))

[name: "projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction/features/time"
 description: "Column named Time from BQ Table statmike-demo2.fraud.fraud_prepped"
 value_type: INT64
 create_time {
   seconds: 1638990157
   nanos: 337811000
 }
 update_time {
   seconds: 1638990157
   nanos: 337811000
 }]

In [36]:
# find all features of the form V*9 with DOUBLE value type
list(clients['fs'].search_features(aiplatform.gapic.SearchFeaturesRequest(location = PARENT, query = "feature_id:V*9 AND value_type=DOUBLE")))

[name: "projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction/features/v19"
 description: "Column named V19 from BQ Table statmike-demo2.fraud.fraud_prepped"
 value_type: DOUBLE
 create_time {
   seconds: 1638990157
   nanos: 381741000
 }
 update_time {
   seconds: 1638990157
   nanos: 381741000
 },
 name: "projects/764015827198/locations/us-central1/featurestores/fraud/entityTypes/transaction/features/v9"
 description: "Column named V9 from BQ Table statmike-demo2.fraud.fraud_prepped"
 value_type: DOUBLE
 create_time {
   seconds: 1638990157
   nanos: 355483000
 }
 update_time {
   seconds: 1638990157
   nanos: 355483000
 }]
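The `feature_id:V*9` query returned only `v19` and `v9`: the features are `v1` through `v28`, and the output shows that the wildcard can also match nothing (`v9`). As a rough local approximation (an assumption: this uses Python's `fnmatch` case-insensitively, not the service's query parser), the same pattern can be checked against the feature IDs created in this notebook:

```python
from fnmatch import fnmatchcase

# feature IDs created in this notebook: time, v1..v28, amount
feature_ids = ["time", "amount"] + [f"v{i}" for i in range(1, 29)]

def approx_search(ids, pattern):
    """Approximate a wildcard feature_id match locally (case-insensitive)."""
    return sorted(i for i in ids if fnmatchcase(i.lower(), pattern.lower()))

print(approx_search(feature_ids, "V*9"))  # ['v19', 'v9']
```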

---
## Import Feature Values
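Feature values can be imported from:
- BigQuery (this demo)
- Avro files in Cloud Storage
- CSV files in Cloud Storage

Prepare a source table with a timestamp column (`update_time`) and a unique ID for each entity (`transaction_id`). Each row's `update_time` is set to a random day between today and 9 days ago, which is used later to demonstrate a point-in-time export.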

In [37]:
query = f"""
CREATE OR REPLACE TABLE {PROJECT_ID}.{DATANAME}.{DATANAME}_featurestore_import AS
WITH A AS 
    (SELECT *, CAST(FLOOR(10*RAND()) AS INT64) day_offset
    FROM {PROJECT_ID}.{DATANAME}.{DATANAME}_prepped)
SELECT * EXCEPT(day_offset),
        DATE_SUB(CURRENT_TIMESTAMP, INTERVAL day_offset DAY) AS update_time
FROM A
"""
bqjob = clients['bq'].query(query = query)

In [38]:
bqjob.result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7f11156b0110>

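Create a feature specification for each feature in the input source. Each spec maps a feature ID (lowercase) to the BigQuery column (`source_field`) its values are read from: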

In [39]:
FEATURE_SPECS = []
for i in range(schema.shape[0]):
    if schema['column_name'][i] in [VAR_TARGET, 'splits'] + VAR_OMIT.split():
        continue
    
    FEATURE_SPECS.append(
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(
            id = schema['column_name'][i].lower(),
            source_field = schema['column_name'][i]
        )
    )

In [40]:
import_request = aiplatform.gapic.ImportFeatureValuesRequest(
    entity_type = clients['fs'].entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, ENTITYTYPE_ID),
    bigquery_source = aiplatform.gapic.BigQuerySource(input_uri = f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_featurestore_import'),
    # each row's feature time comes from its update_time column;
    # feature_time_field and feature_time are alternatives (a oneof), so feature_time is not set
    # (also, Timestamp().GetCurrentTime() updates in place and returns None)
    feature_time_field = "update_time",
    entity_id_field = "transaction_id",
    feature_specs = FEATURE_SPECS,
    worker_count = 4,
)

In [41]:
importjob = clients['fs'].import_feature_values(request = import_request)

In [42]:
importjob.result()

imported_entity_count: 284807
imported_feature_value_count: 8544210
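The two counts are consistent: each of the 284,807 entities (transactions) received one value for each of the 30 features (`time`, `v1` to `v28`, and `amount`). A quick check of the arithmetic:

```python
entities = 284807          # imported_entity_count
features = 1 + 28 + 1      # time, v1..v28, amount
print(entities * features) # 8544210, the imported_feature_value_count
```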

---
## Prediction with Feature Store for Serving Features

### Entity IDs
Retrieve ten entity IDs from the `TEST` split of the source BigQuery table. The IDs are in the `transaction_id` column.

In [43]:
unique_id = clients['bq'].query(query = f"SELECT * FROM {DATANAME}.{DATANAME}_prepped WHERE splits='TEST' LIMIT 10").to_dataframe()

In [44]:
unique_id['transaction_id'][0]

'07fdced0-3837-47a1-9526-64d74ad9b113'

### Data For Prediction: Single Entity Served by Vertex AI > Features (Feature Store)

In [47]:
feature_values = clients['fs_olserve'].read_feature_values(
    request = aiplatform.gapic.ReadFeatureValuesRequest(
        entity_type = clients['fs'].entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, ENTITYTYPE_ID),
        entity_id = unique_id['transaction_id'][0],
        feature_selector = aiplatform.gapic.FeatureSelector(id_matcher=aiplatform.gapic.IdMatcher(ids=['*'])),
    )
)

In [48]:
print(list(item.id for item in feature_values.header.feature_descriptors))

['v18', 'time', 'v26', 'v9', 'v3', 'v24', 'v8', 'v1', 'v4', 'v5', 'v14', 'v28', 'v27', 'v6', 'v16', 'v21', 'v7', 'v25', 'v22', 'v2', 'v15', 'v12', 'amount', 'v23', 'v13', 'v17', 'v19', 'v11', 'v10', 'v20']


In [49]:
print(list(item.value.double_value for item in feature_values.entity_view.data))

[-1.01262330333815, 0.0, 1.0288470541571502, -0.13296614676554902, 0.79650222865, -0.25821805726084, 0.46149546559133703, 1.07306279044003, 1.75438907383858, -0.219909538224871, 0.223515261314392, -0.0131252979106899, -0.0376919034755461, 0.930348759049177, -0.16033206009858803, -0.0452503674451511, -0.515894450159377, 0.171860617271716, 0.035042952311969, -0.026793043261075, -0.864056157762422, 0.404511129977238, 0.0, 0.0485385294222296, -1.3826303643001698, 0.336310126389912, -0.7461822853235991, 1.03689341495792, 0.5056034354709891, -0.26507148271390196]


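### Prepare a record for prediction: instance and parameters lists

Each feature value is read with `double_value`. Because `time` is an INT64 feature, its `double_value` is the protobuf default `0.0` (the second value printed above), so read `int64_value` for that feature if the model needs the stored time.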

In [50]:
newob = {}
features = list(item.id for item in feature_values.header.feature_descriptors)
for e, f in enumerate(features):
    newob[f.capitalize()] = feature_values.entity_view.data[e].value.double_value

In [51]:
instances = [json_format.ParseDict(newob, Value())]
parameters = json_format.ParseDict({}, Value())
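The feature IDs were created by lowercasing the BigQuery column names, and the model expects the original names, so `f.capitalize()` maps them back (`v18` to `V18`, `time` to `Time`). That works here only because every column is a single capitalized word. A minimal sketch of a more explicit alternative, assuming the column names are taken from the schema; the helper name and the values are hypothetical:

```python
# column names as in the BigQuery schema of the prepped table (assumed list)
columns = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]

# feature IDs were created as column.lower(), so invert that explicitly
id_to_column = {c.lower(): c for c in columns}

def to_instance(feature_ids, values):
    """Pair each feature ID with its value and rename it to the model's column name."""
    return {id_to_column[f]: v for f, v in zip(feature_ids, values)}

# illustrative values only
instance = to_instance(["v18", "time", "amount"], [0.5, 10, 25.0])
print(instance)  # {'V18': 0.5, 'Time': 10, 'Amount': 25.0}
```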

### Pick An Endpoint
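Index `[0]` selects the first endpoint returned by `aiplatform.Endpoint.list()` for this project and region. Make sure it serves a model trained on this data (see the prerequisites):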

In [52]:
aiplatform.Endpoint.list()[0].display_name

'05f_fraud_20211203120602'

In [53]:
endpoint = aiplatform.Endpoint(endpoint_name = aiplatform.Endpoint.list()[0].name)

### Get Predictions: Python Client

In [54]:
prediction = endpoint.predict(instances = instances, parameters = parameters)

In [55]:
prediction

Prediction(predictions=[[0.999251425, 0.000748594233]], deployed_model_id='2291109553401495552', explanations=None)

In [56]:
prediction.predictions[0]

[0.999251425, 0.000748594233]

In [57]:
np.argmax(prediction.predictions[0])

0

### Get Predictions: REST

In [58]:
with open(f'{DIR}/request.json','w') as file:
    file.write(json.dumps({"instances": [newob]}))

In [59]:
!curl -X POST \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d @{DIR}/request.json \
https://{REGION}-aiplatform.googleapis.com/v1/{endpoint.resource_name}:predict

{
  "predictions": [
    [
      0.999251425,
      0.000748594233
    ]
  ],
  "deployedModelId": "2291109553401495552",
  "model": "projects/764015827198/locations/us-central1/models/4415994938915815424",
  "modelDisplayName": "05f_fraud_20211203120602"
}
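The same REST call can be made from Python with only the standard library. A minimal sketch that builds (but does not send) the request; the endpoint path, instance values, and token are placeholders, and the token is assumed to come from `gcloud auth application-default print-access-token` as in the curl command above:

```python
import json
import urllib.request

def build_predict_request(region, endpoint_resource_name, instances, access_token):
    """Build a POST request to the Vertex AI :predict REST method (not sent here).

    endpoint_resource_name: projects/{project}/locations/{region}/endpoints/{endpoint_id}
    """
    url = f"https://{region}-aiplatform.googleapis.com/v1/{endpoint_resource_name}:predict"
    body = json.dumps({"instances": instances}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json; charset=utf-8",
        },
    )

req = build_predict_request(
    "us-central1",
    "projects/PROJECT/locations/us-central1/endpoints/ENDPOINT_ID",  # placeholder
    [{"Time": 10, "V1": 0.5}],                                       # illustrative instance
    "TOKEN",                                                         # placeholder
)
print(req.get_method(), req.full_url)
# to send it: response = json.load(urllib.request.urlopen(req))
```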


### Get Predictions: gcloud (CLI)

In [61]:
!gcloud beta ai endpoints predict {endpoint.name.rsplit('/',1)[-1]} --region={REGION} --json-request={DIR}/request.json

Using endpoint [https://us-central1-prediction-aiplatform.googleapis.com/]
[[0.999251425, 0.000748594233]]


### Data For Prediction: Multiple Entities Served by Vertex AI > Features (Feature Store)

In [62]:
unique_id['transaction_id']

0    07fdced0-3837-47a1-9526-64d74ad9b113
1    7c1f61ba-7586-414e-ba8a-1c4385d59933
2    a3046c87-cee5-40fd-9302-4d230b823246
3    bc3f2800-a4bb-4077-b017-f55f03c4f00c
4    181ee7ba-e3b0-4a34-9ebd-50ce98c4a350
5    b8c61520-7288-444b-a9de-b270d8d490c0
6    908af6f9-d158-4d14-8335-7a6136223842
7    6830e0dd-1bcb-4d6f-a98b-32de7256922e
8    52225b6b-bd48-42ce-8e33-de5260735d3e
9    fe1e9c5f-ff54-4d0d-ba28-120b31739944
Name: transaction_id, dtype: object

In [63]:
multi_feature_values = clients['fs_olserve'].streaming_read_feature_values(
    request = aiplatform.gapic.StreamingReadFeatureValuesRequest(
        entity_type = clients['fs'].entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, ENTITYTYPE_ID),
        entity_ids = unique_id['transaction_id'],
        feature_selector = aiplatform.gapic.FeatureSelector(id_matcher=aiplatform.gapic.IdMatcher(ids=['*'])),
    )
)

In [64]:
for i in multi_feature_values:
    print(i.entity_view.entity_id)
    print(list(item.value.double_value for item in i.entity_view.data))


[]
07fdced0-3837-47a1-9526-64d74ad9b113
[-0.16033206009858803, 1.07306279044003, 0.035042952311969, -1.01262330333815, -0.026793043261075, 0.46149546559133703, -0.26507148271390196, 0.930348759049177, 0.171860617271716, -0.0452503674451511, -1.3826303643001698, -0.219909538224871, 0.0, 0.79650222865, -0.13296614676554902, 1.03689341495792, 1.0288470541571502, 0.404511129977238, 1.75438907383858, 0.336310126389912, 0.223515261314392, -0.0131252979106899, -0.25821805726084, -0.515894450159377, -0.7461822853235991, 0.5056034354709891, 0.0, 0.0485385294222296, -0.0376919034755461, -0.864056157762422]
181ee7ba-e3b0-4a34-9ebd-50ce98c4a350
[-0.39688716246813394, -0.394433988274438, 0.840231685849197, -0.22200848597894002, 0.665004897142369, -0.32351742819282503, -0.215222472550955, -1.1383662635890401, 0.5173173344698, 0.33057526374587104, -1.70199813974206, 2.2992821926639704, 0.0, -1.07329808435329, -0.680326782136273, -1.9495022115490201, 0.580970913097478, -1.25422907791396, -1.058066176

### Data For Training: Batch (For training or large scale prediction)

In [65]:
# get the current time as a protobuf Timestamp (seconds and nanos since the Unix epoch, 1970-01-01 UTC)
timestamp = Timestamp()
timestamp.GetCurrentTime()

# adjust the timestamp to 2 days ago: 60*60*24*2 seconds
newtimestamp = Timestamp(seconds = timestamp.seconds - 60*60*24*2, nanos = timestamp.nanos)

batch_request = aiplatform.gapic.ExportFeatureValuesRequest(
    entity_type = clients['fs'].entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, ENTITYTYPE_ID),
    snapshot_export = aiplatform.gapic.ExportFeatureValuesRequest.SnapshotExport(snapshot_time = Timestamp(seconds=newtimestamp.seconds)),
    destination = aiplatform.gapic.FeatureValueDestination(bigquery_destination = aiplatform.gapic.BigQueryDestination(output_uri = f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_fs_training')),
    feature_selector = aiplatform.gapic.FeatureSelector(id_matcher = aiplatform.gapic.IdMatcher(ids = ['*']))
)
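The snapshot time is the current time minus 2 × 24 × 60 × 60 = 172,800 seconds. The same computation with the standard library's `datetime`, as a sketch independent of protobuf:

```python
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
snapshot = now - timedelta(days=2)

# whole seconds since the Unix epoch, the value stored in Timestamp.seconds
snapshot_seconds = int(snapshot.timestamp())
print(snapshot_seconds)
```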

In [66]:
batchjob = clients['fs'].export_feature_values(batch_request)

In [67]:
batchjob.result()



By adjusting the `snapshot_time` to 2 days ago, the export creates a BigQuery table that has all the original rows, one per entity, but the feature values are null for about 20% of the rows. This is because the features were imported with `feature_time_field = "update_time"`, and `update_time` was set to a random day between today and 9 days ago (`day_offset` is 0-9). Entities whose values were recorded after the snapshot time have no value at that snapshot.
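The roughly 20% can be checked by counting. `day_offset` is `FLOOR(10*RAND())`, a whole number from 0 to 9 with equal probability, and since the export ran after the import table was built, only offsets 0 and 1 fall after a snapshot taken 2 days back. A small sketch of the exact fraction and a simulation (the seed is arbitrary):

```python
import random
from fractions import Fraction

offsets = range(10)      # possible values of FLOOR(10*RAND())
snapshot_days_ago = 2

# a row has no value at the snapshot when its update_time is newer than the snapshot
exact_null = Fraction(sum(1 for d in offsets if d < snapshot_days_ago), len(offsets))
print(exact_null)  # 1/5

# simulate the same assignment for the 284,807 rows
random.seed(0)
rows = 284807
nulls = sum(1 for _ in range(rows) if int(10 * random.random()) < snapshot_days_ago)
print(round(100 * nulls / rows, 2))
```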

In [69]:
query = f"""
SELECT 
    CASE WHEN {list(newob.keys())[0]} is not null then False ELSE True END as Null_Rows, 
    count(*) as counts,
    100*count(*) / (sum(count(*)) OVER()) as Percentage
FROM {PROJECT_ID}.{DATANAME}.{DATANAME}_fs_training
GROUP BY Null_Rows
"""
clients['bq'].query(query = query).to_dataframe()

| | Null_Rows | counts | Percentage |
|---|---|---|---|
| 0 | True | 56980 | 20.006531 |
| 1 | False | 227827 | 79.993469 |


---
## Remove Resources
See notebook "99 - Cleanup".