# E2E recsys with matching engine and TFRS


Simple example, goal being:

    1) Train a Two-Tower model using movielens data
    
    2) Deploy the query model endpoint
    
    3) Save movie embeddings to json, for use in matching engine
    
    
#### Note on VPC Pairing - insturctions for in-notebook pairing [here](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/matching_engine/sdk_matching_engine_for_indexing.ipynb)
    
First we will create a user-managed notebook behind the already created peered VPC network used for Matching Engine. Select tensorflow enterprise 2.6 with a T4 GPU


![](./create-workbench.png)


##### Be sure to create the notebook in the peered network


![](./network-create.png)

    
The next notebook will connect matching engine with the query endpoint for a simple recommender system

Run the below pip install one time to install tensorflow-recommenders

In [1]:
# !echo Y | pip uninstall tensorflow
!pip install tensorflow-recommenders==0.6.0 --user

Collecting tensorflow-recommenders==0.6.0
  Downloading tensorflow_recommenders-0.6.0-py3-none-any.whl (85 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.8/85.8 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-recommenders
Successfully installed tensorflow-recommenders-0.6.0


### Important - restart the kernel after installing

# Train a 2 tower model

In [1]:
from typing import Dict, Text

import json

import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs

# disable INFO and DEBUG logging everywhere
import logging

from google.cloud import aiplatform_v1beta1 #needed for matching engine calls
from google.protobuf import struct_pb2

import pandas as pd


logging.disable(logging.WARNING)

DIMENSIONS = 64 # this is how large the embedding dimensions get


# Ratings data.
ratings = tfds.load('movielens/100k-ratings', split="train")
# Features of all the available movies.
movies = tfds.load('movielens/100k-movies', split="train")

# Select the basic features.
ratings = ratings.map(lambda x: {
    "movie_id": tf.strings.to_number(x["movie_id"]),
    "user_id": tf.strings.to_number(x["user_id"])
})
movies = movies.map(lambda x: tf.strings.to_number(x["movie_id"]))

# Build a model.
class Model(tfrs.Model):

    def __init__(self):
        super().__init__()

        # Set up user representation.
        self.user_model = tf.keras.Sequential([
            tf.keras.layers.Embedding(
            input_dim=2000, output_dim=DIMENSIONS),
            ])
        # Set up movie representation.
        self.item_model = tf.keras.Sequential([
            tf.keras.layers.Embedding(
            input_dim=2000, output_dim=DIMENSIONS),
        ])
        # Set up a retrieval task and evaluation metrics over the
        # entire dataset of candidates.
        self.task = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(
                candidates=movies.batch(128).map(self.item_model)
            )
        )

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:

        user_embeddings = self.user_model(features["user_id"])
        movie_embeddings = self.item_model(features["movie_id"])

        return self.task(user_embeddings, movie_embeddings)


model = Model()
model.compile(optimizer=tf.keras.optimizers.Adagrad(0.5))

# Randomly shuffle data and split between train and test.
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

# Train.
model.fit(train.batch(1024), epochs=5)

# Evaluate.
model.evaluate(test.batch(1024), return_dict=True)

[1mDownloading and preparing dataset 4.70 MiB (download: 4.70 MiB, generated: 32.41 MiB, total: 37.10 MiB) to /home/jupyter/tensorflow_datasets/movielens/100k-ratings/0.1.0...[0m


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/1 [00:00<?, ? splits/s]

Generating train examples...:   0%|          | 0/100000 [00:00<?, ? examples/s]

Shuffling movielens-train.tfrecord...:   0%|          | 0/100000 [00:00<?, ? examples/s]

[1mDataset movielens downloaded and prepared to /home/jupyter/tensorflow_datasets/movielens/100k-ratings/0.1.0. Subsequent calls will reuse this data.[0m


2022-09-08 20:18:29.188381: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-09-08 20:18:29.284572: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-09-08 20:18:29.286464: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-09-08 20:18:29.291356: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

[1mDownloading and preparing dataset 4.70 MiB (download: 4.70 MiB, generated: 150.35 KiB, total: 4.84 MiB) to /home/jupyter/tensorflow_datasets/movielens/100k-movies/0.1.0...[0m


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/1 [00:00<?, ? splits/s]

Generating train examples...:   0%|          | 0/1682 [00:00<?, ? examples/s]

Shuffling movielens-train.tfrecord...:   0%|          | 0/1682 [00:00<?, ? examples/s]

[1mDataset movielens downloaded and prepared to /home/jupyter/tensorflow_datasets/movielens/100k-movies/0.1.0. Subsequent calls will reuse this data.[0m
Epoch 1/5


2022-09-08 20:18:34.461376: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


{'factorized_top_k/top_1_categorical_accuracy': 0.0,
 'factorized_top_k/top_5_categorical_accuracy': 0.0003000000142492354,
 'factorized_top_k/top_10_categorical_accuracy': 0.0017000000225380063,
 'factorized_top_k/top_50_categorical_accuracy': 0.05575000122189522,
 'factorized_top_k/top_100_categorical_accuracy': 0.1545500010251999,
 'loss': 3468.625,
 'regularization_loss': 0,
 'total_loss': 3468.625}

### Set your variables

In [2]:
#create a bucket one-time
# ! gsutil mb -l us-central1 gs://end-to-end-two-tower-wayfair

Creating gs://end-to-end-two-tower-wayfair/...


In [66]:
import os

PROJECT = 'wayfair-361917' #set to your own
NETWORK_NAME = 'matching-engine-vpc' #same as VPC peered network

### Create a bucket to store our embeddings and models
BUCKET = 'gs://end-to-end-two-tower-wayfair' # TODO - change for each user
EMBEDDINGS = os.path.join(BUCKET, 'embeddings')
QUERY_MODEL = os.path.join(BUCKET, 'query_model')
REGION = 'us-central1'

## Gets an auth token with the Parent variable
PROJECT_ID = PROJECT
AUTH_TOKEN = !gcloud auth print-access-token
PROJECT_NUMBER = ! gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)"
PROJECT_NUMBER = PROJECT_NUMBER[0]


PARENT = "projects/{}/locations/{}".format(PROJECT_ID, REGION)
PARENT

'projects/wayfair-361917/locations/us-central1'

In [67]:
# run one time to create your bucket
# !gsutil mb -l $REGION $BUCKET

In [68]:
# Save the query/user model

model.user_model.save(QUERY_MODEL)

In [69]:
# Make sure it saved
!gsutil ls $QUERY_MODEL

gs://end-to-end-two-tower-wayfair/query_model/
gs://end-to-end-two-tower-wayfair/query_model/keras_metadata.pb
gs://end-to-end-two-tower-wayfair/query_model/saved_model.pb
gs://end-to-end-two-tower-wayfair/query_model/assets/
gs://end-to-end-two-tower-wayfair/query_model/variables/


In [70]:
from google.cloud import aiplatform

model_gcp = aiplatform.Model.upload(
        display_name="Movielens User Query Model",
        artifact_uri=QUERY_MODEL,
        serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest',
        description="Top of the query tower, meant to return an embedding for each user instance",
    )

In [71]:
#validate the model type output
model_gcp

<google.cloud.aiplatform.models.Model object at 0x7fb918ae2550> 
resource name: projects/169420424915/locations/us-central1/models/5086793788882419712

In [72]:
import time

In [73]:
endpoint = aiplatform.Endpoint.create(
    display_name="Movielens Model Endpoint",
    project=PROJECT,
    location=REGION,
)

In [74]:
deployment = model_gcp.deploy(
    endpoint=endpoint,
    deployed_model_display_name="Movielens User Query Model",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=2,
    accelerator_type=None,
    accelerator_count=0,
    sync=False,
)


In [75]:
deployment

<google.cloud.aiplatform.models.Endpoint object at 0x7fb912e84cd0> 
resource name: projects/169420424915/locations/us-central1/endpoints/1615516832737787904

## Save the embeddings for the movie dataset

### Write embeddings to local storage
Following this format for Matching Engine
https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/matching_engine/sdk_matching_engine_for_indexing.ipynb


In [76]:
movie_embs = movies.batch(1000).map(lambda x: [x, model.item_model(x)]).unbatch() #process 1000 at a time then flatten it back

In [77]:
# Write to local disk
with open("movie_embeddings.json", 'w') as f:
    for movie_id, movie_emb in movie_embs:
        # print(movie_id.numpy(), movie_emb.numpy())
        f.write('{"id":"' + str(movie_id.numpy()) + '","embedding":[' + ",".join(str(x) for x in list(movie_emb.numpy())) + ']}')
        f.write("\n")

You should now see .json data as required by matching engine
![](jsonl.png)

### Upload the data to GCS
Only remove if you have issues uploading the json file

In [23]:
!gsutil cp movie_embeddings.json $EMBEDDINGS/movie_embeddings.json

Copying file://movie_embeddings.json [Content-Type=application/json]...
/ [1 files][  1.2 MiB/  1.2 MiB]                                                
Operation completed over 1 objects/1.2 MiB.                                      


# Next we will deploy our movie inidicies. With Matching Engine
* Create an index (from the `json` files)
* Create and endpoint
* Deploy the index to the endpoint so you can perform vector search

In [78]:
api_endpoint_me = "{}-aiplatform.googleapis.com".format(REGION)

index_client = aiplatform_v1beta1.IndexServiceClient(
    client_options=dict(api_endpoint=api_endpoint_me)
)


DISPLAY_NAME = f"Movielens Movie: {DIMENSIONS} DIMENSIONS"

Set the Nearest Neighbor Options

See here for tips on [tuning the index](https://cloud.google.com/vertex-ai/docs/matching-engine/using-matching-engine#tuning_the_index)

Other best practices from our PM team:
```
Start from leafNodesToSearchPercent=5 and approximateNeighborsCount=10 * k

use default values for others.

measure performance and recall and change those 2 parameters accordingly.
```

In [25]:
treeAhConfig = struct_pb2.Struct(
    fields={
        "leafNodeEmbeddingCount": struct_pb2.Value(number_value=20),
        "leafNodesToSearchPercent": struct_pb2.Value(number_value=7),
    }
)

algorithmConfig = struct_pb2.Struct(
    fields={"treeAhConfig": struct_pb2.Value(struct_value=treeAhConfig)}
)

config = struct_pb2.Struct(
    fields={
        "dimensions": struct_pb2.Value(number_value=DIMENSIONS),
        "approximateNeighborsCount": struct_pb2.Value(number_value=10),
        "distanceMeasureType": struct_pb2.Value(string_value="DOT_PRODUCT_DISTANCE"),
        "algorithmConfig": struct_pb2.Value(struct_value=algorithmConfig),
    }
)

metadata = struct_pb2.Struct(
    fields={
        "config": struct_pb2.Value(struct_value=config),
        "contentsDeltaUri": struct_pb2.Value(string_value=EMBEDDINGS),
    }
)

ann_index = {
    "display_name": DISPLAY_NAME,
    "description": f"Movielens {DIMENSIONS}",
    "metadata": struct_pb2.Value(struct_value=metadata),
}

In [26]:
ann_index = index_client.create_index(parent=PARENT, index=ann_index)

In [27]:
# Poll the operation until it's done successfullly.
# This will take ~40 min.
import time 

while True:
    if ann_index.done():
        break
    print("Poll the operation to create index...")
    time.sleep(60)

Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the operation to create index...
Poll the ope

## Note on the advantages of the algorithm

[link](https://arxiv.org/pdf/1908.10396.pdf)

```However, it is easy to see that not all pairs of (x, q) are equally important. The approximation error on the pairs which have a high inner product is far more important since they are likely to be among the top ranked pairs and can greatly affect the search result, while for the pairs whose inner product is low the approximation error matters much less. In other words, for a given datapoint x, we should quantize it with a bigger focus on its error with those queries which have high inner product with x. See Figure 1 for the illustration.```


![](./algo.png)



In [34]:
ann_index

<google.api_core.operation.Operation at 0x7fb9180882d0>

In [35]:
ann_index.result()

name: "projects/169420424915/locations/us-central1/indexes/1102168047868706816"

### Save the name of the endpoint

In [36]:
INDEX_RESOURCE_NAME = ann_index.result().name
INDEX_RESOURCE_NAME

'projects/169420424915/locations/us-central1/indexes/1102168047868706816'

Debugging tool in case you run into issues. Example usage below.
`!gcloud beta ai operations describe 4122851463774863360 --index=7253099976438317056 --project=$PROJECT`

## Create Index Endpoint and Deploy Index

In [39]:
PROJECT_NUMBER



In [79]:
VPC_NETWORK_NAME = "projects/{}/global/networks/{}".format(PROJECT_NUMBER, NETWORK_NAME)
VPC_NETWORK_NAME

'projects/169420424915/global/networks/matching-engine-vpc'

In [80]:
index_endpoint = {
    "display_name": "index_endpoint_for_demo",
    "network": VPC_NETWORK_NAME,
}

In [81]:
index_endpoint_client = aiplatform_v1beta1.IndexEndpointServiceClient(
    client_options=dict(api_endpoint=api_endpoint_me)
)

ann_index_en = index_endpoint_client.create_index_endpoint(
    parent=PARENT, index_endpoint=index_endpoint
)

In [82]:
ann_index_en.result()

name: "projects/169420424915/locations/us-central1/indexEndpoints/4842407538399903744"

In [83]:
INDEX_ENDPOINT_NAME = ann_index_en.result().name
INDEX_ENDPOINT_NAME

'projects/169420424915/locations/us-central1/indexEndpoints/4842407538399903744'

In [84]:
DEPLOYED_INDEX_ID = 'movielens_deployed2'

deploy_ann_index = {
    "id": DEPLOYED_INDEX_ID,
    "display_name": DEPLOYED_INDEX_ID,
    "index": INDEX_RESOURCE_NAME,
}
r = index_endpoint_client.deploy_index(
    index_endpoint=INDEX_ENDPOINT_NAME, deployed_index=deploy_ann_index
)

In [None]:
r.result()

# Connect Matching Engine and The User Model Into a Recommendation System

This will bring it all together by incorporating the prediction endpoint 

In [93]:
# establish index_endpoint -IMPORTANT for constructing already created endpoints/indicies/etc...
ME_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(INDEX_ENDPOINT_NAME)

In [94]:
USER = 627.0 #pick anyone 0-100k to see watch history and recommendations
NUM_NEIGH=3

emb_627 = endpoint.predict([[USER]]) #prediction from the saved model
emb_627 = emb_627.predictions[0]
emb_627 # we should get our user xxx embedding @ dim len

[[0.184652567,
  -0.397861391,
  -0.129475906,
  -0.275661528,
  0.0817652121,
  0.128327087,
  -0.264759898,
  0.217002064,
  -0.374049485,
  -0.410740137,
  -1.07684159,
  0.110065386,
  0.350892216,
  -0.476837337,
  0.227980882,
  0.220079392,
  -0.272344977,
  -0.109302461,
  0.19338505,
  -0.696584523,
  0.603061438,
  -0.543672442,
  0.192285746,
  0.155405462,
  0.4221223,
  0.289614797,
  -0.58251214,
  0.107899651,
  -0.596701801,
  -0.141598403,
  0.440042049,
  0.387851566,
  -0.612558722,
  0.344112307,
  -0.0537147,
  -0.46595481,
  -0.283597469,
  0.11939574,
  -0.201237619,
  0.203903049,
  -0.375532895,
  -0.27020371,
  0.156597123,
  0.157281071,
  -0.250227481,
  -0.179565817,
  0.0832151175,
  -0.418110102,
  0.751608491,
  -0.259043574,
  -0.101647414,
  -0.0140705071,
  0.495382369,
  0.0983320475,
  -0.18000868,
  0.247827858,
  0.0271476246,
  -0.262985647,
  -0.139576316,
  -0.17529051,
  -0.48690334,
  -0.363838047,
  -0.0803812072,
  -0.0247186422]]

In [95]:
ME_index_endpoint.match(queries=emb_627, deployed_index_id=DEPLOYED_INDEX_ID, num_neighbors=10)

[[MatchNeighbor(id='1478.0', distance=4.894585609436035),
  MatchNeighbor(id='942.0', distance=4.2381672859191895),
  MatchNeighbor(id='1135.0', distance=4.2109694480896),
  MatchNeighbor(id='1004.0', distance=4.126661777496338),
  MatchNeighbor(id='1136.0', distance=3.840146064758301),
  MatchNeighbor(id='1267.0', distance=3.72882080078125),
  MatchNeighbor(id='809.0', distance=3.7181546688079834),
  MatchNeighbor(id='720.0', distance=3.669264554977417),
  MatchNeighbor(id='157.0', distance=3.653459072113037),
  MatchNeighbor(id='693.0', distance=3.645333766937256)]]

#### Create movie lookup tables
Get what given user has rated highly, and what is being recommended

In [96]:
! wget https://files.grouplens.org/datasets/movielens/ml-100k/u.item

--2022-09-09 02:12:36--  https://files.grouplens.org/datasets/movielens/ml-100k/u.item
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 236344 (231K)
Saving to: ‘u.item.1’


2022-09-09 02:12:36 (3.02 MB/s) - ‘u.item.1’ saved [236344/236344]



In [97]:
# Quick sidetour - create movie lookup dictionary
movie_names = pd.read_csv('u.item', delimiter='|' , 
                          encoding='latin-1', 
                          usecols=(0,1),
                          names = ['movie_id', 'title'])
movielookup = movie_names.to_dict()['title']

In [98]:
for i, watched_movie in enumerate(ratings.filter(lambda x: x['user_id']==USER)):
    if i >= 10: #limit to top n
        break
    else:
        key = watched_movie['movie_id'].numpy()
        print(f"""Movies watched: \n 
              {i}: {movielookup[key]}"""
             )

Movies watched: 
 
              0: Piano, The (1993)
Movies watched: 
 
              1: Star Trek: The Wrath of Khan (1982)
Movies watched: 
 
              2: Return of the Jedi (1983)
Movies watched: 
 
              3: Star Trek VI: The Undiscovered Country (1991)
Movies watched: 
 
              4: Star Trek III: The Search for Spock (1984)
Movies watched: 
 
              5: Four Rooms (1995)
Movies watched: 
 
              6: Addams Family Values (1993)
Movies watched: 
 
              7: Arsenic and Old Lace (1944)
Movies watched: 
 
              8: Pinocchio (1940)
Movies watched: 
 
              9: Dead Poets Society (1989)


In [101]:
query_vector = emb_627


ann_response = ME_index_endpoint.match(
    deployed_index_id='movielens_deployed2', 
    queries=query_vector, 
    num_neighbors=NUM_NEIGH
)

print("Recommended movie IDs:", ann_response)

Recommended movie IDs: [[MatchNeighbor(id='1478.0', distance=4.894585609436035), MatchNeighbor(id='942.0', distance=4.2381672859191895), MatchNeighbor(id='1135.0', distance=4.2109694480896)]]


In [102]:
# look at the recommended movies vs the viewed for that user
for i, match in enumerate(ann_response[0]):
    key = int(float(match.id))
    print(f"""Movies recommended: \n 
          {i}: {movielookup[key]} (distance: {match.distance})"""
         )


Movies recommended: 
 
          0: Reckless (1995) (distance: 4.894585609436035)
Movies recommended: 
 
          1: Killing Zoe (1994) (distance: 4.2381672859191895)
Movies recommended: 
 
          2: Ghosts of Mississippi (1996) (distance: 4.2109694480896)


### Cleaning up
To clean up all Google Cloud resources used in this project, you can delete the Google Cloud project you used for the tutorial. You can also manually delete resources that you created by running the following code.

In [None]:
INDEX_RESOURCE_NAME
# 7352179168240467968

In [None]:
index_endpoint_client

In [None]:
index_endpoint_client.undeploy_index(index_endpoint=INDEX_ENDPOINT_NAME, deployed_index_id=DEPLOYED_INDEX_ID)

index_client.delete_index(name=INDEX_RESOURCE_NAME)

index_endpoint_client.delete_index_endpoint(name=INDEX_ENDPOINT_NAME)

In [None]:
endpoint_resource_name = endpoint.resource_name
endpoint_resource_name

In [None]:
deployment_resource_name = deployment.resource_name
deployment_resource_name
aiplatform.Endpoint.delete(endpoint, gcp_model)
#delete our model endpoints, etc..