## Before you begin

### Install additional packages

For this notebook, you need the Vertex AI SDK for Python (`google-cloud-aiplatform`).

In [2]:
# ! pip install --user --upgrade google-cloud-aiplatform

### Import libraries and define constants

In [1]:
from google.cloud import aiplatform
from google.cloud.aiplatform import Feature, Featurestore

REGION = "us-central1"  # @param {type:"string"}
if REGION == "[your-region]":
    REGION = "us-central1"

FEATURESTORE_ID = "embedding_poc"
# INPUT_CSV_FILE = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"
ONLINE_STORE_FIXED_NODE_COUNT = 1
PROJECT_ID = "wortz-project-352116"

#embedding parameters
N_PRODUCTS = 1_000_000
N_CUSTOMERS = 1_000_000
PROD_EMB_DIM = 512
CUST_EMB_DIM = 512

aiplatform.init(project=PROJECT_ID, location=REGION)

## Terminology and Concept

### Featurestore Data model

Vertex AI Feature Store organizes data into three hierarchical levels:
```
Featurestore -> Entity type -> Feature
```
* **Featurestore**: the top-level container that stores your features.
* **Entity type**: under a Featurestore, an Entity type describes an object to be modeled, either a real-world or a virtual one.
* **Feature**: under an Entity type, a Feature describes an attribute of the Entity type.

In this notebook, you use a featurestore called `embedding_poc`. This store has 2 entity types: `user` and `products`. The `user` entity type has a `user_emb` feature and the `products` entity type has a `product_emb` feature; both are `DOUBLE_ARRAY` embeddings.
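The resource names printed throughout this notebook's logs encode the Featurestore -> Entity type -> Feature hierarchy, one path segment pair per level. The sketch below rebuilds such names with plain string formatting; `feature_resource_name` is a hypothetical helper written for illustration, not part of the Vertex AI SDK.

```python
# Hypothetical helper (not part of the Vertex AI SDK): builds resource names
# like the ones in this notebook's logs, one segment pair per hierarchy level.
from typing import Optional


def feature_resource_name(
    project: str,
    location: str,
    featurestore_id: str,
    entity_type_id: Optional[str] = None,
    feature_id: Optional[str] = None,
) -> str:
    """Return the resource name of a featurestore, entity type, or feature."""
    name = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}"
    if entity_type_id is not None:
        name += f"/entityTypes/{entity_type_id}"
        if feature_id is not None:
            name += f"/features/{feature_id}"
    elif feature_id is not None:
        # A feature only exists under an entity type
        raise ValueError("feature_id requires entity_type_id")
    return name


# Usage with the IDs from this notebook (project number taken from the logs)
print(feature_resource_name("679926387543", "us-central1", "embedding_poc", "user", "user_emb"))
```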


## Create Featurestore and Define Schemas

### Create Featurestore

The method to create a Featurestore returns a
[long-running operation](https://google.aip.dev/151) (LRO). An LRO starts an asynchronous job. Other API methods, such as updating or deleting a featurestore, also return LROs. The creation cell below is commented out because the `embedding_poc` featurestore already exists; uncomment it on the first run to create the featurestore and print the process log.

In [2]:
# fs = Featurestore.create(
#     featurestore_id=FEATURESTORE_ID,
#     online_store_fixed_node_count=ONLINE_STORE_FIXED_NODE_COUNT,
#     project=PROJECT_ID,
#     location=REGION,
#     sync=True,
# )

Use the function call below to retrieve a Featurestore and check that it has been created.


In [3]:
fs = Featurestore(
    featurestore_name=FEATURESTORE_ID,
    project=PROJECT_ID,
    location=REGION,
)
print(fs.gca_resource)

name: "projects/679926387543/locations/us-central1/featurestores/embedding_poc"
create_time {
  seconds: 1654705327
  nanos: 805462000
}
update_time {
  seconds: 1654705327
  nanos: 864581000
}
etag: "AMEw9yOPA-QPctV6YwyGmuk-tqA5ZlG9Iblrgs2nUUM0oE60ji0JMZI2SgaGQHNpa9O7"
online_serving_config {
  fixed_node_count: 1
}
state: STABLE



### Create Entity Type

Entity types are created through the Featurestore object. Below, create the `user` entity type and the `products` entity type. Each call prints a process log.

In [12]:
# # Create users entity type
# users_entity_type = fs.create_entity_type(
#     entity_type_id="user",
#     description="Users",
# )

Creating EntityType
Create EntityType backing LRO: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user/operations/6782040126760419328
EntityType created. Resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user
To use this EntityType in another session:
entity_type = aiplatform.EntityType('projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user')


In [17]:
# # Create products entity type
# products_entity_type = fs.create_entity_type(
#     entity_type_id="products",
#     description="Products",
# )

To retrieve an entity type or check that it has been created use the [get_entity_type](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/featurestore/featurestore.py#L106) or [list_entity_types](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/featurestore/featurestore.py#L278) methods on the Featurestore object.


In [4]:
users_entity_type = fs.get_entity_type(entity_type_id="user")
products_entity_type = fs.get_entity_type(entity_type_id="products")
print(users_entity_type)
print(products_entity_type)

<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fbbe2e58910> 
resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user
<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fbbd94c6890> 
resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/products


In [7]:
fs.list_entity_types()

[<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fcf30835250> 
 resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/products,
 <google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fcf30835890> 
 resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user]
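To turn the listed entity types into plain IDs, you can split each resource name into its collection/ID pairs. This is a minimal string-parsing sketch; `parse_resource_name` is a hypothetical helper. With the SDK objects you would pass `et.resource_name` for each `et` in `fs.list_entity_types()`.

```python
# Hypothetical helper (not part of the Vertex AI SDK): splits a resource name
# into {collection: id} pairs, e.g. {"entityTypes": "user", ...}.
def parse_resource_name(resource_name: str) -> dict:
    """Map each collection segment (e.g. 'entityTypes') to the ID that follows it."""
    parts = resource_name.strip("/").split("/")
    if len(parts) % 2 != 0:
        raise ValueError(f"Malformed resource name: {resource_name!r}")
    # Segments alternate: collection, id, collection, id, ...
    return dict(zip(parts[0::2], parts[1::2]))


names = [
    "projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/products",
    "projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user",
]
print([parse_resource_name(n)["entityTypes"] for n in names])  # ['products', 'user']
```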

### Create Feature
Features are created within each entity type. Add the embedding features to the `user` and `products` entity types by using the `create_feature` method.

In [22]:
# # To create features one at a time, use create_feature
# user_embs = users_entity_type.create_feature(
#     feature_id="user_emb",
#     value_type="DOUBLE_ARRAY",
#     description="User embedding",
# )

# product_embs = products_entity_type.create_feature(
#     feature_id="product_emb",
#     value_type="DOUBLE_ARRAY",
#     description="Product embedding",
# )

Creating Feature
Create Feature backing LRO: projects/679926387543/locations/us-central1/featurestores/embedding_poc/operations/3625016787973701632
Feature created. Resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user/features/user_emb
To use this Feature in another session:
feature = aiplatform.Feature('projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user/features/user_emb')
Creating Feature
Create Feature backing LRO: projects/679926387543/locations/us-central1/featurestores/embedding_poc/operations/3373659633771085824
Feature created. Resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/products/features/product_emb
To use this Feature in another session:
feature = aiplatform.Feature('projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/products/features/product_emb')


Use the [list_features](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/featurestore/entity_type.py#L349) method to list all the features of a given entity type.

In [11]:
users_entity_type.list_features()

[<google.cloud.aiplatform.featurestore.feature.Feature object at 0x7f7d6095c1d0> 
 resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user/features/user_emb]

## Import Feature Values

You need to import feature values before you can use them for online or offline serving. In this section, you first import feature values from a pandas DataFrame and then from an Avro file in Cloud Storage (GCS). You can also import feature values from BigQuery.


### Import feature values for Users entity type

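When importing, specify the following in your request:

* The feature IDs to import (here, `user_emb`)
* The feature timestamp (`feature_time`)
* The source data: here, randomly generated embeddings in a DataFrame whose `entity_id` column holds the entity IDs and whose `user_emb` column holds the feature values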


In [5]:
import numpy as np

# Override here for quick dev
N_CUSTOMERS = 1000
N_PRODUCTS = 1000
user_emb_local = np.random.rand(N_CUSTOMERS, CUST_EMB_DIM)

prod_emb_local = np.random.rand(N_PRODUCTS, PROD_EMB_DIM)

print(f"Shape for user embeddings: {user_emb_local.shape} \nShape for product_embeddings {prod_emb_local.shape}")

#get MB of data - 8 bytes per double
user_est_mb = 8 * user_emb_local.shape[0] * user_emb_local.shape[1] / 1_000_000
prod_est_mb = 8 * prod_emb_local.shape[0] * prod_emb_local.shape[1] / 1_000_000

print(f"MBs user emb: {user_est_mb} \nMB prod emb: {prod_est_mb}")

Shape for user embeddings: (1000, 512) 
Shape for product_embeddings (1000, 512)
MBs user emb: 4.096 
MB prod emb: 4.096
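The same arithmetic (rows x dimensions x 8 bytes per float64) shows what the full-scale setting implies. With `N_CUSTOMERS = 1_000_000` the raw embeddings are about 4.1 GB (3.8 GiB), which is roughly the size of the Avro file uploaded later in this notebook; Avro stores each double in 8 bytes, plus a small per-record overhead.

```python
# Back-of-the-envelope size of a dense float64 embedding table:
# rows x dimensions x 8 bytes per IEEE 754 double.
BYTES_PER_DOUBLE = 8


def embedding_table_bytes(n_rows: int, emb_dim: int) -> int:
    """Raw size in bytes of n_rows embeddings with emb_dim float64 values each."""
    return n_rows * emb_dim * BYTES_PER_DOUBLE


dev = embedding_table_bytes(1_000, 512)       # the quick-dev override
full = embedding_table_bytes(1_000_000, 512)  # N_CUSTOMERS before the override

print(f"dev:  {dev / 1_000_000:.3f} MB")                                   # dev:  4.096 MB
print(f"full: {full / 1_000_000_000:.3f} GB = {full / 2**30:.2f} GiB")   # full: 4.096 GB = 3.81 GiB
```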


In [6]:
# Put the data into DataFrames
import numpy as np
import pandas as pd

user_emb_data = pd.DataFrame(np.arange(N_CUSTOMERS), columns=['entity_id'])
user_emb_data['entity_id'] = user_emb_data['entity_id'].astype(str)  # entity IDs are strings
user_emb_data['user_emb'] = user_emb_local.tolist()  # column name must match the feature ID

prod_emb_data = pd.DataFrame(np.arange(N_PRODUCTS), columns=['entity_id'])
prod_emb_data['entity_id'] = prod_emb_data['entity_id'].astype(str)
prod_emb_data['product_emb'] = prod_emb_local.tolist()  # column name must match the feature ID
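Because the embedding column names must match the feature IDs, a quick local check before ingesting can catch mismatches early. The sketch below is a hypothetical helper, not an SDK function, and works on plain lists; with the DataFrames above you could pass `list(user_emb_data.columns)`, `user_emb_data['entity_id'].tolist()` and `user_emb_data['user_emb'].tolist()`.

```python
# Hypothetical pre-ingestion check (not an SDK function): confirms the columns
# match the feature IDs and every embedding has the expected dimension.
import math
from typing import List, Sequence


def check_ingest_input(
    columns: Sequence[str],
    feature_ids: Sequence[str],
    entity_ids: Sequence[str],
    embeddings: Sequence[Sequence[float]],
    emb_dim: int,
    entity_id_field: str = "entity_id",
) -> List[str]:
    """Return a list of problems; an empty list means the input looks consistent."""
    problems = []
    for name in [entity_id_field, *feature_ids]:
        if name not in columns:
            problems.append(f"missing column: {name}")
    # In this synthetic data each entity should appear exactly once
    if len(set(entity_ids)) != len(entity_ids):
        problems.append("duplicate entity IDs")
    # The notebook converts IDs to str before ingesting
    if any(not isinstance(e, str) for e in entity_ids):
        problems.append("entity IDs must be strings")
    for i, emb in enumerate(embeddings):
        if len(emb) != emb_dim:
            problems.append(f"row {i}: expected {emb_dim} values, got {len(emb)}")
        elif not all(math.isfinite(x) for x in emb):
            problems.append(f"row {i}: non-finite value")
    return problems


print(check_ingest_input(["entity_id", "user_emb"], ["user_emb"], ["0", "1"],
                         [[0.1, 0.2], [0.3, 0.4]], emb_dim=2))  # []
```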


In [9]:
user_emb_data

|     | entity_id | user_emb |
|-----|-----------|----------|
| 0   | 0   | [0.9464177024086784, 0.23233340090575905, 0.87... |
| 1   | 1   | [0.992864020523561, 0.9282853464879975, 0.2389... |
| 2   | 2   | [0.714402887932741, 0.6407287422737147, 0.5264... |
| 3   | 3   | [0.19050736817389546, 0.08266669193992826, 0.0... |
| 4   | 4   | [0.4349632348191145, 0.8840255034951592, 0.806... |
| ... | ... | ... |
| 995 | 995 | [0.7352608963791913, 0.31320071153339135, 0.59... |
| 996 | 996 | [0.8652637780875029, 0.9216894016659564, 0.092... |
| 997 | 997 | [0.0998902754130544, 0.33255599450721385, 0.72... |
| 998 | 998 | [0.4966973579681594, 0.06658641305152746, 0.56... |


In [None]:
# Show the docstring of ingest_from_df, which is used below to ingest the DataFrame
?users_entity_type.ingest_from_df

In [8]:
%%time
# Use %%time (one run), not %%timeit, which would repeat the ingestion several times
from datetime import datetime

# One timestamp for every row; change it later to test time travel
USERS_FEATURE_TIME = datetime.now()

users_entity_type.ingest_from_df(
    feature_ids=['user_emb'],
    feature_time=USERS_FEATURE_TIME,
    df_source=user_emb_data,
)

Importing EntityType feature values: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user
Import EntityType feature values backing LRO: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user/operations/3625984358206144512
EntityType feature values imported. Resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user

## Read embeddings and profile latency

### Read one entity per request

With the Python SDK, you can read the feature values of a single entity in one call. By default, the SDK returns the latest value of each feature, meaning the value with the most recent timestamp.

To read feature values, call `read` on the entity type with the entity ID and, optionally, the feature IDs to read. By default, all the features of the entity type are selected. The response is a pandas DataFrame containing the entity ID and the selected feature values.
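The "latest value" rule, and the point-in-time lookup behind time travel, can be illustrated locally. This is a minimal sketch of the semantics only, assuming a per-entity history of `(timestamp, value)` pairs; `value_as_of` is hypothetical and says nothing about how Vertex AI Feature Store is implemented.

```python
# Illustration only: return the value with the most recent timestamp,
# optionally as of a point in time. Not Feature Store's actual implementation.
import bisect
from datetime import datetime
from typing import Any, List, Optional, Tuple


def value_as_of(history: List[Tuple[datetime, Any]], as_of: Optional[datetime] = None) -> Any:
    """Return the latest value whose timestamp is <= as_of (or the latest overall)."""
    if not history:
        return None
    history = sorted(history, key=lambda tv: tv[0])
    if as_of is None:
        return history[-1][1]
    timestamps = [t for t, _ in history]
    idx = bisect.bisect_right(timestamps, as_of)  # number of entries at or before as_of
    return history[idx - 1][1] if idx > 0 else None


history = [
    (datetime(2022, 6, 8, 12, 0), [0.1, 0.2]),
    (datetime(2022, 6, 9, 12, 0), [0.3, 0.4]),
]
print(value_as_of(history))                            # [0.3, 0.4]
print(value_as_of(history, datetime(2022, 6, 8, 18)))  # [0.1, 0.2]
```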

In [13]:
%%timeit
# Entity IDs are the stringified row numbers ("0", "1", ...), so "555_555" would never match
users_entity_type.read(entity_ids="555555")

141 ms ± 17.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Read multiple entities per request

To read feature values from multiple entities, pass a list of entity IDs. By default, all the features of the entity type are selected. Because online reads are latency-sensitive, fetch only a small number of entities per request with this method.
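When you do need many entities, one option is to split the IDs into small batches and issue one read per batch. This is a minimal sketch; `chunk_ids` is a hypothetical helper and the batch size of 10 is an arbitrary assumption, not a documented limit.

```python
# Hypothetical helper: split entity IDs into small batches so each online
# read stays small. batch_size=10 is an arbitrary choice, not a service limit.
from typing import Iterator, List, Sequence


def chunk_ids(ids: Sequence[str], batch_size: int = 10) -> Iterator[List[str]]:
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(ids), batch_size):
        yield list(ids[start:start + batch_size])


print([len(b) for b in chunk_ids([str(i) for i in range(25)])])  # [10, 10, 5]
```

With the SDK you would then call `users_entity_type.read(entity_ids=batch, feature_ids=["user_emb"])` for each batch and combine the resulting DataFrames with `pd.concat`.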

In [14]:
%%timeit
users_entity_type.read(
    entity_ids=["7", "66", "999999"], feature_ids=["user_emb"]
)

The slowest run took 4.11 times longer than the fastest. This could mean that an intermediate result is being cached.
33.3 ms ± 14.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
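`%%timeit` reports mean ± standard deviation, and the warning above shows how skewed individual runs can be. For serving latency, percentiles are often more informative. The sketch below is a minimal stdlib profiler under that assumption; `profile_latency` is hypothetical, and `fn` would be something like `lambda: users_entity_type.read(entity_ids=["7"], feature_ids=["user_emb"])`.

```python
# Minimal latency profiler reporting percentiles instead of mean ± std.
import statistics
import time
from typing import Callable, Dict


def profile_latency(fn: Callable[[], object], n_calls: int = 50, warmup: int = 3) -> Dict[str, float]:
    """Call fn repeatedly and return latency percentiles in milliseconds."""
    if n_calls < 2:
        raise ValueError("need at least 2 calls to compute percentiles")
    for _ in range(warmup):  # warm up connections and caches before measuring
        fn()
    samples = []
    for _ in range(n_calls):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000)
    # 99 cut points; 'inclusive' keeps every percentile within the observed range
    cuts = statistics.quantiles(samples, n=100, method="inclusive")
    return {"p50": statistics.median(samples), "p95": cuts[94], "p99": cuts[98], "max": max(samples)}


# Usage with a stand-in workload
print(profile_latency(lambda: time.sleep(0.001), n_calls=20))
```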


### Final section: create synthetic data, time travel, and establish a Matching Engine index

In [29]:
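import csv
import json

def create_csv(n_rows, emb_dim, filename):
    # csv.writer quotes the embedding field: its list literal contains commas,
    # which would otherwise split one embedding across many columns
    with open(filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["entity_id", "user_emb"])
        for row in range(n_rows):
            writer.writerow([row, json.dumps(np.random.rand(emb_dim).tolist())])

create_csv(N_CUSTOMERS, CUST_EMB_DIM, "cust_embs.csv")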

In [39]:
# !pip install avro --user

Collecting avro
  Downloading avro-1.11.0.tar.gz (83 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 83.4/83.4 kB 4.2 MB/s eta 0:00:00
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Building wheels for collected packages: avro
  Building wheel for avro (pyproject.toml) ... done
  Created wheel for avro: filename=avro-1.11.0-py2.py3-none-any.whl size=115908 sha256=b860b2ce231ce390f81e16c9ee27e1e9790ebfb264c6d6e813e0c3417efe610a
  Stored in directory: /home/jupyter/.cache/pip/wheels/7d/79/ec/d7acfd56e9934b311783689c07ffecf6af9bde172950927f6d
Successfully built avro
Installing collected packages: avro
Successfully installed avro-1.11.0


### Generate a random Avro file for upload

This file will also be used later for Matching Engine retrieval.

In [None]:
# Convert the synthetic embeddings to Avro
import json

import avro.schema
import numpy as np
from avro.datafile import DataFileWriter
from avro.io import DatumWriter


user_avro_schema = avro.schema.parse(json.dumps({
  "type": "record",
  "name": "user",
  "fields": [
      {
       "name": "entity_id",
       "type": ["null", "string"]
      },
      {
       "name": "user_emb",
       "type": {"type": "array", "items": "double"},
       "default": []
      },
  ]
 }))

def create_avro(n_rows, emb_dim, filename):
    # Avro container files are binary, so open the file in 'wb' mode
    writer = DataFileWriter(open(filename, 'wb'), DatumWriter(), user_avro_schema)
    for i in range(n_rows):
        writer.append({"entity_id": str(i),
                       "user_emb": np.random.rand(emb_dim).tolist()})
    # writer.close() flushes the last block and closes the file; closing only
    # the file object could leave the final block unwritten
    writer.close()

# Note: N_CUSTOMERS was overridden to 1000 above; the 3.8 GiB file uploaded
# below corresponds to 1_000_000 rows of 512 doubles
create_avro(N_CUSTOMERS, CUST_EMB_DIM, 'cust_embs.avro')

In [8]:
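# The trailing slash makes gsutil treat 6-9-22 as a folder, so the object matches the URI used for ingestion below
!gsutil cp cust_embs.avro gs://wortz-project-bucket/fs-embeddings/6-9-22/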

Copying file://cust_embs.avro [Content-Type=application/octet-stream]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

| [1 files][  3.8 GiB/  3.8 GiB]  137.2 MiB/s                                   
Operation completed over 1 objects/3.8 GiB.                                      


### Ingest the Avro features (about 8 minutes with one worker)

In [9]:
from datetime import datetime
USERS_FEATURE_TIME = datetime.now()

users_entity_type.ingest_from_gcs(
    feature_ids=['user_emb'],
    feature_time=USERS_FEATURE_TIME,
    gcs_source_uris='gs://wortz-project-bucket/fs-embeddings/6-9-22/cust_embs.avro',
    gcs_source_type='avro',
    worker_count=1,
    sync=True,
)

Importing EntityType feature values: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user
Import EntityType feature values backing LRO: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user/operations/2480381202993774592
EntityType feature values imported. Resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fcf22ef5c50> 
resource name: projects/679926387543/locations/us-central1/featurestores/embedding_poc/entityTypes/user
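The observations above give a rough throughput figure for this setup. The sketch assumes about 1,000,000 entities (the 3.8 GiB file size matches 1,000,000 x 512 doubles, so it was presumably generated with `N_CUSTOMERS = 1_000_000`) and the "about 8 minutes" quoted for `worker_count=1`. Both inputs are approximate, so treat the result as an order-of-magnitude estimate; `ingest_throughput` is a hypothetical helper.

```python
# Rough ingestion throughput from approximate observations in this notebook.
def ingest_throughput(n_entities: int, file_gib: float, minutes: float) -> dict:
    """Return entities/s and MiB/s for a run of the given size and duration."""
    seconds = minutes * 60
    return {
        "entities_per_s": n_entities / seconds,
        "mib_per_s": file_gib * 1024 / seconds,
    }


rates = ingest_throughput(n_entities=1_000_000, file_gib=3.8, minutes=8)
print(f"{rates['entities_per_s']:.0f} entities/s, {rates['mib_per_s']:.1f} MiB/s")  # 2083 entities/s, 8.1 MiB/s
```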

Now that you have learned how to fetch imported feature values for online serving, the next step is learning how to use imported feature values for offline use cases.

In [None]:
# ?users_entity_type.ingest_from_gcs

In [None]:
# # Delete Featurestore
# fs.delete(force=True)

In [None]:
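# # Delete the BigQuery dataset, if you exported data to one
# # (DESTINATION_DATA_SET is not defined in this notebook; set it first)
# from google.cloud import bigquery
# client = bigquery.Client(project=PROJECT_ID)
# client.delete_dataset(
#     DESTINATION_DATA_SET, delete_contents=True, not_found_ok=True
# )  # Make an API request.

# print("Deleted dataset '{}'.".format(DESTINATION_DATA_SET))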