# Feature Store

The purpose of this notebook is to:

* Create a Feature Store
* Create an entity with some features
* Batch ingest some feature data
* Deploy a Cloud Function that can read this data and feed it to the model

## Imports

In [1]:
from google.api_core import operations_v1
from google.cloud.aiplatform_v1beta1 import FeaturestoreOnlineServingServiceClient
from google.cloud.aiplatform_v1beta1 import FeaturestoreServiceClient
from google.cloud.aiplatform_v1beta1.types import featurestore_online_service as featurestore_online_service_pb2
from google.cloud.aiplatform_v1beta1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.cloud.aiplatform_v1beta1.types import ListFeaturestoresRequest, CreateFeaturestoreRequest, Featurestore

from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud.aiplatform_v1beta1.types import featurestore_monitoring as featurestore_monitoring_pb2
from google.protobuf.duration_pb2 import Duration

import yaml

## Configuration

In [2]:
# Read the YAML settings file, then keep only its 'personal' section.
with open('mainconfig.yaml') as config_file:
    raw_config = yaml.safe_load(config_file)
main_config = raw_config['personal']

In [3]:
# Project and region used to build every resource path in this notebook.
PROJECT, REGION = main_config['project'], main_config['region']

# Read from the config; not referenced by any other cell shown in this notebook.
SERVICE_ACCOUNT = main_config['service_account']

print("Project ID:", PROJECT)
print("Region:", REGION)

# Regional API host derived from REGION; passed to both clients as api_endpoint.
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"
FEATURESTORE_ID = main_config['featurestore_id']

Project ID: pbalm-cxb-aa
Region: europe-west4


In [4]:
# Client for Feature Store management calls: listing and creating feature stores,
# creating entity types and features, and importing feature values.
admin_client = FeaturestoreServiceClient(
    client_options={"api_endpoint": API_ENDPOINT})
# Client for the online serving service; not used by the cells shown here.
data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT})

In [5]:
# List the resource name of every feature store in the configured location.
print(f'Existing feature stores in project {PROJECT} and region {REGION}:')
location_path = admin_client.common_location_path(PROJECT, REGION)
for featurestore in admin_client.list_featurestores(ListFeaturestoresRequest(parent=location_path)):
    print('\t' + featurestore.name)

Existing feature stores in project pbalm-cxb-aa and region europe-west4:
	projects/188940921537/locations/europe-west4/featurestores/creditcards
	projects/188940921537/locations/europe-west4/featurestores/creditcards2
	projects/188940921537/locations/europe-west4/featurestores/test


## Helper functions (TODO: move these to source code files)

In [29]:
def create_fs(project, region, store_id, store_name=None, fixed_node_count=3):
    """Create a Feature Store unless one with the same ID already exists.

    Args:
        project: Project ID used to build the parent location path.
        region: Region used to build the parent location path.
        store_id: ID of the Feature Store to create.
        store_name: Value for the ``name`` field of the new Featurestore.
            Defaults to the resource path built by
            ``admin_client.featurestore_path(project, region, store_id)``.
        fixed_node_count: Number of online serving nodes to request.
            Defaults to 3, the value that was previously hard-coded.

    Returns:
        The result of the create operation, or None when a Feature Store with
        ID ``store_id`` is already listed in the location.
    """
    base_path = admin_client.common_location_path(project, region)

    # The store ID is the last segment of each existing store's resource name.
    for f in admin_client.list_featurestores(ListFeaturestoresRequest(parent=base_path)):
        existing_id = f.name.split('/')[-1]
        if store_id == existing_id:
            print(f'Feature Store "{store_id}" already exists in {region}')
            return

    if store_name is None:
        # Build the default with the client's path helper so it includes the
        # "featurestores/" segment that joining base_path and store_id omitted.
        # NOTE(review): the API may treat Featurestore.name as output-only --
        # confirm whether this field has any effect on creation.
        store_name = admin_client.featurestore_path(project, region, store_id)

    req = CreateFeaturestoreRequest(
        parent=base_path,
        featurestore=Featurestore(
            name=store_name,
            online_serving_config=Featurestore.OnlineServingConfig(
                fixed_node_count=fixed_node_count)),
        featurestore_id=store_id)

    # create_featurestore returns an operation handle; result() waits for its outcome.
    created = admin_client.create_featurestore(req).result()
    print(f'Created Feature Store {created} in {region}')
    return created


def create_entity(project, region, store_id, entity, entity_descr, features, features_descr=None):
    """Create an entity type in a Feature Store and add DOUBLE features to it.

    Both the entity type and every feature are created with a monitoring
    config whose snapshot analysis interval is 3600 seconds.

    Args:
        project: Project ID of the Feature Store.
        region: Region of the Feature Store.
        store_id: ID of the Feature Store that receives the entity type.
        entity: ID of the entity type to create.
        entity_descr: Description stored on the entity type.
        features: List of feature IDs to create on the entity type.
        features_descr: One description per feature, in the same order as
            ``features``. Defaults to the feature IDs themselves.

    Returns:
        The result of the batch feature creation, or None when the number of
        descriptions does not match the number of features.
    """
    if features_descr is None:
        features_descr = features

    if len(features) != len(features_descr):
        print(f'ERROR: Got {len(features)} features and {len(features_descr)} descriptions')
        return

    print(f'Creating entity {entity} in Feature Store {store_id} ({region})')

    snapshot_analysis = featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
        monitoring_interval=Duration(seconds=3600))  # 1 hour

    entity_type = admin_client.create_entity_type(
        featurestore_service_pb2.CreateEntityTypeRequest(
            parent=admin_client.featurestore_path(project, region, store_id),
            entity_type_id=entity,
            entity_type=entity_type_pb2.EntityType(
                description=entity_descr,
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=snapshot_analysis))
        )
    ).result()

    print(entity_type)

    def _create_f_request(name, descr):
        # Every feature is created as a DOUBLE with the same monitoring config.
        return featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description=descr,
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=snapshot_analysis)),
            feature_id=name)

    requests = [_create_f_request(name, descr) for name, descr in zip(features, features_descr)]

    print(f'\nCreating features: {",".join(features)}')

    # Build the parent from the function arguments, not the notebook globals,
    # so the features land on the entity type that was just created.
    created_features = admin_client.batch_create_features(
        parent=admin_client.entity_type_path(project, region, store_id, entity),
        requests=requests).result()

    return created_features


def ingest_entities_csv(project, region, store_id, entity, features, gcs_uris):
    """Import feature values for one entity type from CSV files in Cloud Storage.

    All imported values share a single feature time: the moment of the call
    with its nanosecond part set to zero. The value of ``entity`` is also
    passed as ``entity_id_field`` of the import request.

    Args:
        project: Project ID of the Feature Store.
        region: Region of the Feature Store.
        store_id: ID of the Feature Store.
        entity: ID of the entity type to import into.
        features: Feature IDs to import; one feature spec is built per ID.
        gcs_uris: List of ``gs://`` URIs of the CSV files to read.

    Returns:
        The result of the import operation.
    """
    # One shared timestamp for the whole import, with sub-second precision dropped.
    # NOTE(review): presumably the API rejects finer-grained feature times -- confirm.
    now = Timestamp()
    now.GetCurrentTime()
    now.nanos = 0

    feature_specs = []
    for feature_id in features:
        feature_specs.append(
            featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id=feature_id))

    request = featurestore_service_pb2.ImportFeatureValuesRequest(
        entity_type=admin_client.entity_type_path(project, region, store_id, entity),
        csv_source=io_pb2.CsvSource(gcs_source=io_pb2.GcsSource(uris=gcs_uris)),
        feature_specs=feature_specs,
        entity_id_field=entity,
        feature_time=now,
        worker_count=5)

    print(f'Ingesting features for "{entity}" entity...')
    operation = admin_client.import_feature_values(request)
    import_result = operation.result()
    print('done')

    return import_result


## Create Feature Store and entity with features

In [28]:
# store_name is left at its default: create_fs forwards it as the Featurestore
# resource name, so a free-text description does not belong in that argument.
create_fs(PROJECT, REGION, FEATURESTORE_ID)

Feature Store "creditcards" already exists in europe-west4


In [17]:
# Fetch the configured feature store and display it as the cell output.
admin_client.get_featurestore(
    name=admin_client.featurestore_path(PROJECT, REGION, FEATURESTORE_ID)
)

name: "projects/188940921537/locations/europe-west4/featurestores/creditcards"
create_time {
  seconds: 1657035029
  nanos: 610160000
}
update_time {
  seconds: 1657035029
  nanos: 696023000
}
etag: "AMEw9yNySvrvNngonhD6dilmNq2AXx-RvqOGGff-RkRKjWx8tLu6Xn9ckikiNWQSPlWv"
online_serving_config {
}
state: STABLE

In [9]:
# Entity type ID; later cells also use it as the CSV ID column name and in the CSV file name.
entity = 'user'
entity_descr = 'User ID'
# Feature IDs created on the entity type and used as the CSV value columns.
features = ['v27', 'v28']

In [10]:
# Create the entity type and its features; feature descriptions default to the feature IDs.
create_entity(
    project=PROJECT,
    region=REGION,
    store_id=FEATURESTORE_ID,
    entity=entity,
    entity_descr=entity_descr,
    features=features,
)

Creating entity user in Feature Store creditcards (europe-west4)
name: "projects/188940921537/locations/europe-west4/featurestores/creditcards/entityTypes/user"


Creating features: v27,v28


features {
  name: "projects/188940921537/locations/europe-west4/featurestores/creditcards/entityTypes/user/features/v27"
}
features {
  name: "projects/188940921537/locations/europe-west4/featurestores/creditcards/entityTypes/user/features/v28"
}

### Create the feature data

In [11]:
import random

# Local CSV with one header row and 100 rows of random demo values.
filename = f'features_{entity}.csv'

with open(filename, 'w') as f:
    # Header: the entity ID column followed by one column per feature.
    f.write(f'{entity},{",".join(features)}\n')
    for i in range(100):
        # One random value per feature, so each row always matches the header width.
        values = ','.join(str(random.random()) for _ in features)
        f.write(f'user{i},{values}\n')

In [12]:
# Print the generated CSV to check the header and the rows.
!cat {filename}

user,v27,v28
user0,0.2537841741063718,0.4725219862181388
user1,0.4810778527830739,0.0786319167096956
user2,0.004904066853428257,0.3434039028889927
user3,0.7045713577509266,0.35689834052375913
user4,0.41586271469651037,0.9700960978855925
user5,0.8007234844415386,0.03692566040140699
user6,0.11798145197018117,0.368267817548818
user7,0.4952975897425108,0.42654906642857726
user8,0.6653288204799798,0.313446231144191
user9,0.28504358696270937,0.7493597310211866
user10,0.7759283260982069,0.3909505895184139
user11,0.1238925934372278,0.19601401428670318
user12,0.6712875432868881,0.4779527492476682
user13,0.8870078995851961,0.3732385206666021
user14,0.040853895614664126,0.04240994262733466
user15,0.8742151246808288,0.3911840027947241
user16,0.15559454391882577,0.6753152510561594
user17,0.3356751580588465,0.4761193973489124
user18,0.3573400856356008,0.03229923286861136
user19,0.5560426397340824,0.5024599144871117
user20,0.0018353537561167643,0.887748403003119
user21,0.15287021036343185,0.766566831

In [13]:
# Cloud Storage bucket that receives the CSV; echoed as the cell output.
BUCKET = main_config['bucket']
BUCKET

'pbalm-cxb-aa-eu'

In [14]:
# Upload the CSV to the bucket so that the import request can reference it by gs:// URI.
!gsutil cp {filename} gs://{BUCKET}/{filename} 

Copying file://features_user.csv [Content-Type=text/csv]...
/ [1 files][  4.4 KiB/  4.4 KiB]                                                
Operation completed over 1 objects/4.4 KiB.                                      


## Ingest feature data

In [15]:
# Import the uploaded CSV into the entity type; the result is shown as the cell output.
gcs_uris = [f'gs://{BUCKET}/{filename}']

ingest_entities_csv(
    project=PROJECT,
    region=REGION,
    store_id=FEATURESTORE_ID,
    entity=entity,
    features=features,
    gcs_uris=gcs_uris,
)

Batch ingestion for "user" entity...
done


imported_entity_count: 100
imported_feature_value_count: 200