# Set up

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import json
import os
import sys

import redis
from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel
from tqdm.auto import tqdm

load_dotenv()

sys.path.insert(0, "..")

# Controller

In [3]:
class Args(BaseModel):
    testing: bool = False
    run_name: str = "000-first-attempt"
    notebook_persist_dp: str = None
    random_seed: int = 41

    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_key_prefix: str = "output:i2i:"

    batch_recs_fp: str = "data/000-first-attempt/batch_recs.jsonl"

    def init(self):
        self.notebook_persist_dp = os.path.abspath(f"data/{self.run_name}")
        os.makedirs(self.notebook_persist_dp, exist_ok=True)

        if redis_host := os.getenv("REDIS_HOST"):
            self.redis_host = redis_host
            self.redis_port = os.getenv("REDIS_PORT", self.redis_port)

        return self


args = Args().init()

print(args.model_dump_json(indent=2))

{
  "testing": false,
  "run_name": "000-first-attempt",
  "notebook_persist_dp": "/mnt/d/projects/recsys/notebooks/data/000-first-attempt",
  "random_seed": 41,
  "redis_host": "localhost",
  "redis_port": "6379",
  "redis_key_prefix": "output:i2i:",
  "batch_recs_fp": "data/000-first-attempt/batch_recs.jsonl"
}


  PydanticSerializationUnexpectedValue(Expected `int` - serialized value may not be as expected [field_name='redis_port', input_value='6379', input_type=str])
  return self.__pydantic_serializer__.to_json(


# Load batch recs into Redis

In [4]:
r = redis.Redis(host=args.redis_host, port=args.redis_port, db=0, decode_responses=True)
assert (
    r.ping()
), f"Redis at {args.redis_host}:{args.port} is not running, please make sure you have started the Redis docker service"

In [12]:
def store_recommendations(fp: str):
    with open(fp, "r") as f:
        for line in tqdm(f):
            rec_data = json.loads(line)
            target_item = rec_data["target_item"]
            key = args.redis_key_prefix + target_item
            r.set(
                key,
                json.dumps(
                    {
                        "rec_item_ids": rec_data["rec_item_ids"],
                        "rec_scores": rec_data["rec_scores"],
                    }
                )
            )

def get_recommendations(target_item: str):
    key = args.redis_key_prefix + target_item
    rec_data = r.get(key)
    if rec_data:
        return json.loads(rec_data)
    return None
    
def get_example_keys(count=5):
    keys = r.scan_iter(match=args.redis_key_prefix + "*", count=count)
    output = []
    for i, key in enumerate(keys, 1):
        output.append(key)
        if i >= count:
            return output

In [13]:
logger.info(f"Loading batch recs output from {args.batch_recs_fp}...")
store_recommendations(args.batch_recs_fp)

[32m2025-11-13 02:11:11.132[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m1[0m - [1mLoading batch recs output from data/000-first-attempt/batch_recs.jsonl...[0m


0it [00:00, ?it/s]

In [14]:
get_recommendations(get_example_keys()[0])