In [1]:
%load_ext dotenv
%dotenv

import pandas as pd
import os
import csv
import re


In [3]:
# Config

RET = {
    "embedder": "sentence-transformers/multi-qa-MiniLM-L6-cos-v1",  # 384-dim, fast, good for short queries

    # Candidate generation
    "bm25_size": 250,     # number of BM25 candidates pulled from OS
    "top_k": 10,          # final results you return to chat

    # Client-side re-rank: final = alpha * bm25_norm + (1 - alpha) * cosine
    "alpha": 0.65,        # sweep 0.55–0.75 on your eval later

    # Tiny, generic expansions (LLM-free)
    "expand": True,

    # BM25 dis_max buckets (generic + safe)
    "bm25": {
        "tie_breaker": 0.10,
        "boosts": {
            "title_phrase": 12.0,
            "cast_phrase":   8.0,
            "title_best":   10.0,
            "cast_best":     7.0,
            "director":      2.5,
            "listed_in":     9.0,   # genres/audience — biggest ROI on your data
            "country":       4.0,   # e.g., “german series”
            "rating":        2.5,   # e.g., “mature/TV-MA”
            "type":          1.5,   # small nudge
            "desc_phrase":   1.05   # description only as phrase; avoid dominance
        }
    }
}


In [26]:
# mapping & util

from datetime import datetime

_DATE_PATTERNS = [
    "%Y-%m-%d",        # 2021-09-25
    "%B %d, %Y",       # September 25, 2021
    "%b %d, %Y",       # Sep 25, 2021
    "%B %d %Y",        # September 25 2021 (no comma)
    "%b %d %Y",        # Sep 25 2021 (no comma)
]

def to_iso_date(s: str | None) -> str | None:
    if not s:
        return None
    s2 = str(s).strip()
    for fmt in _DATE_PATTERNS:
        try:
            return datetime.strptime(s2, fmt).date().isoformat()
        except Exception:
            pass
    return None

def add_text_copies_mapping(client, index):
    body = {"properties": {
        "listed_in_text": {"type": "text"},
        "country_text":   {"type": "text"},
        "rating_text":    {"type": "text"},
        "type_text":      {"type": "text"},
        "date_added":     {"type": "date"},
        "date_added_raw": {"type": "keyword"},
    }}
    client.indices.put_mapping(index=index, body=body, ignore=400)



In [25]:
# Indexing

import re

def _clean(s):
    if s is None: return None
    s2 = str(s).strip()
    return s2 or None

def _split_csv(s):
    s = _clean(s)
    return [p.strip() for p in s.split(",")] if s else []

def row_normalize(row: dict) -> dict:
    listed = _split_csv(row.get("listed_in"))
    cast_list = _split_csv(row.get("cast"))
    ry = _clean(row.get("release_year"))
    release_year = int(ry) if ry and ry.isdigit() else None
    
    raw_date = _clean(row.get("date_added"))
    iso_date = to_iso_date(raw_date)

    doc = {
        "show_id": _clean(row.get("show_id")),
        "type": _clean(row.get("type")),
        "type_text": _clean(row.get("type")),
        "title": _clean(row.get("title")),
        "director": _clean(row.get("director")),
        "cast": _clean(row.get("cast")),
        "cast_list": cast_list,
        "country": _clean(row.get("country")),
        "country_text": _clean(row.get("country")),
        "release_year": release_year,
        "rating": _clean(row.get("rating")),
        "rating_text": _clean(row.get("rating")),
        "duration": _clean(row.get("duration")),
        "listed_in": listed,
        "listed_in_text": ", ".join(listed) if listed else None,
        "description": _clean(row.get("description")),
        "date_added_raw": raw_date,     
        # only set date_added if we could parse it
        **({"date_added": iso_date} if iso_date else {}),
    }
    # drop empties
    doc = {k: v for k, v in doc.items() if v not in (None, "", [], {})}
    doc["_id"] = doc.get("show_id") or (re.sub(r"\s+", "", (doc.get("title") or "").lower()) + "_gen")
    return doc

def text_for_embedding(d: dict) -> str:
    # Add light categorical cues so semantics “hear” audience/country/rating
    parts = []
    for k in ("title","description","director","type","country","rating"):
        v = d.get(k); parts.append(v) if v else None
    if d.get("cast_list"): parts.append(", ".join(d["cast_list"]))
    if d.get("listed_in"): parts.append(", ".join(d["listed_in"]))
    return " | ".join(parts)


In [27]:
# Candidate generation (BM25-only)

from functools import lru_cache

@lru_cache(maxsize=1)
def get_embed_model(name: str):
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer(name)  # used later in re-rank

def _normalize_expand(q: str, enable=True) -> str:
    qn = q.lower().strip().replace("sci-fi","sci fi").replace("&"," and ")
    if not enable: return qn
    adds = []
    if "kids" in qn or "family" in qn: adds += ["children","preschool","toddler","family"]
    if any(t in qn for t in ("romantic","romance","romcom")): adds += ["love","relationships","romcom"]
    if any(t in qn for t in ("mature","adult")): adds += ["tv ma","r","mature"]
    if any(t in qn for t in ("series"," tv "," tv-"," tv_"," show")): adds += ["tv show","series"]
    for hint,country in [("german","germany"),("korean","south korea"),("japanese","japan"),
                         ("french","france"),("spanish","spain"),("italian","italy")]:
        if hint in qn: adds.append(country)
    return qn + (" " + " ".join(adds) if adds else "")

def bm25_query(qn: str, boosts: dict, tie: float) -> dict:
    return {
      "dis_max": {
        "tie_breaker": tie,
        "queries": [
          {"multi_match": {"query": qn, "type": "phrase",
                           "fields": [f"title^{boosts['title_phrase']}", f"cast^{boosts['cast_phrase']}"], "slop": 1}},
          {"multi_match": {"query": qn, "type": "best_fields",
                           "fields": [f"title^{boosts['title_best']}", f"cast^{boosts['cast_best']}", f"director^{boosts['director']}"],
                           "operator": "AND", "minimum_should_match": "2<-25%"}},
          {"multi_match": {"query": qn, "type": "best_fields",
                           "fields": [f"listed_in_text^{boosts['listed_in']}"], "operator": "OR"}},
          {"multi_match": {"query": qn, "type": "best_fields",
                           "fields": [f"country_text^{boosts['country']}", f"rating_text^{boosts['rating']}", f"type_text^{boosts['type']}"]}},
          {"match_bool_prefix": {"title": {"query": qn, "boost": 2.0}}},
          {"match_bool_prefix": {"cast":  {"query": qn, "boost": 1.5}}},
          {"match_phrase": {"description": {"query": qn, "slop": 2, "boost": boosts["desc_phrase"]}}}
        ]
      }
    }

def bm25_candidates(client, index, q, *, cfg=RET):
    qn = _normalize_expand(q, cfg["expand"])
    bq = bm25_query(qn, cfg["bm25"]["boosts"], cfg["bm25"]["tie_breaker"])
    body = {
        "size": cfg["bm25_size"],
        "_source": ["show_id","title","type","release_year","vector"],  # include vector for re-rank
        "query": bq
    }
    return client.search(index=index, body=body)


In [28]:
# Client-side semantic re-rank


import math

def _minmax(scores):
    if not scores: return ([], 0.0, 1.0)
    lo, hi = min(scores), max(scores)
    if math.isclose(hi, lo): return ([1.0]*len(scores), lo, hi)
    return ([(s - lo)/(hi - lo) for s in scores], lo, hi)

def rerank_with_vectors(res, qvec, *, alpha=0.65, top_k=10):
    hits = res.get("hits",{}).get("hits",[])
    if not hits: return []
    bm25_scores = [h.get("_score", 0.0) for h in hits]
    bm25_norm, _, _ = _minmax(bm25_scores)

    ranked = []
    for h, b in zip(hits, bm25_norm):
        v = h.get("_source",{}).get("vector")
        if not v:
            final = alpha * b
        else:
            # if you indexed normalized vectors (recommended), dot == cosine
            cos = sum(a*b2 for a,b2 in zip(qvec, v))
            final = alpha * b + (1.0 - alpha) * cos
        ranked.append((final, h))

    ranked.sort(key=lambda x: x[0], reverse=True)
    return [h for _, h in ranked[:top_k]]

def retrieve(client, index, q, *, cfg=RET, model=None):
    res = bm25_candidates(client, index, q, cfg=cfg)
    m = model or get_embed_model(cfg["embedder"])
    qvec = m.encode(q, normalize_embeddings=True).tolist()
    return rerank_with_vectors(res, qvec, alpha=cfg["alpha"], top_k=cfg["top_k"])


In [29]:
# Evaluate 

def ids_from_hits(hits): return [h["_source"].get("show_id","") for h in hits]

def hit_rate_at_k(ids, gold, k=10): return 1.0 if gold in ids[:k] else 0.0
def mrr_at_k(ids, gold, k=10):
    for i,x in enumerate(ids[:k], 1):
        if x == gold: return 1.0 / i
    return 0.0

def evaluate(client, index, qid_to_queries: dict, *, cfg=RET):
    total_hr = total_mrr = n = 0
    for gold, qs in qid_to_queries.items():
        for q in qs:
            hits = retrieve(client, index, q, cfg=cfg)
            ids  = ids_from_hits(hits)
            total_hr  += hit_rate_at_k(ids, gold, cfg["top_k"])
            total_mrr += mrr_at_k(ids, gold, cfg["top_k"])
            n += 1
    return {"hit_rate": total_hr / max(1,n), "mrr": total_mrr / max(1,n)}



### Data Preparation


In [30]:

# Internal Catalog Data Load 
csv_path = os.getcwd() + '/../data/netflix_titles.csv'

ascii_only = re.compile(r'^[\x00-\x7F]+$')

def is_ascii(s):
    return bool(s) and ascii_only.match(s)

with open(csv_path, "r", encoding="utf-8", errors="ignore") as f:
    reader = csv.DictReader(f)
    data = [
        row for row in reader
        if is_ascii(row.get("title", "").strip()) and is_ascii(row.get("description", "").strip())
    ]

print(f"Considering {len(data)} ASCII-only rows")

Considering 7370 ASCII-only rows


In [31]:
import json
ground_truth = []

with open('results.json', 'r') as f:
    results = json.load(f)


In [32]:
# get two records from dict
sample_results = {k: results[k] for k in list(results)[:2]}
sample_results

{'s1426': ['documentary',
  'chess movie',
  'story about a chess prodigy',
  "making of The Queen's Gambit documentary",
  'behind the scenes of a chess film'],
 's7321': ['Goth',
  'independent movie',
  'film about faith and family',
  "comedy drama about a nun's journey",
  'movies featuring characters returning home from war']}

In [51]:
import json
ground_truth = []

with open('results.json', 'r') as f:
    results = json.load(f)


for s_id, qs in results.items():
    for q in qs:
        ground_truth.append({'query': q, 'doc_id': s_id})

### OpenSearch

In [33]:
from __future__ import annotations
import os, re
from dataclasses import dataclass
from datetime import datetime
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple

from opensearchpy import OpenSearch, RequestsHttpConnection
from opensearchpy.helpers import streaming_bulk

# Lazy import for speed if embed=False
try:
    from sentence_transformers import SentenceTransformer  # type: ignore
except Exception:
    SentenceTransformer = None  # noqa: N816

In [34]:
# OpenSearch Config 

@dataclass(frozen=True)
class Cfg:
    embed_model: str = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    embed_batch: int = int(os.getenv("EMBED_BATCH", "32"))
    index: str = os.getenv("OS_INDEX", "netflix_assets")
    vector_dim: int = int(os.getenv("OS_VECTOR_DIM", "384"))
    url: str = os.getenv("OPENSEARCH_URL", "https://localhost:9200")
    user: str = os.getenv("OS_USER", "admin")
    pwd: str = os.getenv("OS_PASS", "admin")
    verify: bool = os.getenv("OS_VERIFY", "true").lower() in ("true", "1", "yes")
    timeout: int = int(os.getenv("OS_TIMEOUT", "60"))

In [35]:
# -------------------- OpenSearch + Embedding --------------------

def make_client(cfg: Cfg) -> OpenSearch:
    if cfg.url.startswith("https://"):
        return OpenSearch(
            cfg.url,
            http_auth=(cfg.user, cfg.pwd),
            verify_certs=cfg.verify,
            ssl_assert_hostname=cfg.verify,
            ssl_show_warn=cfg.verify,
            http_compress=True,
            connection_class=RequestsHttpConnection,
            timeout=cfg.timeout, max_retries=3, retry_on_timeout=True,
        )
    return OpenSearch(cfg.url, http_compress=True, timeout=cfg.timeout, max_retries=3, retry_on_timeout=True)

_embedder_cache: Dict[str, Any] = {}

def get_embedder(cfg: Cfg):
    if SentenceTransformer is None:
        raise RuntimeError("sentence-transformers not installed, but embed=True requested.")
    key = cfg.embed_model
    emb = _embedder_cache.get(key)
    if emb is None:
        model = SentenceTransformer(cfg.embed_model)
        # quick dim probe (no hard fail in prod unless you want fail-fast)
        try:
            v = model.encode(["dim"], normalize_embeddings=True)
            dim = v.shape[1] if hasattr(v, "shape") else len(v[0])
            if dim != cfg.vector_dim:
                raise RuntimeError(f"VECTOR_DIM={cfg.vector_dim} mismatches model dim={dim}")
        except Exception:
            pass
        _embedder_cache[key] = model
        emb = model
    return emb

def ensure_index(client: OpenSearch, cfg: Cfg) -> None:
    try:
        exists = client.indices.exists(index=cfg.index)
    except TypeError:  # old clients sig
        exists = client.indices.exists(cfg.index)
    if not (exists if isinstance(exists, bool) else exists):
        client.indices.create(index=cfg.index, body=index_mapping(cfg.vector_dim), ignore=400)
        print(f"created index {cfg.index}")
    else:
        print(f"using index {cfg.index}")
    client.indices.refresh(index=cfg.index)


# -------------------- Action stream --------------------

def actions_from_rows(
    rows: Iterable[Dict[str, str]],
    cfg: Cfg,
    embed: bool = True,
    embedder=None,
) -> Iterator[Dict[str, Any]]:
    """
    Yields bulk index actions. When embed=True, encodes in efficient batches.
    """
    if not embed:
        for r in rows:
            d = row_normalize(r)
            _id = d.get("_id")
            if not _id:
                continue
            src = {k: v for k, v in d.items() if k != "_id"}
            yield {"_op_type": "index", "_index": cfg.index, "_id": _id, **src}
        return

    model = embedder or get_embedder(cfg)
    norm = (row_normalize(r) for r in rows)

    for chunk in _batched(norm, cfg.embed_batch):
        docs = [d for d in chunk if d.get("_id")]
        if not docs:
            continue
        texts = [text_for_embedding(d) for d in docs]
        vecs = model.encode(
            texts,
            normalize_embeddings=True,
            batch_size=cfg.embed_batch,
            convert_to_numpy=True,
            show_progress_bar=False,
        )
        # Avoid numpy -> list per element call overhead by one pass
        for d, v in zip(docs, vecs.tolist() if hasattr(vecs, "tolist") else vecs):
            _id = d["_id"]
            src = {k: v2 for k, v2 in d.items() if k != "_id"}
            src["vector"] = v if isinstance(v, list) else list(v)
            yield {"_op_type": "index", "_index": cfg.index, "_id": _id, **src}

In [36]:
# Utils

def _batched(it: Iterable[Any], n: int) -> Iterator[List[Any]]:
    it = iter(it)
    while True:
        chunk = list(islice(it, n))
        if not chunk: return
        yield chunk

In [37]:
cfg = Cfg()
os_client = make_client(cfg)
ensure_index(os_client, cfg)

using index netflix_assets


In [38]:
from opensearchpy.helpers import streaming_bulk

success = 0
for ok, item in streaming_bulk(
    client=os_client,
    actions=actions_from_rows(data, cfg, embed=True),
    chunk_size=cfg.embed_batch,          # good default: your embed batch
    max_retries=3,
    raise_on_error=False,                 # don't raise on first bad doc
    request_timeout=cfg.timeout,
):
    if ok:
        success += 1
    else:
        # optional: log the failed item
        print("FAIL:", item)
        pass

os_client.indices.refresh(index=cfg.index)
print(f"Bulk indexed {success} documents into {cfg.index}")


Bulk indexed 7370 documents into netflix_assets


In [47]:
import random
sampled_results = random.sample(list(results.items()), 100)


In [48]:
evaluate(os_client, cfg.index, dict(sampled_results), cfg=RET)


{'hit_rate': 0.192, 'mrr': 0.1148714285714286}

In [54]:
# Internal metric
from tqdm.auto import tqdm

def hit_rate(relevance_total):
    cnt = 0

    for line in relevance_total:
        if True in line:
            cnt = cnt + 1

    return cnt / len(relevance_total)

def mrr(relevance_total):
    total_score = 0.0

    for line in relevance_total:
        for rank in range(len(line)):
            if line[rank] == True:
                total_score = total_score + 1 / (rank + 1)

    return total_score / len(relevance_total)


# Evaluate

def evaluate(client, index, qid_to_queries: dict, *, cfg=RET):
    total_hr = total_mrr = n = 0
    for gold, qs in qid_to_queries.items():
        for q in qs:
            hits = retrieve(client, index, q, cfg=cfg)
            ids  = ids_from_hits(hits)
            total_hr  += hit_rate_at_k(ids, gold, cfg["top_k"])
            total_mrr += mrr_at_k(ids, gold, cfg["top_k"])
            n += 1
    return {"hit_rate": total_hr / max(1,n), "mrr": total_mrr / max(1,n)}

def calculate_metrics(ground_truth):
    relevance_total = []
    for i in tqdm(ground_truth):
        query = i['query']
        doc_id = i['doc_id']
        hits = retrieve(os_client, cfg.index, query, cfg=RET)
        s_ids  = ids_from_hits(hits)
        relevance = [doc_id == s_id for s_id in s_ids]
        relevance_total.append(relevance)

    mrr_score = mrr(relevance_total)
    hit_rate_score = hit_rate(relevance_total)

    return {
        "mrr": mrr_score,
        "hit_rate": hit_rate_score
    }

In [55]:
calculate_metrics(ground_truth)


  0%|          | 0/5000 [00:00<?, ?it/s]

{'mrr': 0.11047309523809525, 'hit_rate': 0.19}