# Featurestore creation and population

The goals of this notebook are to:
* Set up a featurestore for the IMDB data
* Create the user and movie entity types and their features
* Batch-ingest feature values from BigQuery and serve them for prediction


An additional guide can be found [here](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/feature_store/gapic-feature-store.ipynb).

## Set up featurestore

In [17]:
#! pip install --user -q --upgrade git+https://github.com/googleapis/python-aiplatform.git@main-test

In [3]:
# set up the featurestore client
import pandas as pd
from google.cloud.aiplatform_v1beta1 import (
    FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient)
from google.cloud.aiplatform_v1beta1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1beta1.types import \
    entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_monitoring as featurestore_monitoring_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_online_service as featurestore_online_service_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.protobuf.duration_pb2 import Duration
from google.cloud import bigquery
from datetime import datetime


# Variables: change these to match your environment
BUCKET = "matching-engine-demo-blog"
BQ_DATASET = 'movielens'
PROJECT_ID = 'matching-engine-blog'
API_ENDPOINT = "us-central1-aiplatform.googleapis.com"  # @param {type:"string"}
FEATURESTORE_ID = "lowes_test"
REGION = 'us-central1'



admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": API_ENDPOINT})

data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)

BASE_RESOURCE_PATH = admin_client.common_location_path(PROJECT_ID, REGION)

### Terminology and concepts
#### Featurestore data model
Feature Store organizes data using the following three hierarchical concepts:

`Featurestore -> EntityType -> Feature`

* Featurestore: the top-level container that stores your features.
* EntityType: under a Featurestore, an EntityType describes an object to be modeled, whether real or virtual.
* Feature: under an EntityType, a Feature describes an attribute of the EntityType.

In this movie example, you create a featurestore (named by `FEATURESTORE_ID`) with two entity types: `users` and `movies`. The `users` entity type has the `movie_id`, `rating`, `week_num`, and `hour` features. The `movies` entity type has the `title`, `is_adult`, `average_rating`, `num_votes`, `director`, `d_profession`, `actors`, and `actor_profession` features.
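Each level of this hierarchy is encoded in a resource name of the form `projects/{project}/locations/{location}/featurestores/{featurestore}/entityTypes/{entity_type}/features/{feature}`, as the printed outputs further down show. The following sketch uses hypothetical helpers to build and split such names in pure Python. It is meant to show the hierarchy. It does not replace the client's own `featurestore_path()` and `entity_type_path()` methods, which this notebook uses and which produce names in this same format.

```python
import re

# Hypothetical helpers (not part of the client library) that mirror the
# Featurestore -> EntityType -> Feature hierarchy in resource names.
def featurestore_name(project, location, featurestore):
    return f"projects/{project}/locations/{location}/featurestores/{featurestore}"

def entity_type_name(project, location, featurestore, entity_type):
    return f"{featurestore_name(project, location, featurestore)}/entityTypes/{entity_type}"

def feature_name(project, location, featurestore, entity_type, feature):
    return f"{entity_type_name(project, location, featurestore, entity_type)}/features/{feature}"

# One named group per hierarchy level.
_FEATURE_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/featurestores/(?P<featurestore>[^/]+)"
    r"/entityTypes/(?P<entity_type>[^/]+)/features/(?P<feature>[^/]+)$"
)

def parse_feature_name(name):
    """Split a feature resource name into its hierarchy levels."""
    match = _FEATURE_RE.match(name)
    if match is None:
        raise ValueError(f"not a feature resource name: {name!r}")
    return match.groupdict()
```

For example, the `rating` feature created later in this notebook parses to the featurestore `lowes_test`, the entity type `users` and the feature `rating`.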

### Create Featurestore and Define Schemas
#### Create Featurestore
The method that creates a featurestore returns a long-running operation (LRO). An LRO starts an asynchronous job. Other API methods also return LROs, such as the methods that update or delete a featurestore. Calling `create_lro.result()` waits for the LRO to complete.

In [4]:
create_lro = admin_client.create_featurestore(
    featurestore_service_pb2.CreateFeaturestoreRequest(
        parent=BASE_RESOURCE_PATH,
        featurestore_id=FEATURESTORE_ID,
        featurestore=featurestore_pb2.Featurestore(
            online_serving_config=featurestore_pb2.Featurestore.OnlineServingConfig(
                fixed_node_count=1
            ),
        ),
    )
)

In [5]:
print(create_lro.result())

name: "projects/180938242395/locations/us-central1/featurestores/lowes_test"



You can use GetFeaturestore or ListFeaturestores to check if the Featurestore was successfully created. The following example gets the details of the Featurestore.

In [6]:
admin_client.get_featurestore(
    name=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID)
)

name: "projects/180938242395/locations/us-central1/featurestores/lowes_test"
create_time {
  seconds: 1634228649
  nanos: 462337000
}
update_time {
  seconds: 1634228649
  nanos: 541546000
}
etag: "AMEw9yNK6gjSk9xsJmP8W8u7PAAok3at2FYCB3pPDH2EPXsiWXSjnu5rjz0Uo55xxhA4"
online_serving_config {
  fixed_node_count: 1
}
state: STABLE
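The `create_time` and `update_time` fields above are protobuf Timestamps. They are printed as `seconds` and `nanos` since the Unix epoch. The small hypothetical helper below turns those printed pairs into readable UTC datetimes. If you read these fields on the Python message objects instead of from printed text, the client library typically returns datetime values already.

```python
from datetime import datetime, timedelta, timezone

def proto_timestamp_to_datetime(seconds, nanos):
    """Convert the seconds/nanos pair of a printed protobuf Timestamp to UTC.

    Python datetimes only hold microseconds, so the nanos are truncated.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(seconds=seconds, microseconds=nanos // 1000)
```

With the `create_time` printed above, `proto_timestamp_to_datetime(1634228649, 462337000)` gives 2021-10-14 16:24:09.462337 UTC.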

## Create feature objects and entities

In [7]:
# Create users entity type with monitoring enabled.
# All Features belonging to this EntityType will by default inherit the monitoring config.
users_entity_type_lro = admin_client.create_entity_type(
    featurestore_service_pb2.CreateEntityTypeRequest(
        parent=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
        entity_type_id="users",
        entity_type=entity_type_pb2.EntityType(
            description="Users entity",
            monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                    monitoring_interval=Duration(seconds=86400),  # 1 day
                ),
            ),
        ),
    )
)

# Similarly, wait for the EntityType creation operation to finish.
print(users_entity_type_lro.result())

name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/users"
etag: "AMEw9yMKjrKt7uAD58y2SX4siPuHOfJhZex1rISl3drQ2s7mpAmA"



In [8]:
# Create movies entity type without a monitoring configuration.
movies_entity_type_lro = admin_client.create_entity_type(
    featurestore_service_pb2.CreateEntityTypeRequest(
        parent=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
        entity_type_id="movies",
        entity_type=entity_type_pb2.EntityType(description="Movies entity"),
    )
)

# Similarly, wait for the EntityType creation operation to finish.
print(movies_entity_type_lro.result())

name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/movies"
etag: "AMEw9yPafQv2G961FssbzybQkQ2NO6a2PAE4Ic8dzEJ-0ZfKgjzD"



### Create features for users and movies

In [9]:
# Create features for the 'users' entity.
# For Features with monitoring enabled, distribution statistics are updated periodically in the console.
admin_client.batch_create_features(
    parent=admin_client.entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, "users"),
    requests=[
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Movie ID",
            ),
            feature_id="movie_id",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="Movie 5 star rating",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        monitoring_interval=Duration(seconds=172800),  # 2 days
                    ),
                ),
            ),
            feature_id="rating",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Review week number (52 weeks)",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=True,
                    ),
                ),
            ),
            feature_id="week_num",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Review hour",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        monitoring_interval=Duration(seconds=172800),  # 2 days
                    ),
                ),
            ),
            feature_id="hour",
        ),
    ],
).result()

features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/users/features/movie_id"
  etag: "AMEw9yM_-bSjWNonuGjVN0RR281GR4ieN5lWVsiAPKHWuVO9eHz6"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/users/features/rating"
  etag: "AMEw9yPmXcMVHjuJZLaTuE_OiQbhz5Bm89yMBc-phTnUAReD3XDC"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/users/features/week_num"
  etag: "AMEw9yNwLa8RdcR5EONxZfBppCuxRtTkmmmxzspKJePAJ7zNVh0X"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/users/features/hour"
  etag: "AMEw9yMzLQx2EW-9IF1J8FS-CA7GWpXY5uRl7YjZIRy-j-4jrthd"
}
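The comments in the cells above describe three rules for monitoring. Features inherit their EntityType's monitoring config by default, a Feature-level `monitoring_interval` overrides that config, and `disabled=True` turns snapshot analysis off for a Feature. The sketch below is a hypothetical pure-Python model of those rules. It is not a Feature Store API. You can use it to check which snapshot interval each `users` feature should end up with.

```python
from typing import Optional

ONE_DAY = 86400  # seconds, matching Duration(seconds=86400) above

def effective_interval(entity_interval: Optional[int],
                       feature_interval: Optional[int] = None,
                       disabled: bool = False) -> Optional[int]:
    """Return the snapshot interval in seconds, or None if the Feature is not monitored.

    Assumed model: disabled wins, then an explicit Feature interval,
    otherwise the EntityType's interval is inherited.
    """
    if disabled:
        return None
    if feature_interval is not None:
        return feature_interval
    return entity_interval

# The users entity type is monitored daily; its features are configured as above.
users_features = {
    "movie_id": effective_interval(ONE_DAY),                  # inherits 1 day
    "rating": effective_interval(ONE_DAY, 2 * ONE_DAY),       # overridden to 2 days
    "week_num": effective_interval(ONE_DAY, disabled=True),   # monitoring disabled
    "hour": effective_interval(ONE_DAY, 2 * ONE_DAY),         # overridden to 2 days
}
```

Under this model, the `movies` features below are not monitored at all, because neither the `movies` entity type nor its features set a monitoring config.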

#### Now, create movie features

In [10]:
# Create features for movies type.
# No monitoring is configured for the movies entity type or its features.
admin_client.batch_create_features(
    parent=admin_client.entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, "movies"),
    requests=[
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="The title of the movie", 
            ),
            feature_id="title",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Adult movie flag",
            ),
            feature_id="is_adult",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="The average rating for the movie, range is [1.0-10.0]",
            ),
            feature_id="average_rating",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Review Count",
            ),
            feature_id="num_votes",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Movie director ID(s): keys that match IMDB",
            ),
            feature_id="director",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Director(s) profession: a concatenated, space-separated string of IDs",
            ),
            feature_id="d_profession",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Actor IDs: keys that match IMDB",
            ),
            feature_id="actors",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Actor professions: a concatenated, space-separated string of IDs",
            ),
            feature_id="actor_profession",
        ),
    ],
).result()

features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/movies/features/title"
  etag: "AMEw9yMQs_tGsdI5m9tdFwKUWY8YJtbyWYw7KT9Q-j8_KIHzC-2E"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/movies/features/is_adult"
  etag: "AMEw9yPPS3psYsXmRXpaWrQE46J9y5mvd9ZGTQ8Z6xVVNyUvcaVv"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/movies/features/average_rating"
  etag: "AMEw9yPl8YI_gDhG38pHwRygXKMKaQlCH7H9Ud4SliHHulbOZpP-"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/movies/features/num_votes"
  etag: "AMEw9yMZfzYTlzcjW_yEjTmPY2G_1ybBsOstXqrHCanm71RL-h-d"
}
features {
  name: "projects/180938242395/locations/us-central1/featurestores/lowes_test/entityTypes/movies/features/director"
  etag: "AMEw9yO3CFF82L9rEb84RtFdORXeiGyFQRU9yldPVKISeJ2J1Dvz"
}
features {
  name: "projects/1809382

### Search features

In [11]:
# Search for all features across all featurestores in this project and region.
list(admin_client.search_features(location=BASE_RESOURCE_PATH))

[name: "projects/180938242395/locations/us-central1/featurestores/imdb_features2/entityTypes/movies/features/actors"
 description: "Actors - id these are keys that match to IMDB"
 create_time {
   seconds: 1633544808
   nanos: 868311000
 }
 update_time {
   seconds: 1633544808
   nanos: 868311000
 },
 name: "projects/180938242395/locations/us-central1/featurestores/imdb_features2/entityTypes/movies/features/actor_profession"
 description: "Actors professions - a concat space seperated string of ids"
 create_time {
   seconds: 1633544808
   nanos: 869450000
 }
 update_time {
   seconds: 1633544808
   nanos: 869450000
 },
 name: "projects/180938242395/locations/us-central1/featurestores/imdb_features2/entityTypes/movies/features/average_rating"
 description: "The average rating for the movie, range is [1.0-10.0]"
 create_time {
   seconds: 1633544808
   nanos: 862752000
 }
 update_time {
   seconds: 1633544808
   nanos: 862752000
 },
 name: "projects/180938242395/locations/us-central1/fe

## Batch load data from BQ to FS

In [17]:
from dateutil.parser import parse
import datetime as datetime_class

def format_time():
    # Feature Store accepts timestamps with at most millisecond precision,
    # so round the current time to the nearest millisecond.
    t = datetime.now()
    if t.microsecond % 1000 >= 500:  # check if there will be rounding up
        t = t + datetime_class.timedelta(milliseconds=1)  # manually round up
    string = t.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    return parse(string)

import_users_request = featurestore_service_pb2.ImportFeatureValuesRequest(
    entity_type=admin_client.entity_type_path(
        PROJECT_ID, REGION, FEATURESTORE_ID, "users"
    ),
    # Source
    bigquery_source=io_pb2.BigQuerySource(
        input_uri=f"bq://{PROJECT_ID}.{BQ_DATASET}.user_view"
    ),
    entity_id_field="user_id",
    feature_specs=[
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="movie_id"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="rating"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="week_num"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="hour"),
    ],
    feature_time=format_time(),
    worker_count=10,
    disable_online_serving=True,  # True skips the online store (used for backfills); set False to serve these values online
)
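`format_time()` rounds to milliseconds by formatting the time as a string and then parsing it back with `dateutil`. The sketch below is a hypothetical alternative that does the same half-up rounding directly on the `datetime`. It is easy to test and needs no extra dependency. `format_time()` could then simply return `round_to_millis(datetime.now())`.

```python
from datetime import datetime, timedelta

def round_to_millis(t: datetime) -> datetime:
    """Round a datetime half-up to whole milliseconds, without a string round trip."""
    if t.microsecond % 1000 >= 500:
        t += timedelta(milliseconds=1)  # may roll over into the next second
    # Drop the sub-millisecond remainder.
    return t.replace(microsecond=t.microsecond - t.microsecond % 1000)
```

Adding the millisecond before truncating handles carries across second boundaries. For example, 16:24:09.999600 rounds to 16:24:10.000000.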

In [11]:
# Start the import; this can take several minutes.
ingestion_lro = admin_client.import_feature_values(import_users_request)

In [None]:
import time

# Poll the LRO every 10 minutes (this also keeps the notebook session alive)
# and report progress until the import finishes.
start_time = datetime.now()
mins = 0
while not ingestion_lro.done():
    print(f"Running for {mins} minutes")
    mins += 10
    time.sleep(60 * 10)  # ten minutes
runtime = datetime.now() - start_time  # a datetime.timedelta, not a number of minutes
print(f"Ran for a total of {runtime}")

Running for 0 minutes
Running for 10 minutes


In [None]:
ingestion_lro.result()
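The polling loop above only checks `done()` every 10 minutes, and it has no upper bound on how long it waits. The sketch below is a hypothetical, reusable version with a configurable poll interval and an optional timeout. It assumes only that the operation object has `done()` and `result()` methods, which the LROs in this notebook have. `sleep` and `log` are injectable, so you can test the helper without waiting.

```python
import time
from datetime import datetime

def wait_for_operation(operation, poll_seconds=60, timeout_seconds=None,
                       sleep=time.sleep, log=print):
    """Poll operation.done() until it is True, then return operation.result().

    Raises TimeoutError if timeout_seconds elapses while the operation is still running.
    """
    start = datetime.now()
    while not operation.done():
        elapsed = (datetime.now() - start).total_seconds()
        if timeout_seconds is not None and elapsed >= timeout_seconds:
            raise TimeoutError(f"operation still running after {elapsed:.0f}s")
        log(f"Still running after {elapsed / 60:.1f} minutes")
        sleep(poll_seconds)
    log(f"Finished after {datetime.now() - start}")
    return operation.result()
```

`wait_for_operation(ingestion_lro, poll_seconds=600)` reproduces the 10-minute cadence used above. A shorter interval returns sooner once the import completes. Calling `ingestion_lro.result()` on its own also blocks until completion. The explicit loop mainly serves to print progress and keep the session active.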

### Load the movie features

In [10]:
import_movies_request = featurestore_service_pb2.ImportFeatureValuesRequest(
    entity_type=admin_client.entity_type_path(
        PROJECT_ID, REGION, FEATURESTORE_ID, "movies"
    ),
    # Source
    bigquery_source=io_pb2.BigQuerySource(
        input_uri=f"bq://{PROJECT_ID}.{BQ_DATASET}.movie_view"
    ),
    entity_id_field="movie_id",
    feature_specs=[
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="title"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="is_adult"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="average_rating"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="num_votes"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="director"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="d_profession"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="actors"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="actor_profession"),
    ],
    feature_time=format_time(),
    worker_count=10,
    disable_online_serving=False,  # False also writes the values to the online store so they can be served
)
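Both import requests rely on the BigQuery views exposing the `entity_id_field` column plus one column per requested feature. As we understand the API, each `FeatureSpec` reads the column named after its feature ID unless `FeatureSpec.source_field` names a different one. Because both requests set a single `feature_time`, no per-row timestamp column is needed. The hypothetical pre-flight check below catches a missing column before an LRO is started.

```python
def missing_source_columns(columns, entity_id_field, feature_ids, source_fields=None):
    """Return the required columns that the source table does not provide.

    source_fields optionally maps a feature ID to a differently named source
    column (assumed to mirror FeatureSpec.source_field); otherwise the feature ID is used.
    """
    source_fields = source_fields or {}
    required = [entity_id_field] + [source_fields.get(f, f) for f in feature_ids]
    available = set(columns)
    return [col for col in required if col not in available]
```

The column list could come from the `bigquery` client that the first cell imports, for example `[field.name for field in bigquery.Client(project=PROJECT_ID).get_table(f"{PROJECT_ID}.{BQ_DATASET}.movie_view").schema]`. If the result is non-empty, fix the view before calling `import_feature_values`.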

In [11]:
# Start the import; this can take several minutes.
ingestion_lro = admin_client.import_feature_values(import_movies_request)

In [12]:
import time

# Poll the LRO every 10 minutes (this also keeps the notebook session alive)
# and report progress until the import finishes.
start_time = datetime.now()
mins = 0
while not ingestion_lro.done():
    print(f"Running for {mins} minutes")
    mins += 10
    time.sleep(60 * 10)  # ten minutes
runtime = datetime.now() - start_time  # a datetime.timedelta, not a number of minutes
print(f"Ran for a total of {runtime}")

Running for 0 minutes
Ran for a total of 0:10:00.229444


In [13]:
ingestion_lro.result()

imported_entity_count: 31304
imported_feature_value_count: 250287

## Online serving
The Online Serving APIs let you serve feature values for small batches of entities. They are designed for latency-sensitive services such as online model prediction. For example, a movie service might use online predictions to quickly show the movies the current user is most likely to watch.

### Read multiple entities per request
The ReadFeatureValues API (custom HTTP verb `readFeatureValues`) reads the feature values of a single entity. To read several entities in one call, as the cells below do, use StreamingReadFeatureValues. By default, both APIs return the latest value of each feature, that is, the value with the most recent timestamp.

To read feature values, specify the entity IDs and the features to read. The streaming response starts with a header message that lists the requested features. It is followed by one `entity_view` message per entity. Each `data` entry in an `entity_view` holds one feature value, in the same order as the features listed in the header.
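Because values arrive by position, a consumer has to pair each entity's `data` entries with the feature IDs from the header. The sketch below assumes the stream has already been reduced to plain Python values in a hypothetical `(entity_id, [values])` shape. The example values are the ones printed for entity `5678` further down.

```python
def rows_from_stream(header_feature_ids, entity_views):
    """Pair each entity's values with the feature IDs listed in the header.

    entity_views is an iterable of (entity_id, [value, ...]) tuples whose
    values are in the same order as header_feature_ids.
    """
    rows = {}
    for entity_id, values in entity_views:
        if len(values) != len(header_feature_ids):
            raise ValueError(
                f"entity {entity_id}: expected {len(header_feature_ids)} values, got {len(values)}"
            )
        rows[entity_id] = dict(zip(header_feature_ids, values))
    return rows

rows = rows_from_stream(["hour", "rating", "movie_id"], [("5678", [17, 3.0, 5989])])
```

With the real stream, the IDs come from `response.header.feature_descriptors[i].id` in the first message. Each later message carries `response.entity_view.entity_id` and `response.entity_view.data`. Each `value` there holds `int64_value`, `double_value` or `string_value`, depending on the feature type, as the printed output shows. `pd.DataFrame.from_dict(rows, orient="index")` then turns the result into a table.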

In [14]:
# Fetch the following 3 features.
feature_selector = FeatureSelector(
    id_matcher=IdMatcher(ids=["hour", "rating", "movie_id"])
)

response_stream = data_client.streaming_read_feature_values(
    featurestore_online_service_pb2.StreamingReadFeatureValuesRequest(
        # Fetch from the following feature store/entity type
        entity_type=admin_client.entity_type_path(
            PROJECT_ID, REGION, FEATURESTORE_ID, "users"
        ),
        entity_ids=["6245", "5678"],
        feature_selector=feature_selector,
    )
)

In [15]:
# Iterate and process response. Note the first one is always the header only.
for response in response_stream:
    print(response)

header {
  entity_type: "projects/180938242395/locations/us-central1/featurestores/imdb/entityTypes/users"
  feature_descriptors {
    id: "hour"
  }
  feature_descriptors {
    id: "rating"
  }
  feature_descriptors {
    id: "movie_id"
  }
}

entity_view {
  entity_id: "5678"
  data {
    value {
      int64_value: 17
      metadata {
        generate_time {
          seconds: 1633547541
          nanos: 450000000
        }
      }
    }
  }
  data {
    value {
      double_value: 3.0
      metadata {
        generate_time {
          seconds: 1633547541
          nanos: 450000000
        }
      }
    }
  }
  data {
    value {
      int64_value: 5989
      metadata {
        generate_time {
          seconds: 1633547541
          nanos: 450000000
        }
      }
    }
  }
}

entity_view {
  entity_id: "6245"
  data {
    value {
      int64_value: 10
      metadata {
        generate_time {
          seconds: 1633547541
          nanos: 450000000
        }
      }
    }
  }
  da

In [14]:
%%bigquery top_movie_ids
SELECT movie_id FROM `matching-engine-blog.movielens.movie_view` ORDER BY num_votes DESC LIMIT 5

Query complete after 0.03s: 100%|██████████| 1/1 [00:00<00:00, 444.74query/s]                          
Downloading: 100%|██████████| 5/5 [00:02<00:00,  1.95rows/s]


In [15]:
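# Entity IDs must be strings, so convert whatever type BigQuery returned.
top_movie_ids = [str(movie_id) for movie_id in top_movie_ids["movie_id"]]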

In [18]:
# Fetch the following 3 features.
feature_selector = FeatureSelector(
    id_matcher=IdMatcher(ids=["title", "director", "actors"])
)

response_stream = data_client.streaming_read_feature_values(
    featurestore_online_service_pb2.StreamingReadFeatureValuesRequest(
        # Fetch from the following feature store/entity type
        entity_type=admin_client.entity_type_path(
            PROJECT_ID, REGION, FEATURESTORE_ID, "movies"
        ),
        entity_ids=top_movie_ids,
        feature_selector=feature_selector,
    )
)

In [19]:
# Iterate and process response. Note the first one is always the header only.
for response in response_stream:
    print(response)

header {
  entity_type: "projects/180938242395/locations/us-central1/featurestores/imdb_features2/entityTypes/movies"
  feature_descriptors {
    id: "title"
  }
  feature_descriptors {
    id: "director"
  }
  feature_descriptors {
    id: "actors"
  }
}

entity_view {
  entity_id: "109487"
  data {
    value {
      string_value: "Interstellar"
      metadata {
        generate_time {
          seconds: 1633544831
          nanos: 226000000
        }
      }
    }
  }
  data {
    value {
      string_value: "nm0634240"
      metadata {
        generate_time {
          seconds: 1633544831
          nanos: 226000000
        }
      }
    }
  }
  data {
    value {
      string_value: "nm0000190"
      metadata {
        generate_time {
          seconds: 1633544831
          nanos: 226000000
        }
      }
    }
  }
}

entity_view {
  entity_id: "2959"
  data {
    value {
      string_value: "Fight Club"
      metadata {
        generate_time {
          seconds: 1633544831
     

## Cleaning up

In [2]:
admin_client.delete_featurestore(
    request=featurestore_service_pb2.DeleteFeaturestoreRequest(
        name=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
        force=True,
    )
).result()

print(f"Deleted featurestore '{FEATURESTORE_ID}'.")
