In [1]:
import asyncio
import base64
import json
import os
import pickle
from datetime import date
from functools import lru_cache
import aiohttp
import pandas as pd
from dotenv import load_dotenv
from flask import Flask, abort
from flask_cors import CORS, cross_origin
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool

In [None]:
load_dotenv()  # take environment variables from .env.

# Every Tecton feature-service request carries this API-key header.
auth_headers = {
    "Authorization": f'Tecton-key {os.environ["TECTON_API_KEY"]}',
}
# The prediction service is called with a JSON body.
prediction_headers = {"Content-Type": "application/json"}

# Request path (relative to TECTON_URL) of the Tecton get-features API.
get_features_endpoint = "/api/v1/feature-service/get-features"
# Synchronous connection pools: Tecton over HTTPS, the model server over HTTP on localhost:5002.
tecton_conn_pool = HTTPSConnectionPool(os.environ["TECTON_URL"])
prediction_conn_pool = HTTPConnectionPool("localhost:5002")

# Async client objects: a dedicated event loop, a connector capped at 100
# connections, and a session pre-loaded with the auth header.
# NOTE(review): not used by the cells shown here — presumably used further down; confirm.
loop = asyncio.new_event_loop()
connector = aiohttp.TCPConnector(limit=100, loop=loop)
client = aiohttp.ClientSession(loop=loop, headers=auth_headers, connector=connector)
# Absolute get-features URL (same path as get_features_endpoint) for clients that need a full URL.
url = f"https://{os.environ['TECTON_URL']}{get_features_endpoint}"

# Movie id -> title lookup; later cells index it with int movie ids.
# The context manager closes the file handle (the bare open() leaked it).
# pickle.load can execute arbitrary code: only load this file from a trusted source.
with open("movieid2title.p", "rb") as title_file:
    movieid2title = pickle.load(title_file)


In [3]:
def get_schema():
    """Fetch column names and dtypes for the ranking feature service from Tecton.

    Sends one get-features request for a placeholder key pair
    (USER_ID="1", MOVIE_ID="1") with name/data-type metadata enabled, and reads
    the schema from the response metadata.

    Returns:
        (names, types): parallel lists. ``names`` is the two join keys followed
        by each feature name, upper-cased with "." replaced by "__"; ``types`` is
        "string", "string" followed by each feature's ``dataType.type``.
    """
    body = json.dumps(
        {
            "params": {
                "feature_service_name": os.environ["TECTON_FEATURE_SERVICE"],
                # Placeholder keys: only the response metadata is read below.
                "join_key_map": {
                    "USER_ID": "1",
                    "MOVIE_ID": "1",
                },
                "workspace_name": "apply-2022-demo",
                "metadata_options": {"include_names": True, "include_data_types": True},
            },
        }
    )
    # Use the get-features path defined in the config cell; the bare name `endpoint`
    # used previously is not defined anywhere and raised NameError on a fresh kernel.
    r = tecton_conn_pool.request("POST", url=get_features_endpoint, headers=auth_headers, body=body)
    features = json.loads(r.data.decode("utf-8"))["metadata"]["features"]
    names = ["USER_ID", "MOVIE_ID"] + [f["name"] for f in features]
    # Normalize to the column naming used for the ranking DataFrame.
    names = [n.upper().replace(".", "__") for n in names]
    types = ["string", "string"] + [f["dataType"]["type"] for f in features]
    return names, types


names, types = get_schema()

In [4]:
# Example inputs for the walkthrough: one user and one seed movie.
# Kept as strings because they are sent as-is in Tecton join_key_map payloads.
user_id, movie_id = "102329", "1"

# Candidate Generation

In [5]:
def generate_candidates(movie_id):
    """Return candidate movie ids related to ``movie_id``.

    Reads the first feature of the TECTON_NEAREST_NEIGHBOR_SERVICE feature
    service, a comma-separated string of movie ids, and drops its first entry.

    Returns:
        List of movie-id strings. Empty when the service returns no value
        (null) for this movie.
    """
    data = json.dumps(
        {
            "params": {
                "feature_service_name": os.environ["TECTON_NEAREST_NEIGHBOR_SERVICE"],
                "join_key_map": {
                    "MOVIE_ID": movie_id,
                },
                "workspace_name": "apply-2022-demo",
            },
        }
    )
    r = tecton_conn_pool.request("POST", url=get_features_endpoint, headers=auth_headers, body=data)
    neighbors = json.loads(r.data.decode("utf-8"))["result"]["features"][0]
    if neighbors is None:
        # A missing feature comes back as null; calling .split on it would
        # raise AttributeError, so report "no candidates" instead.
        return []
    # NOTE(review): the first id is skipped — presumably the query movie itself; confirm.
    return neighbors.split(",")[1:]

candidates = generate_candidates(movie_id)
print(candidates)

['3114', '78499', '4886', '6377', '364', '8961', '68954', '4306', '588', '480', '60069', '356', '1270', '50872', '134853', '260', '58559', '4993', '1196', '1210', '59315', '5952', '2571', '6539', '7153', '1198', '1265', '1580', '595', '1682', '33794', '2355', '112852', '79132', '5349', '4896', '91529', '8360', '8368', '5218', '72998', '2762', '152081', '89745', '296', '110', '589', '318', '2716', '1291', '2959', '122886', '1197', '1097', '76093', '3578', '47', '5816', '8636', '59784', '593', '780', '115617', '1136', '122904', '6874', '1240', '40815', '5989', '2028', '88125', '81834', '1036', '97913', '2011', '108932', '79091', '69844', '68157', '919', '2918', '3793', '54001', '44191', '134130', '4022', '109487', '122882', '1907', '4963', '1721', '5618', '8874', '106696', '99114', '1214', '33493', '7438', '7361']


# Filtering

In [6]:
def filter_candidates(user_id, movie_ids):
    """Drop the movies ``user_id`` has recently watched from ``movie_ids``.

    Reads the first feature of TECTON_RECENTLY_WATCHED_SERVICE, a
    comma-separated string of movie ids.

    Returns:
        ``movie_ids`` unchanged when that feature is null. Otherwise, the
        de-duplicated candidates in their original order, excluding every id
        in the recently-watched list.
    """
    data = json.dumps(
        {
            "params": {
                "feature_service_name": os.environ["TECTON_RECENTLY_WATCHED_SERVICE"],
                "join_key_map": {
                    "USER_ID": user_id,
                },
                "workspace_name": "apply-2022-demo",
            },
        }
    )
    r = tecton_conn_pool.request("POST", url=get_features_endpoint, headers=auth_headers, body=data)
    recently_watched = json.loads(r.data.decode("utf-8"))["result"]["features"][0]
    if recently_watched is None:
        return movie_ids
    watched = set(recently_watched.split(","))
    # dict.fromkeys de-duplicates like the previous set difference but keeps the
    # candidate order. String-set iteration order depends on the per-process hash
    # seed, so the old result varied between kernel restarts.
    return [m for m in dict.fromkeys(movie_ids) if m not in watched]

filtered_candidates = filter_candidates(user_id, candidates)


# Ranking
![](assets/ranking.png)

In [7]:
def get_feature_vector(user_id, movie_id):
    """Fetch one (user, movie) feature row from TECTON_FEATURE_SERVICE.

    Returns:
        ``[user_id, movie_id, *features]`` when the response contains a
        "result" key, otherwise ``None``.
    """
    request_params = {
        "feature_service_name": os.environ["TECTON_FEATURE_SERVICE"],
        "join_key_map": {"USER_ID": user_id, "MOVIE_ID": movie_id},
        "workspace_name": "apply-2022-demo",
    }
    payload = json.dumps({"params": request_params})
    response = tecton_conn_pool.request(
        "POST", url=get_features_endpoint, headers=auth_headers, body=payload
    )
    parsed = json.loads(response.data.decode("utf-8"))
    if "result" not in parsed:
        return None
    return [user_id, movie_id] + parsed["result"]["features"]

def get_feature_vectors(user_id, movie_ids):
    """Fetch a feature row for ``user_id`` paired with each id in ``movie_ids``.

    Requests run one after another, in input order. An entry is ``None``
    wherever get_feature_vector returned ``None``.
    """
    vectors = []
    for candidate_id in movie_ids:
        vectors.append(get_feature_vector(user_id, candidate_id))
    return vectors

def get_predictions(df):
    """POST ``df`` to the prediction service's /predict/ route and return the raw response text.

    The frame is pickled, base64-encoded and sent as the "df" field of a JSON body.
    SECURITY NOTE(review): the server presumably unpickles this payload, and
    unpickling runs arbitrary code, so this is only safe between trusted
    endpoints (prediction_conn_pool targets localhost:5002).
    """
    encoded = base64.b64encode(pickle.dumps(df)).decode("utf-8")
    response = prediction_conn_pool.request(
        "POST",
        url="/predict/",
        headers=prediction_headers,
        body=json.dumps({"df": encoded}),
    )
    return response.data.decode("utf-8")

def rank_candidates(user_id, movie_ids):
    """Score ``movie_ids`` for ``user_id`` and return them best-first.

    Fetches one feature row per candidate and drops rows without a Tecton
    result. Columns are cast to the schema in the global ``names``/``types``
    (from get_schema), and the frame is sent to the prediction service.

    Returns:
        List of ``(predicted_rating, movie_id)`` tuples sorted by rating,
        highest first. Empty when no candidate has features; the prediction
        service is not called in that case.
    """
    feature_vectors = [fv for fv in get_feature_vectors(user_id, movie_ids) if fv]
    if not feature_vectors:
        # Nothing to score: do not send an empty frame to the model server.
        return []

    # Build the feature frame and apply the column -> dtype schema in one step.
    df = pd.DataFrame(feature_vectors, columns=names).astype(dict(zip(names, types)))

    # Call prediction endpoint
    preds = json.loads(get_predictions(df))["predictions"]

    # Sort by predicted rating; sorted() is stable, so ties keep request order.
    return sorted(zip(preds, df["MOVIE_ID"].tolist()), key=lambda x: x[0], reverse=True)

ranked = rank_candidates(user_id, filtered_candidates)
# Replace each movie id with its title (movieid2title is keyed by int ids), keeping the score.
titles_and_scores = [(movieid2title[int(movie_key)], score) for score, movie_key in ranked]
print(titles_and_scores)

[('Shawshank Redemption, The (1994)', 4.57875919342041), ('WALL·E (2008)', 4.527594566345215), ('Spirited Away (Sen to Chihiro no kamikakushi) (2001)', 4.523556232452393), ('Groundhog Day (1993)', 4.507867336273193), ('Incredibles, The (2004)', 4.485877990722656), ('Back to the Future (1985)', 4.481118679046631), ('How to Train Your Dragon (2010)', 4.479037284851074), ('Zootopia (2016)', 4.478172302246094), ('Up (2009)', 4.47419548034668), ('Terminator 2: Judgment Day (1991)', 4.467179775238037), ('Aladdin (1992)', 4.464611530303955), ('Monsters, Inc. (2001)', 4.461755752563477), ('Lion King, The (1994)', 4.4545135498046875), ('Guardians of the Galaxy (2014)', 4.453990459442139), ('Princess Bride, The (1987)', 4.452365875244141), ('Catch Me If You Can (2002)', 4.451648712158203), ('Ratatouille (2007)', 4.44854211807251), ('Back to the Future Part II (1989)', 4.448413372039795), ('Forrest Gump (1994)', 4.434703350067139), ('The Martian (2015)', 4.431207656860352), ('Toy Story 3 (2010)',