<div align="center">

# **DELIVERY 2**
## **Indexing and Evaluation**

</div>

---

# **PART 1: Indexing**

### **STEP 1 — Build inverted index:**

#### **Main Code**

In [1]:
import json
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Set, Any, Iterable
from pathlib import Path
import sys

NOTEBOOK_DIR = Path().resolve()
REPO_ROOT = NOTEBOOK_DIR.parents[1] if NOTEBOOK_DIR.name in {"part_1", "part_2"} else NOTEBOOK_DIR
sys.path.append(str(REPO_ROOT / "project_progress"))
from utils.preprocessing import preprocess_text_field



# Path
NOTEBOOK_DIR = Path().resolve()
REPO_ROOT = NOTEBOOK_DIR.parents[1]          
DATA_DIR = REPO_ROOT / "data"
INPUT = DATA_DIR / "fashion_products_dataset_enriched.json"

INDEX_DIR = DATA_DIR / "index"
INDEX_DIR.mkdir(parents=True, exist_ok=True)

INDEX_FILE = INDEX_DIR / "boolean_inverted_index.json"
DOCMAP_FILE = INDEX_DIR / "docid_pid_map.json"
FIELDS_FILE = INDEX_DIR / "indexed_fields.json"

print(f"Reading enriched dataset: {INPUT}")
print(f"Index will be saved in:   {INDEX_DIR}")


if not INPUT.exists():
    raise FileNotFoundError(f"Enriched dataset not found: {INPUT}")
docs: List[Dict[str, Any]] = json.loads(INPUT.read_text(encoding="utf-8"))
print(f"Loaded {len(docs)} docs")


INDEXED_TEXT_FIELDS = [
    "title_clean",
    "description_clean",
    "metadata_clean",   
]


# doc_id is an integer, stable order = index in list
docid_to_pid: Dict[int, str] = {}
pid_to_docid: Dict[str, int] = {}

for i, r in enumerate(docs):
    pid = r.get("pid")
    if not pid:
        pid = r.get("_id", f"missing_pid_{i}")
    docid_to_pid[i] = pid
    pid_to_docid[pid] = i

def _doc_tokens(record: Dict[str, Any], fields: Iterable[str]) -> List[str]:
    toks: List[str] = []
    for f in fields:
        val = record.get(f)
        if not val:
            continue
        # We already have cleaned strings; just split.
        toks.extend(str(val).split())
    return toks


# Build inverted index 
vocab: Dict[str, Set[int]] = defaultdict(set)

for doc_id, rec in enumerate(docs):
    tokens = _doc_tokens(rec, INDEXED_TEXT_FIELDS)
    if not tokens:
        continue
    # Use unique terms per doc for Boolean presence posting
    for term in set(tokens):
        vocab[term].add(doc_id)

# Convert sets to sorted lists for compactness and efficient AND intersections
inverted_index: Dict[str, List[int]] = {t: sorted(list(s)) for t, s in vocab.items()}
print(f"Vocabulary size: {len(inverted_index):,}")


INDEX_FILE.write_text(json.dumps(inverted_index), encoding="utf-8")
DOCMAP_FILE.write_text(json.dumps({"docid_to_pid": docid_to_pid}, ensure_ascii=False), encoding="utf-8")
FIELDS_FILE.write_text(json.dumps({"indexed_fields": INDEXED_TEXT_FIELDS}, ensure_ascii=False, indent=2), encoding="utf-8")

print(f"Saved inverted index to: {INDEX_FILE}")
print(f"Saved doc map         to: {DOCMAP_FILE}")
print(f"Saved fields          to: {FIELDS_FILE}")


REQUIRED_OUTPUT_FIELDS = [
    "pid", "title", "description", "brand", "category", "sub_category",
    "product_details", "seller", "out_of_stock", "selling_price", "discount",
    "actual_price", "average_rating", "url"
]

def _query_tokens(q: str) -> List[str]:
    # Use the same normalization and stemming pipeline as Step 1
    proc = preprocess_text_field(q or "")
    return proc["tokens"]

def _intersect_sorted(a: List[int], b: List[int]) -> List[int]:
    """Intersect two sorted posting lists."""
    i=j=0
    out: List[int] = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i+=1; j+=1
        elif a[i] < b[j]:
            i+=1
        else:
            j+=1
    return out

def search_and(query: str, fields: List[str] = None, k: int = 20) -> List[Dict[str, Any]]:
    """
    Conjunctive (AND) Boolean search.
    Every returned doc must contain ALL query terms (after preprocessing).
    Returns up to k full records with the required fields (when present).
    """
    _ = fields  # kept for future extension; current index already built over INDEXED_TEXT_FIELDS
    q_terms = _query_tokens(query)
    if not q_terms:
        return []

    # Load postings lists; if any term not in vocab -> empty result
    postings_lists: List[List[int]] = []
    for t in q_terms:
        p = inverted_index.get(t)
        if not p:
            return []
        postings_lists.append(p)

    # Intersect from shortest to longest for speed
    postings_lists.sort(key=len)
    result_ids = postings_lists[0]
    for pl in postings_lists[1:]:
        result_ids = _intersect_sorted(result_ids, pl)
        if not result_ids:
            break

    # Map to records and keep only required output fields (when present)
    out: List[Dict[str, Any]] = []
    for did in result_ids[:k]:
        rec = docs[did]
        # Build a thin view with required fields (include only those present)
        view = {f: rec.get(f) for f in REQUIRED_OUTPUT_FIELDS if f in rec}
        # Always include pid
        if "pid" not in view:
            view["pid"] = rec.get("pid") or docid_to_pid.get(did)
        out.append(view)
    return out

Reading enriched dataset: C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\fashion_products_dataset_enriched.json
Index will be saved in:   C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\index
Loaded 28080 docs
Vocabulary size: 9,048
Saved inverted index to: C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\index\boolean_inverted_index.json
Saved doc map         to: C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\index\docid_pid_map.json
Saved fields          to: C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\index\indexed_fields.json


### **STEP 2 — Propose test queries:**

In [2]:
import random

def df(term: str) -> int:
    """Document frequency of a term (0 if absent)."""
    return len(inverted_index.get(term, []))

def in_vocab(token: str) -> bool:
    return token in inverted_index

def stem_phrase(phrase: str) -> list[str]:
    return preprocess_text_field(phrase)["tokens"]

def phrase_ok(phrase: str) -> bool:
    """All tokens exist in vocab AND the AND-query returns at least one result."""
    toks = stem_phrase(phrase)
    if not toks or not all(in_vocab(t) for t in toks):
        return False
    return len(search_and(phrase, k=1)) > 0

def term_popularity_score(tokens: list[str]) -> int:
    """Sum of dfs for quick 'popularity' proxy."""
    return sum(df(t) for t in tokens)

# Candidate lexicons (raw phrases)
gender_phrases  = ["men", "women"]
type_phrases    = [
    "jeans", "shirt", "t shirt", "hoodie", "sweatshirt",
    "track pants", "kurta", "dress", "jacket"
]
color_phrases   = ["black", "blue", "white", "grey", "red", "green", "pink"]
material_phrases= ["cotton", "polyester", "denim", "linen", "silk"]
fit_phrases     = ["slim", "skinny", "regular", "straight", "high waist"]
style_phrases   = ["printed", "solid", "striped", "floral"]

# Keep only phrases whose tokens exist in vocab (post-stemming)
def filter_vocab(phrases):
    good = []
    for p in phrases:
        toks = stem_phrase(p)
        if toks and all(in_vocab(t) for t in toks):
            good.append((p, toks, term_popularity_score(toks)))
    # Sort by popularity descending (PRP)
    return sorted(good, key=lambda x: x[2], reverse=True)

gender_ok   = filter_vocab(gender_phrases)
types_ok    = filter_vocab(type_phrases)
colors_ok   = filter_vocab(color_phrases)
materials_ok= filter_vocab(material_phrases)
fits_ok     = filter_vocab(fit_phrases)
styles_ok   = filter_vocab(style_phrases)

# Compose candidate query templates: (gender) + (type) + (attribute sets)
templates = [
    ["{g}", "{m}", "{t}", "{c}"],                 # gender + material + type + color
    ["{g}", "{t}", "{f}", "{c}"],                 # gender + type + fit + color
    ["{g}", "full sleeve", "{t}", "{m}"],         # gender + sleeve attr + type + material
    ["{g}", "{t}", "{s}", "{c}"],                 # gender + type + style + color
    ["{g}", "high waist", "{t}", "{c}"],          # gender + high waist + type + color
]

def pick(pop_list, k=1):
    return [p[0] for p in pop_list[:k]] if pop_list else []

# Generate diverse, popular queries that actually return hits
proposed = []
attempts = 0
seen_main_types = set()

while len(proposed) < 5 and attempts < 200:
    attempts += 1
    tpl = random.choice(templates)

    g = pick(gender_ok, 1) or ["women"]
    t = pick(types_ok, 1) or ["jeans"]
    m = pick(materials_ok, 1) or ["cotton"]
    c = pick(colors_ok, 1) or ["blue"]
    f = pick(fits_ok, 1) or ["slim"]
    s = pick(styles_ok, 1) or ["printed"]

    phrase = " ".join(
        x.format(g=g[0], t=t[0], m=m[0], c=c[0], f=f[0], s=s[0])
        for x in tpl
    )
    phrase = " ".join(phrase.split())  # clean double spaces

    # Require the AND query to return hits and encourage diversity by not repeating the same main type too much.
    main_type = t[0]
    if phrase_ok(phrase):
        if sum(1 for q in proposed if main_type in q) < 2:
            proposed.append(phrase)

# Fallbacks (just in case)
fallbacks = [
    "women cotton kurta straight",
    "men slim fit formal shirt",
    "women high waist blue jeans",
    "men running shoes black",
    "women printed dress floral"
]
for fb in fallbacks:
    if len(proposed) >= 5:
        break
    if fb not in proposed and phrase_ok(fb):
        proposed.append(fb)

# Deduplicate and trim to 5
seen = set()
unique_proposed = []
for q in proposed:
    if q not in seen:
        unique_proposed.append(q)
        seen.add(q)
proposed = unique_proposed[:5]

print("=== Proposed Test Queries (data-driven) ===")
for i, q in enumerate(proposed, 1):
    hits = len(search_and(q, k=50))
    toks = stem_phrase(q)
    score = term_popularity_score(toks)
    print(f"{i}. {q}  | tokens={toks} | DF-score={score} | matches≈{hits}")

# Store for later evaluation/report
TEST_QUERIES_FILE = DATA_DIR / "index" / "proposed_test_queries.json"
TEST_QUERIES_FILE.write_text(json.dumps({"queries": proposed}, indent=2), encoding="utf-8")
print(f"\nSaved queries to: {TEST_QUERIES_FILE}")

=== Proposed Test Queries (data-driven) ===
1. men shirt solid blue  | tokens=['men', 'shirt', 'solid', 'blue'] | DF-score=50331 | matches≈50
2. men high waist shirt blue  | tokens=['men', 'high', 'waist', 'shirt', 'blue'] | DF-score=40797 | matches≈1
3. women cotton kurta straight  | tokens=['women', 'cotton', 'kurta', 'straight'] | DF-score=35775 | matches≈50
4. men slim fit formal shirt  | tokens=['men', 'slim', 'fit', 'formal', 'shirt'] | DF-score=57466 | matches≈50
5. men running shoes black  | tokens=['men', 'run', 'shoe', 'black'] | DF-score=21967 | matches≈28

Saved queries to: C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\index\proposed_test_queries.json


### **STEP 3 — Ranking our results:**

#### **Main Code**

In [3]:
import math
from typing import Tuple

# Configuration
INDEXED_TEXT_FIELDS = ["title_clean", "description_clean", "metadata_clean"]
FIELD_WEIGHTS: Dict[str, float] = {
    "title_clean": 2.0,     
    "description_clean": 1.0,  
    "metadata_clean": 0.7,     
}

# Utilities 
def _tokens_from_fields(record: Dict[str, Any], fields: Iterable[str]) -> Dict[str, Dict[str, int]]:
    """
    Return per-field raw term frequencies:
        { field_name : { term : count_in_that_field } }
    """
    per_field_counts: Dict[str, Dict[str, int]] = {}
    for f in fields:
        txt = record.get(f)
        if not txt:
            continue
        counts: Dict[str, int] = defaultdict(int)
        for t in str(txt).split():
            counts[t] += 1
        if counts:
            per_field_counts[f] = dict(counts)
    return per_field_counts

def _tf_log2(freq: float) -> float:
    """1 + log2(freq) if freq>0 else 0."""
    if freq <= 0:
        return 0.0
    return 1.0 + math.log(freq, 2)

def _idf_log2(df_i: int, N: int) -> float:
    """idf = log2(N / df_i); assumes df_i >= 1."""
    if df_i <= 0 or N <= 0:
        return 0.0
    return math.log(N / df_i, 2)

# Build TF postings & df
N = len(docs)

# term -> { doc_id : (field-weighted raw frequency) }
tf_postings: Dict[str, Dict[int, float]] = defaultdict(dict)
# term -> document frequency
df_counts: Dict[str, int] = defaultdict(int)

for doc_id, rec in enumerate(docs):
    per_field = _tokens_from_fields(rec, INDEXED_TEXT_FIELDS)

    term_freq_weighted: Dict[str, float] = defaultdict(float)
    for field_name, counts in per_field.items():
        w_f = FIELD_WEIGHTS.get(field_name, 1.0)
        for term, f_ij_f in counts.items():
            term_freq_weighted[term] += w_f * f_ij_f

    for term, f_ij in term_freq_weighted.items():
        tf_postings[term][doc_id] = f_ij

# df_i = number of docs where term appears
for term, posting in tf_postings.items():
    df_counts[term] = len(posting)

# Precompute document norms
doc_norms: List[float] = [0.0] * N
for term, posting in tf_postings.items():
    idf_i = _idf_log2(df_counts[term], N)
    if idf_i == 0.0:
        continue
    for d_id, f_ij in posting.items():
        w_dt = _tf_log2(f_ij) * idf_i
        if w_dt != 0.0:
            doc_norms[d_id] += w_dt * w_dt

doc_norms = [math.sqrt(v) if v > 0 else 0.0 for v in doc_norms]

# Ranked search
def search_tfidf(query: str, k: int = 20) -> List[Dict[str, Any]]:
    """
    Rank documents by cosine similarity with TF-IDF (base-2 logs, no smoothing).
    Returns top-k with 'score' plus the required output fields.
    """
    q_proc = preprocess_text_field(query or "")
    q_terms = q_proc["tokens"]
    if not q_terms:
        return []

    # query term frequencies
    q_tf: Dict[str, int] = defaultdict(int)
    for t in q_terms:
        q_tf[t] += 1

    # build query vector
    q_weights: Dict[str, float] = {}
    q_norm_sq = 0.0
    for t, f_q in q_tf.items():
        df_i = df_counts.get(t, 0)
        if df_i <= 0:
            continue  # unseen term
        w_t = _tf_log2(f_q) * _idf_log2(df_i, N)
        if w_t == 0.0:
            continue
        q_weights[t] = w_t
        q_norm_sq += w_t * w_t

    q_norm = math.sqrt(q_norm_sq) if q_norm_sq > 0 else 0.0
    if q_norm == 0.0:
        return []

    # sparse dot product over postings of query terms
    scores: Dict[int, float] = defaultdict(float)
    for t, w_t in q_weights.items():
        posting = tf_postings.get(t)
        if not posting:
            continue
        idf_i = _idf_log2(df_counts[t], N)
        if idf_i == 0.0:
            continue
        for d_id, f_ij in posting.items():
            w_dt = _tf_log2(f_ij) * idf_i
            if w_dt != 0.0:
                scores[d_id] += w_t * w_dt

    # cosine normalization and rank
    ranked: List[Tuple[int, float]] = []
    for d_id, dot in scores.items():
        denom = doc_norms[d_id] * q_norm
        if denom > 0:
            ranked.append((d_id, dot / denom))
    ranked.sort(key=lambda x: x[1], reverse=True)

    results: List[Dict[str, Any]] = []
    for d_id, sc in ranked[:k]:
        rec = docs[d_id]
        view = {f: rec.get(f) for f in REQUIRED_OUTPUT_FIELDS if f in rec}
        if "pid" not in view:
            view["pid"] = rec.get("pid")
        view["score"] = float(sc)
        results.append(view)
    return results

# AND-filtered TF-IDF
def search_tfidf_and(query: str, k: int = 20) -> List[Dict[str, Any]]:
    """
    Conjunctive AND filter first (Boolean), then TF-IDF rank within the survivors.
    Helpful if your teacher wants AND semantics even for ranking.
    """
    # Candidate set via Boolean AND
    cand = search_and(query, k=10_000)
    if not cand:
        return []

    cand_pids = {r["pid"] for r in cand if r.get("pid")}
    cand_ids = {i for i, r in enumerate(docs) if r.get("pid") in cand_pids}

    # Build query vector (same as search_tfidf)
    q_proc = preprocess_text_field(query or "")
    q_terms = q_proc["tokens"]
    if not q_terms:
        return []

    q_tf: Dict[str, int] = defaultdict(int)
    for t in q_terms:
        q_tf[t] += 1

    q_weights: Dict[str, float] = {}
    q_norm_sq = 0.0
    for t, f_q in q_tf.items():
        df_i = df_counts.get(t, 0)
        if df_i <= 0:
            continue
        w_t = _tf_log2(f_q) * _idf_log2(df_i, N)
        if w_t == 0.0:
            continue
        q_weights[t] = w_t
        q_norm_sq += w_t * w_t

    q_norm = math.sqrt(q_norm_sq) if q_norm_sq > 0 else 0.0
    if q_norm == 0.0:
        return []

    scores: Dict[int, float] = defaultdict(float)
    for t, w_t in q_weights.items():
        posting = tf_postings.get(t)
        if not posting:
            continue
        idf_i = _idf_log2(df_counts[t], N)
        if idf_i == 0.0:
            continue
        for d_id, f_ij in posting.items():
            if d_id not in cand_ids:
                continue
            w_dt = _tf_log2(f_ij) * idf_i
            if w_dt != 0.0:
                scores[d_id] += w_t * w_dt

    ranked: List[Tuple[int, float]] = []
    for d_id, dot in scores.items():
        denom = doc_norms[d_id] * q_norm
        if denom > 0:
            ranked.append((d_id, dot / denom))
    ranked.sort(key=lambda x: x[1], reverse=True)

    results: List[Dict[str, Any]] = []
    for d_id, sc in ranked[:k]:
        rec = docs[d_id]
        view = {f: rec.get(f) for f in REQUIRED_OUTPUT_FIELDS if f in rec}
        if "pid" not in view:
            view["pid"] = rec.get("pid")
        view["score"] = float(sc)
        results.append(view)
    return results

# Persist ranked results
def save_ranked_results(out_path: Path, queries: Dict[str, List[str]], use_and_filter: bool = False, k: int = 20) -> Path:
    """
    Save ranked results for groups of queries.
    queries = {"provided": [q1, q2, ...], "proposed": [q3, ...]}
    """
    out = {"provided_queries": {}, "proposed_queries": {}}
    ranker = search_tfidf_and if use_and_filter else search_tfidf

    for group, qlist in queries.items():
        for q in qlist:
            out_key = "provided_queries" if group == "provided" else "proposed_queries"
            out[out_key][q] = ranker(q, k=k)

    out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    return out_path

#### **Testing**

In [4]:
# Quick demo on course queries
for q in ["women full sleeve sweatshirt cotton", "men slim jeans blue"]:
    top = search_tfidf(q, k=3)
    print(f"\nTF-IDF (log2) top for: {q!r}")
    for r in top:
        print(f"  {r['score']:.4f} | {r.get('pid')} | {(r.get('title') or '')[:80]}")

# Save ranked results for report/repro
RANKED_OUT = (DATA_DIR / "index" / "ranked_results.json")
queries_for_report = {
    "provided": [
        "women full sleeve sweatshirt cotton",
        "men slim jeans blue",
    ],
    # Optionally load your proposed queries file if it exists
}
pq_file = DATA_DIR / "index" / "proposed_test_queries.json"
if pq_file.exists():
    try:
        queries_for_report["proposed"] = json.loads(pq_file.read_text(encoding="utf-8"))["queries"]
    except Exception:
        pass

out_path = save_ranked_results(RANKED_OUT, queries_for_report, use_and_filter=False, k=20)
print(f"\nRanked results saved to: {out_path}")


TF-IDF (log2) top for: 'women full sleeve sweatshirt cotton'
  0.9074 | SWSFZVTTQCB4SJ7F | Full Sleeve Solid Women Sweatshirt
  0.8760 | SWSFQGS456JAZCQB | Full Sleeve Printed Women Sweatshirt
  0.8724 | SWSFYTYMNTBNARUN | Full Sleeve Solid Women Sweatshirt

TF-IDF (log2) top for: 'men slim jeans blue'
  0.7168 | JEAFSKYHZHSZZC9S | Slim Men Blue Jeans
  0.7147 | JEAFRAQXEKGUPNUN | Slim Men Blue Jeans
  0.7096 | JEAFQF6JBUSEXHVF | Slim Men Blue Jeans

Ranked results saved to: C:\Users\joan\Desktop\FEINA\UPF\Course\Fourth_year\Primer_Trimestre\IR_AND_WA\Labs\irwa-search-engine\data\index\ranked_results.json


# **PART 2: Evaluation**

### **STEP 1 — Implementing Metrics:**

In [13]:
def precision_at_k(rel_ranked: List[int], k: int) -> float:
    if k <= 0:
        return 0.0
    return sum(rel_ranked[:k]) / float(k)

def recall_at_k(rel_ranked: List[int], k: int, total_relevant: int) -> float:
    if total_relevant <= 0:
        return 0.0
    return sum(rel_ranked[:k]) / float(total_relevant)

def average_precision_at_k(rel_ranked: List[int], k: int, total_relevant: int) -> float:
    """AP@K = mean of precision@i at each relevant rank i, divided by total relevant in corpus."""
    if total_relevant <= 0:
        return 0.0
    upto = min(k, len(rel_ranked))
    ap_sum = 0.0
    for i in range(1, upto + 1):
        if rel_ranked[i - 1] == 1:
            ap_sum += precision_at_k(rel_ranked, i)
    return ap_sum / float(total_relevant)

def f1_at_k(rel_ranked: List[int], k: int, total_relevant: int) -> float:
    p = precision_at_k(rel_ranked, k)
    r = recall_at_k(rel_ranked, k, total_relevant)
    if p + r == 0:
        return 0.0
    return 2.0 * p * r / (p + r)

def mean_average_precision(all_ap: List[float]) -> float:
    return sum(all_ap) / len(all_ap) if all_ap else 0.0

def mean_reciprocal_rank(all_rel_ranked: List[List[int]]) -> float:
    rr_sum = 0.0
    for rels in all_rel_ranked:
        rr = 0.0
        for i, r in enumerate(rels, start=1):
            if r == 1:
                rr = 1.0 / i
                break
        rr_sum += rr
    return rr_sum / len(all_rel_ranked) if all_rel_ranked else 0.0

def dcg_at_k(rel_ranked: List[int], k: int) -> float:
    if k <= 0:
        return 0.0
    k = min(k, len(rel_ranked))
    dcg = 0.0
    for i in range(1, k + 1):
        gain = (2 ** rel_ranked[i - 1]) - 1  # supports graded relevance if ever used
        dcg += gain / math.log2(i + 1)
    return dcg

def ndcg_at_k(rel_ranked: List[int], k: int) -> float:
    k = min(k, len(rel_ranked))
    dcg = dcg_at_k(rel_ranked, k)
    ideal = sorted(rel_ranked[:k], reverse=True)
    idcg = dcg_at_k(ideal, k)
    return (dcg / idcg) if idcg > 0 else 0.0


### **STEP 2 — Applying the Evaluation Metrics:**

Setup (paths, helpers, constants)

In [14]:
from __future__ import annotations
from typing import List, Dict, Any, Tuple, Iterable
from pathlib import Path
import csv, json, math, os

# If DATA_DIR is already defined earlier, reuse it; else default to CWD
try:
    DATA_DIR
except NameError:
    DATA_DIR = Path(".").resolve()

VAL_PATH = DATA_DIR / "validation_labels.csv"   # Ground truth file provided by the course
K_DEFAULT = 20                                  # Primary cutoff required by the statement
K_LIST = (10, 20)                               # We'll compute both @10 and @20

PROVIDED_QUERIES = {
    "1": "women full sleeve sweatshirt cotton",
    "2": "men slim jeans blue",
}

def dedup_preserve_order(seq: List[str]) -> List[str]:
    """Remove duplicates from a list while keeping order."""
    seen, out = set(), []
    for x in seq:
        if x and x not in seen:
            seen.add(x); out.append(x)
    return out

def round3_dict(d: Dict[str, float]) -> Dict[str, float]:
    """Round all float values in a dict to 3 decimals (leave non-floats untouched)."""
    return {k: (round(v, 3) if isinstance(v, float) else v) for k, v in d.items()}


Ground-truth loader (robust to different header names)

In [None]:
def load_ground_truth_any(path: Path) -> Dict[str, Dict[str, int]]:
    """
    Accepts CSV with either:
      - ('query_id' OR 'query'), 'pid', ('labels' OR 'label')
    Returns: gt[key][pid] = 0/1, where key is query_id or full query text.
    """
    if not path.exists():
        raise FileNotFoundError(f"Ground-truth file not found: {path}")
    with path.open("r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        cols = {c.lower(): c for c in (reader.fieldnames or [])}
        qcol = cols.get("query_id") or cols.get("query")
        pidcol = cols.get("pid")
        lcol = cols.get("labels") or cols.get("label")
        if not (qcol and pidcol and lcol):
            raise ValueError("CSV must contain 'query_id' or 'query', 'pid', and 'labels'/'label'.")
        gt: Dict[str, Dict[str, int]] = {}
        for row in reader:
            qkey = str(row[qcol]).strip()
            pid = str(row[pidcol]).strip()
            lab = 1 if int(row[lcol]) == 1 else 0
            gt.setdefault(qkey, {})[pid] = lab
    return gt


Adapter to get ranked PIDs (live search or saved JSON)

In [None]:
USE_SAVED_RESULTS = False              # Set True if you want to load from JSON instead of running search
RANKED_RESULTS_JSON = DATA_DIR / "index" / "ranked_results.json"  # expected format documented below
TOP_K_RETRIEVE = 100                   # retrieve more than K_DEFAULT, metrics cut later

def extract_pids_from_hits(hits: Iterable[Any]) -> List[str]:
    """
    Extract PIDs from your system's hit objects.
    Supports dict hits (with 'pid'/'PID'/... keys) or tuple/list forms containing a string pid.
    """
    out: List[str] = []
    for h in hits:
        pid = None
        if isinstance(h, dict):
            pid = h.get("pid") or h.get("PID") or h.get("doc_pid") or h.get("docId")
        elif isinstance(h, (list, tuple)):
            pid = next((x for x in h if isinstance(x, str)), None)
        if pid:
            out.append(pid)
    return dedup_preserve_order(out)

def get_ranked_pids_for(qtext: str, k: int = TOP_K_RETRIEVE) -> List[str]:
    """
    Option A: call your search function directly (preferred).
    Option B: load from saved JSON with structure:
      {
        "women full sleeve sweatshirt cotton": [{"pid": "...", "score": ...}, ...],
        "men slim jeans blue": [{"pid": "...", "score": ...}, ...]
      }
    """
    if not USE_SAVED_RESULTS:
        # Call your system (adapt if you use and_filter=True)
        hits = search_tfidf(qtext, k=k)  # <-- change to search_tfidf(qtext, k=k, and_filter=True) if you used AND first
        return extract_pids_from_hits(hits)

    # Load from saved JSON
    if not RANKED_RESULTS_JSON.exists():
        raise FileNotFoundError(f"Saved rankings not found: {RANKED_RESULTS_JSON}")
    with RANKED_RESULTS_JSON.open("r", encoding="utf-8") as fh:
        saved = json.load(fh)
    arr = saved.get(qtext, [])
    if isinstance(arr, list) and arr and isinstance(arr[0], dict) and "pid" in arr[0]:
        return dedup_preserve_order([x["pid"] for x in arr])
    # If your saved format differs, adapt here
    return dedup_preserve_order(extract_pids_from_hits(arr))


Per-query & overall evaluation runners

In [None]:
def build_rel_vector(ranked_pids: List[str], gt_for_q: Dict[str, int]) -> Tuple[List[int], int]:
    rel_ranked = [int(gt_for_q.get(pid, 0)) for pid in ranked_pids]
    total_relevant = sum(1 for v in gt_for_q.values() if v == 1)
    return rel_ranked, total_relevant

def evaluate_one_query_at_ks(
    ranked_pids: List[str], gt_for_q: Dict[str, int], k_list: Iterable[int]
) -> Dict[str, float]:
    ranked_pids = dedup_preserve_order(ranked_pids)
    rel_ranked, total_relevant = build_rel_vector(ranked_pids, gt_for_q)

    out: Dict[str, float] = {}
    for k in k_list:
        p = precision_at_k(rel_ranked, k)
        r = recall_at_k(rel_ranked, k, total_relevant)
        ap = average_precision_at_k(rel_ranked, k, total_relevant)
        f1 = f1_at_k(rel_ranked, k, total_relevant)
        nd = ndcg_at_k(rel_ranked, k)
        out[f"P@{k}"] = p
        out[f"R@{k}"] = r
        out[f"AP@{k}"] = ap
        out[f"F1@{k}"] = f1
        out[f"nDCG@{k}"] = nd
    # RR for the whole list (not tied to K)
    rr = 0.0
    for i, r in enumerate(rel_ranked, start=1):
        if r == 1:
            rr = 1.0 / i
            break
    out["MRR"] = rr
    return out

def evaluate_across_queries_numeric_only(
    system_rankings: Dict[str, List[str]],
    ground_truth: Dict[str, Dict[str, int]],
    k_list: Iterable[int] = K_LIST,
) -> Tuple[Dict[str, Dict[str, float]], Dict[str, float]]:
    per_query: Dict[str, Dict[str, float]] = {}
    # For aggregates (use the largest K in k_list for MAP and mean nDCG)
    last_k = max(k_list)
    ap_list, rel_lists, ndcgs_lastk = [], [], []

    for qkey, ranked in system_rankings.items():
        gt_for_q = ground_truth.get(qkey, {})
        metrics = evaluate_one_query_at_ks(ranked, gt_for_q, k_list)
        per_query[qkey] = round3_dict(metrics)

        rel_ranked, total_rel = build_rel_vector(dedup_preserve_order(ranked), gt_for_q)
        ap_list.append(average_precision_at_k(rel_ranked, last_k, total_rel))
        rel_lists.append(rel_ranked)
        ndcgs_lastk.append(ndcg_at_k(rel_ranked, last_k))

    summary = {
        "K": float(last_k),
        "MAP": mean_average_precision(ap_list),
        "MRR": mean_reciprocal_rank(rel_lists),
        f"mean nDCG@{last_k}": (sum(ndcgs_lastk) / len(ndcgs_lastk) if ndcgs_lastk else 0.0),
    }
    summary = round3_dict(summary)
    return per_query, summary


Run evaluation for the two predefined queries (numbers-only, 3 decimals)

In [18]:
# === EVAL · Cell 6: official queries · numbers-only (3 decimals) ===
# 1) Load ground truth
gt = load_ground_truth_any(VAL_PATH)

# 2) Build system rankings for the two official queries (keys must match GT keys)
#    We use query IDs "1" and "2" to match typical CSVs ('query_id' column)
system_rankings: Dict[str, List[str]] = {}
for qid, qtext in PROVIDED_QUERIES.items():
    system_rankings[qid] = get_ranked_pids_for(qtext, k=TOP_K_RETRIEVE)

# 3) Evaluate and print strictly numeric results
per_query, summary = evaluate_across_queries_numeric_only(system_rankings, gt, k_list=K_LIST)

# NUMBERS ONLY (rounded to 3 decimals) — paste directly into the report section required by the statement
print(json.dumps({
    "query_1": per_query.get("1", {}),
    "query_2": per_query.get("2", {}),
    "summary": summary
}, indent=2))


{
  "query_1": {
    "P@10": 0.0,
    "R@10": 0.0,
    "AP@10": 0.0,
    "F1@10": 0.0,
    "nDCG@10": 0.0,
    "P@20": 0.0,
    "R@20": 0.0,
    "AP@20": 0.0,
    "F1@20": 0.0,
    "nDCG@20": 0.0,
    "MRR": 0.0
  },
  "query_2": {
    "P@10": 0.1,
    "R@10": 0.1,
    "AP@10": 0.02,
    "F1@10": 0.1,
    "nDCG@10": 0.387,
    "P@20": 0.05,
    "R@20": 0.1,
    "AP@20": 0.02,
    "F1@20": 0.067,
    "nDCG@20": 0.387,
    "MRR": 0.2
  },
  "summary": {
    "K": 20.0,
    "MAP": 0.01,
    "MRR": 0.1,
    "mean nDCG@20": 0.193
  }
}


For the predefined queries “women full sleeve sweatshirt cotton” and “men slim jeans blue,” retrieval performance was very low.
The first query returned no relevant results (all metrics = 0), indicating poor vocabulary coverage and missing term alignment in the index.
The second query achieved limited accuracy (P@10 = 0.1, MAP = 0.01, nDCG@20 = 0.387), showing that TF-IDF struggles with multi-term attributes such as “slim” and “blue.”
Overall (MAP = 0.01, MRR = 0.10, mean nDCG@20 = 0.193), the baseline model retrieves few relevant items, confirming its weakness on rare and attribute-rich queries.

### **STEP 3 — You will act as expert judges by establishing the ground truth for each document and query.**
Load the 5 queries and fetch candidates

In [28]:
# === CUSTOM EVAL · Robust loader for proposed_test_queries.json ===
from pathlib import Path
import json

PROPOSED_Q_PATH = DATA_DIR / "index" / "proposed_test_queries.json"

# default fallback (only used if file missing/empty)
proposed_queries = [
    "men shirt solid blue",
    "men high waist shirt blue",
    "women cotton kurta straight",
    "men slim fit formal shirt",
    "men running shoes black",
]

if PROPOSED_Q_PATH.exists():
    try:
        with PROPOSED_Q_PATH.open("r", encoding="utf-8") as fh:
            payload = json.load(fh)

        # accept either: {"queries": [...]}  OR  ["q1","q2",...]
        if isinstance(payload, dict) and "queries" in payload and isinstance(payload["queries"], list):
            loaded = [q for q in payload["queries"] if isinstance(q, str)]
        elif isinstance(payload, list):
            loaded = [q for q in payload if isinstance(q, str)]
        else:
            loaded = []

        if loaded:
            proposed_queries = loaded[:5]

    except Exception as e:
        print(f"Warning reading {PROPOSED_Q_PATH}: {e}\nUsing fallback queries.")

# fetch candidates (same as before)
TOP_N = 50
custom_rankings = {}
for q in proposed_queries:
    hits = search_tfidf(q, k=TOP_N)  # or search_tfidf(q, k=TOP_N, and_filter=True)
    pids = []
    for h in hits:
        if isinstance(h, dict):
            pid = h.get("pid") or h.get("PID") or h.get("doc_pid") or h.get("docId")
        elif isinstance(h, (list, tuple)):
            pid = next((x for x in h if isinstance(x, str)), None)
        else:
            pid = None
        if pid and pid not in pids:
            pids.append(pid)
    custom_rankings[q] = pids

print("Loaded queries from proposed_test_queries.json:")
for i, q in enumerate(proposed_queries, 1):
    print(f"{i}. {q}  (candidates: {len(custom_rankings[q])})")


Loaded queries from proposed_test_queries.json:
1. men shirt solid blue  (candidates: 50)
2. men high waist shirt blue  (candidates: 50)
3. women cotton kurta straight  (candidates: 50)
4. men slim fit formal shirt  (candidates: 50)
5. men running shoes black  (candidates: 50)


Quick peek at candidates (so you can decide labels)

In [29]:
# === CUSTOM EVAL · Cell 2: preview top results per query (compact) ===
def preview(q: str, n: int = 10):
    """Print a compact view (pid, title, brand, category) for quick judging."""
    hits = search_tfidf(q, k=n)  # same retrieval config used in Cell 1
    print(f"\n▶ {q}  (showing {n})")
    for h in hits:
        if not isinstance(h, dict):
            continue
        pid = h.get("pid", "")
        title = h.get("title", "")[:90]
        brand = h.get("brand", "")
        cat = h.get("category", "")
        print(f"- {pid} | {brand} | {cat} | {title}")

# Call preview() for any query you want to inspect before labeling:
# preview(proposed_queries[0], n=12)


Provide your ground truth in-code (no CSV)

In [31]:
# === CUSTOM EVAL · Cell 3A: auto-suggest ground truth (no CSV) ===
# Uses simple intent rules to label top-N results for each query in `proposed_queries`.

import re

def _norm(s):
    s = (s or "").lower()
    s = re.sub(r"[_/|]", " ", s)
    s = re.sub(r"[^a-z0-9\s-]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

GENDER = {
    "women": {"women","woman","female","ladies","girls","lady","girl"},
    "men":   {"men","man","male","gents","boys","gent","boy"},
}
PRODUCT = {
    "shirt": {"shirt","shirts"},
    "kurta": {"kurta","kurtas"},
    "shoes": {"shoe","shoes","sneaker","sneakers","running shoe","running shoes"},
}
COLOR = {"blue","black"}
MATERIAL = {"cotton"}
FIT = {"slim","regular","formal"}
SLEEVE_FULL = {"full sleeve","long sleeve","full-sleeve","long-sleeve"}

def _contains_any(text, terms):
    t = _norm(text)
    for w in terms:
        if " " in w:
            if w in t: return True
        else:
            if re.search(rf"\b{re.escape(w)}\b", t): return True
    return False

def _score_query_match(q, hit):
    """Return (label, score, reasons) for a single (query, hit)."""
    qn = _norm(q)
    txt = _norm(" ".join([
        str(hit.get("title","")), str(hit.get("description","")),
        str(hit.get("brand","")), str(hit.get("category","")),
        str(hit.get("sub_category",""))
    ]))

    reasons = []
    score = 0.0

    # gender
    if "women" in qn and not _contains_any(txt, GENDER["women"]):
        return 0, 0.35, ["missing women term"]
    if "men" in qn and not _contains_any(txt, GENDER["men"]):
        return 0, 0.35, ["missing men term"]

    # product
    if "shirt" in qn and not _contains_any(txt, PRODUCT["shirt"]):
        return 0, 0.4, ["missing product: shirt"]
    if "kurta" in qn and not _contains_any(txt, PRODUCT["kurta"]):
        return 0, 0.4, ["missing product: kurta"]
    if "shoes" in qn and not _contains_any(txt, PRODUCT["shoes"]):
        return 0, 0.4, ["missing product: shoes"]

    score += 0.4; reasons.append("gender/product ok")

    # color/material/fit
    if "blue" in qn and _contains_any(txt, {"blue","navy"}):
        score += 0.2; reasons.append("color blue")
    if "black" in qn and _contains_any(txt, {"black"}):
        score += 0.2; reasons.append("color black")
    if "cotton" in qn and _contains_any(txt, MATERIAL):
        score += 0.2; reasons.append("material cotton")
    if "slim" in qn and _contains_any(txt, {"slim"}):
        score += 0.15; reasons.append("fit slim")
    if "formal" in qn and _contains_any(txt, {"formal"}):
        score += 0.1; reasons.append("formal")
    if "full sleeve" in qn and _contains_any(txt, SLEEVE_FULL):
        score += 0.15; reasons.append("full sleeve")

    # token coverage (soft)
    q_tokens = [t for t in qn.split() if t not in {"and","the","a","of"}]
    cov = sum(1 for t in q_tokens if _contains_any(txt, {t})) / max(1,len(q_tokens))
    score += 0.25 * cov
    reasons.append(f"coverage={cov:.2f}")

    score = max(0.0, min(score, 1.0))
    label = 1 if score >= 0.55 else 0
    return label, score, reasons

# Build gt_custom using auto-suggestions
gt_custom = {q: {} for q in proposed_queries}
auto_summary = []

for q in proposed_queries:
    hits = search_tfidf(q, k=TOP_N)  # same TOP_N you used to build `custom_rankings`
    rel_count = 0
    low_conf = 0
    for h in hits:
        if not isinstance(h, dict):  # skip non-dict shapes
            continue
        pid = h.get("pid") or h.get("PID") or h.get("doc_pid") or h.get("docId")
        if not pid:
            continue
        lab, conf, reasons = _score_query_match(q, h)
        if lab == 1:
            gt_custom[q][pid] = 1
            rel_count += 1
            if conf < 0.60:
                low_conf += 1
    auto_summary.append((q, rel_count, low_conf))

print("Auto-suggested ground truth (label=1) per query:")
for q, rel_count, low_conf in auto_summary:
    print(f"- {q}: {rel_count} relevant (low-confidence: {low_conf})")

print("\nYou can edit gt_custom[q] afterwards to add/remove PIDs if needed.")


Auto-suggested ground truth (label=1) per query:
- men shirt solid blue: 39 relevant (low-confidence: 2)
- men high waist shirt blue: 10 relevant (low-confidence: 7)
- women cotton kurta straight: 0 relevant (low-confidence: 0)
- men slim fit formal shirt: 23 relevant (low-confidence: 0)
- men running shoes black: 30 relevant (low-confidence: 15)

You can edit gt_custom[q] afterwards to add/remove PIDs if needed.


Evaluate your 5 queries (numbers-only, @10 and @20)

In [32]:
# === CUSTOM EVAL · Cell 4: run metrics (numbers-only) ===
# We reuse the earlier helpers: evaluate_across_queries_numeric_only, etc.

# Build system_rankings keyed by the SAME query strings used in gt_custom
system_rankings_custom = {q: custom_rankings.get(q, []) for q in proposed_queries}

# Fill zeros implicitly inside the evaluator by mapping ranked PIDs to 0 if not present in gt
# We'll adapt evaluate_across_queries_numeric_only to accept gt dicts where only relevant=1 are listed.

def _build_full_gt(gt_partial: dict[str, dict[str, int]], system_rankings: dict[str, list[str]]):
    """Ensure gt has 0 for any ranked pid not explicitly marked 1."""
    full = {}
    for q, ranked in system_rankings.items():
        base = gt_partial.get(q, {}).copy()
        for pid in ranked:
            base.setdefault(pid, 0)
        full[q] = base
    return full

gt_custom_full = _build_full_gt(gt_custom, system_rankings_custom)

per_query_c, summary_c = evaluate_across_queries_numeric_only(
    system_rankings_custom, gt_custom_full, k_list=(10, 20)
)

# NUMBERS ONLY
import json
print(json.dumps({
    "custom_per_query": per_query_c,
    "custom_summary": summary_c
}, indent=2))


{
  "custom_per_query": {
    "men shirt solid blue": {
      "P@10": 0.7,
      "R@10": 0.179,
      "AP@10": 0.174,
      "F1@10": 0.286,
      "nDCG@10": 0.991,
      "P@20": 0.85,
      "R@20": 0.436,
      "AP@20": 0.379,
      "F1@20": 0.576,
      "nDCG@20": 0.962,
      "MRR": 1.0
    },
    "men high waist shirt blue": {
      "P@10": 0.2,
      "R@10": 0.2,
      "AP@10": 0.039,
      "F1@10": 0.2,
      "nDCG@10": 0.398,
      "P@20": 0.2,
      "R@20": 0.4,
      "AP@20": 0.081,
      "F1@20": 0.267,
      "nDCG@20": 0.442,
      "MRR": 0.143
    },
    "women cotton kurta straight": {
      "P@10": 0.0,
      "R@10": 0.0,
      "AP@10": 0.0,
      "F1@10": 0.0,
      "nDCG@10": 0.0,
      "P@20": 0.0,
      "R@20": 0.0,
      "AP@20": 0.0,
      "F1@20": 0.0,
      "nDCG@20": 0.0,
      "MRR": 0.0
    },
    "men slim fit formal shirt": {
      "P@10": 0.8,
      "R@10": 0.348,
      "AP@10": 0.338,
      "F1@10": 0.485,
      "nDCG@10": 0.992,
      "P@20": 0.65,
      "R

The metrics provide complementary views of system performance.
Precision@K shows high accuracy in the top results (≈0.7–0.85), while Recall@K remains moderate, meaning some relevant items appear beyond the top 20.
Average Precision and nDCG confirm that relevant products are generally ranked near the top, especially for queries like “men slim fit formal shirt” or “men running shoes black”.
F1@K indicates balanced performance for common queries but poor results for rare ones such as “women cotton kurta straight”.
Overall, the system retrieves and ranks frequent clothing types well, with a MAP of 0.289 and MRR of 0.629 showing reliable ranking for typical queries but weak coverage for uncommon items.

The TF-IDF model depends on exact word matching, so it struggles with rare or varied terms and underweighted fields. Performance drops when vocabulary differs or key attributes appear only in metadata.
To improve results, the system should adopt BM25 for better weighting, apply lemmatization or query expansion to match similar terms, and re-balance field weights to include descriptions and metadata. Incorporating semantic retrieval (e.g., SBERT embeddings) would further enhance ranking quality and recall.