In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
import os

# List contents of cran.all.1400 folder
print("Contents of /content/drive/MyDrive/cran.all.1400/")
print(os.listdir('/content/drive/MyDrive/cran.all.1400'))

# List contents of cranqrel folder
print("\nContents of /content/drive/MyDrive/cranqrel/")
print(os.listdir('/content/drive/MyDrive/cranqrel'))

Contents of /content/drive/MyDrive/cran.all.1400/
['1016.txt', '1002.txt', '1040.txt', '1027.txt', '10.txt', '104.txt', '1006.txt', '1004.txt', '1042.txt', '103.txt', '1034.txt', '1022.txt', '1001.txt', '1.txt', '1032.txt', '1026.txt', '1030.txt', '1010.txt', '100.txt', '1037.txt', '1028.txt', '1019.txt', '1014.txt', '1007.txt', '1041.txt', '1035.txt', '1020.txt', '1045.txt', '1029.txt', '1000.txt', '1036.txt', '1008.txt', '1011.txt', '1044.txt', '1003.txt', '1039.txt', '102.txt', '101.txt', '1038.txt', '1033.txt', '1023.txt', '1009.txt', '1031.txt', '1024.txt', '1046.txt', '1043.txt', '1018.txt', '1012.txt', '1021.txt', '1015.txt', '1017.txt', '1013.txt', '1005.txt', '1025.txt', '1051.txt', '1085.txt', '1060.txt', '1073.txt', '1049.txt', '1062.txt', '106.txt', '1066.txt', '1063.txt', '1047.txt', '1088.txt', '1059.txt', '107.txt', '1064.txt', '1052.txt', '1055.txt', '1061.txt', '1067.txt', '1092.txt', '109.txt', '1071.txt', '1084.txt', '1050.txt', '1070.txt', '1080.txt', '1057.txt', '1

In [16]:
import math
import re
from collections import defaultdict, Counter
from dataclasses import dataclass
from typing import List, Dict, Set, Tuple
import os


@dataclass
class Document:
    doc_id: int
    text: str


@dataclass
class Query:
    q_id: int
    text: str


# ---------- Parsing Cranfield files ----------

def parse_single_cran_doc_file(file_path: str) -> Document:
    """
    Parse a single .txt file from cran.all.1400, where the filename is the doc_id.
    The content of the file is assumed to be the document text.
    """
    doc_id = int(os.path.basename(file_path).split('.')[0])
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        text = f.read().strip()
    return Document(doc_id=doc_id, text=text)


def parse_cran_all_from_directory(directory_path: str) -> List[Document]:
    all_docs: List[Document] = []
    for filename in os.listdir(directory_path):
        if filename.endswith(".txt"):
            file_path = os.path.join(directory_path, filename)
            doc = parse_single_cran_doc_file(file_path)
            all_docs.append(doc)
    # Sort documents by doc_id to maintain consistency
    all_docs.sort(key=lambda d: d.doc_id)
    return all_docs


def parse_cran_qry(path: str) -> List[Query]:
    """
    Parse cran.qry into a list of Query(q_id, text).
    Expected format: <q_id>\t<query text>
    (Based on observed data in /content/cran.qry.txt)
    """
    queries: List[Query] = []

    # print(f"DEBUG: parse_cran_qry - Attempting to open {path}")
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            # print(f"DEBUG: parse_cran_qry - Processing line {line_num}: '{line}'")
            if not line:
                continue

            # Assuming format: <id>\t<query text>
            parts = line.split('\t', 1)
            if len(parts) == 2:
                try:
                    q_id = int(parts[0])
                    text = parts[1].strip()
                    queries.append(Query(q_id=q_id, text=text))
                    # print(f"DEBUG: parse_cran_qry - Added query q_id: {q_id}, text length: {len(text)}")
                except ValueError:
                    print(f"ERROR: parse_cran_qry - Failed to parse q_id in line {line_num}: '{line}'")
            else:
                print(f"ERROR: parse_cran_qry - Unexpected line format in {path} at line {line_num}: '{line}'")

    # print(f"DEBUG: parse_cran_qry - Finished processing {path}. Queries parsed: {len(queries)}")
    return queries


def parse_cran_qrel(path: str) -> Dict[int, Set[int]]:
    """
    Parse a single cranqrel file (or a file containing qrels for one query)
    into dict: q_id -> set(doc_id).
    Typical format per line: <q_id> <doc_id> <relevance>
    We only need q_id and doc_id.
    """
    qrels: Dict[int, Set[int]] = defaultdict(set)
    # print(f"DEBUG: parse_cran_qrel - Attempting to open {path}")
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            parts = line.split()
            if len(parts) < 2:
                continue
            try:
                qid = int(parts[0])
                docid = int(parts[1])
            except ValueError:
                # print(f"ERROR: parse_cran_qrel - Failed to parse line in {path}: '{line.strip()}'")
                continue
            qrels[qid].add(docid)
    # print(f"DEBUG: parse_cran_qrel - Finished processing {path}. Qrels parsed: {len(qrels)}")
    return qrels

def parse_cran_qrel_from_directory(directory_path: str) -> Dict[int, Set[int]]:
    all_qrels: Dict[int, Set[int]] = defaultdict(set)
    for filename in os.listdir(directory_path):
        if filename.endswith(".txt"):
            file_path = os.path.join(directory_path, filename)
            qrels_from_file = parse_cran_qrel(file_path)
            for qid, doc_ids in qrels_from_file.items():
                all_qrels[qid].update(doc_ids)
    return all_qrels


# ---------- Preprocessing ----------

class Preprocessor:
    def __init__(self, stopwords: Set[str] = None):
        if stopwords is None:
            stopwords = {
                "a", "an", "and", "are", "as", "at", "be", "but", "by",
                "for", "if", "in", "into", "is", "it", "no", "not", "of",
                "on", "or", "such", "that", "the", "their", "then",
                "there", "these", "they", "this", "to", "was", "will",
                "with", "we", "you", "your", "from"
            }
        self.stopwords = stopwords

    def preprocess(self, text: str) -> List[str]:
        # lowercase
        text = text.lower()
        # tokenize: alphabetic words only
        tokens = re.findall(r"[a-z]+", text)
        # remove stopwords
        tokens = [t for t in tokens if t not in self.stopwords]
        return tokens


# ---------- Inverted index + vector space model ----------

class InvertedIndex:
    def __init__(self, preprocessor: Preprocessor):
        self.pre = preprocessor
        self.doc_ids: List[int] = []
        self.vocab: Dict[str, int] = {}
        self.id_to_term: List[str] = []
        self.postings: Dict[int, List[Tuple[int, int]]] = defaultdict(list)
        self.N: int = 0
        self.df: Dict[int, int] = {}
        self.idf: Dict[int, float] = {}
        self.doc_norms: List[float] = []

    def build(self, documents: List[Document]) -> None:
        self.doc_ids = [d.doc_id for d in documents]
        self.N = len(documents)
        term_id_counter = 0

        # build postings
        for doc_index, doc in enumerate(documents):
            tokens = self.pre.preprocess(doc.text)
            tf_counts = Counter(tokens)
            for term, tf in tf_counts.items():
                if term not in self.vocab:
                    self.vocab[term] = term_id_counter
                    self.id_to_term.append(term)
                    term_id_counter += 1
                tid = self.vocab[term]
                self.postings[tid].append((doc_index, tf))

        # document frequency and idf
        self.df = {tid: len(plist) for tid, plist in self.postings.items()}
        self.idf = {
            tid: math.log((self.N + 1) / (df + 0.5))
            for tid, df in self.df.items()
        }

        self.doc_norms = [0.0] * self.N
        for tid, plist in self.postings.items():
            idf = self.idf[tid]
            for doc_index, tf in plist:
                w_td = (1.0 + math.log(tf)) * idf
                self.doc_norms[doc_index] += w_td * w_td
        self.doc_norms = [math.sqrt(n) if n > 0 else 0.0 for n in self.doc_norms]

    def search(self, query_text: str, k: int = 10) -> List[Tuple[int, float]]:
        """
        Return top-k docs as (doc_id, score) for a query.
        """
        tokens = self.pre.preprocess(query_text)
        if not tokens:
            return []

        tf_q = Counter(tokens)

        # compute query weights
        q_weights: Dict[int, float] = {}
        q_norm = 0.0
        for term, tf in tf_q.items():
            if term not in self.vocab:
                continue
            tid = self.vocab[term]
            idf = self.idf.get(tid, 0.0)
            w_tq = (1.0 + math.log(tf)) * idf
            q_weights[tid] = w_tq
            q_norm += w_tq * w_tq

        q_norm = math.sqrt(q_norm) if q_norm > 0 else 0.0
        if q_norm == 0.0:
            return []

        # accumulate scores
        scores: Dict[int, float] = defaultdict(float)
        for tid, w_tq in q_weights.items():
            idf = self.idf[tid]
            for doc_index, tf in self.postings.get(tid, []):
                w_td = (1.0 + math.log(tf)) * idf
                scores[doc_index] += w_tq * w_td

        # convert to cosine similarity and map back to doc_ids
        results: List[Tuple[int, float]] = []
        for doc_index, numerator in scores.items():
            denom = self.doc_norms[doc_index] * q_norm
            if denom > 0:
                score = numerator / denom
                results.append((self.doc_ids[doc_index], score))

        results.sort(key=lambda x: x[1], reverse=True)
        return results[:k]


# ---------- Evaluation metrics ----------

def precision_at_k(ranked_doc_ids: List[int], relevant_set: Set[int], k: int) -> float:
    if k == 0:
        return 0.0
    hits = 0
    for i, doc_id in enumerate(ranked_doc_ids[:k], start=1):
        if doc_id in relevant_set:
            hits += 1
    return hits / k


def average_precision(ranked_doc_ids: List[int], relevant_set: Set[int]) -> float:
    if not relevant_set:
        return 0.0
    hits = 0
    sum_prec = 0.0
    for i, doc_id in enumerate(ranked_doc_ids, start=1):
        if doc_id in relevant_set:
            hits += 1
            sum_prec += hits / i
    return sum_prec / len(relevant_set)


def mean_average_precision(results_by_qid: Dict[int, List[int]],
                           qrels: Dict[int, Set[int]]) -> float:
    aps = []
    for qid, ranked in results_by_qid.items():
        aps.append(average_precision(ranked, qrels.get(qid, set())))
    return sum(aps) / len(aps) if aps else 0.0


# ---------- Main script ----------

if __name__ == "__main__":
    # Adjust paths for folders in MyDrive
    docs_dir = "/content/drive/MyDrive/cran.all.1400"
    qrels_dir = "/content/drive/MyDrive/cranqrel"
    queries_file = "/content/cran.qry.txt" # Assuming this is a single file in /content/

    # print(f"DEBUG: Listing contents of {docs_dir}")
    # print(os.listdir(docs_dir))
    # print(f"DEBUG: Listing contents of {qrels_dir}")
    # print(os.listdir(qrels_dir))

    docs = parse_cran_all_from_directory(docs_dir)
    queries = parse_cran_qry(queries_file)
    qrels = parse_cran_qrel_from_directory(qrels_dir)

    print(f"Loaded {len(docs)} documents, {len(queries)} queries, "
          f"{sum(len(v) for v in qrels.values())} relevance pairs.")

    pre = Preprocessor()
    index = InvertedIndex(pre)
    index.build(docs)
    print(f"Vocabulary size: {len(index.vocab)} terms")

    # Example query demo
    # Only proceed if queries list is not empty
    if queries:
        example_q = queries[0]
        print("\nExample query:")
        print(f"Q{example_q.q_id}: {example_q.text}")
        example_results = index.search(example_q.text, k=10)
        print("Top 10 results (doc_id, score):")
        for doc_id, score in example_results:
            print(f"  {doc_id:4d}  {score:.4f}")
    else:
        print("\nNo queries loaded, skipping example query demo.")


    # Evaluation on all queries with relevance judgments
    results_by_qid: Dict[int, List[int]] = {}
    for q in queries:
        if q.q_id not in qrels:
            # print(f"DEBUG: Query {q.q_id} has no relevance judgments, skipping.")
            continue
        ranked = index.search(q.text, k=100)
        ranked_ids = [doc_id for doc_id, _ in ranked]
        results_by_qid[q.q_id] = ranked_ids

    if results_by_qid:
        Ks = [5, 10]
        prec_at_k = {k: [] for k in Ks}
        for qid, ranked_ids in results_by_qid.items():
            rel = qrels.get(qid, set())
            for k in Ks:
                prec_at_k[k].append(precision_at_k(ranked_ids, rel, k))

        map_score = mean_average_precision(results_by_qid, qrels)

        print("\nEvaluation:")
        for k in Ks:
            avg_p = sum(prec_at_k[k]) / len(prec_at_k[k]) if prec_at_k[k] else 0.0
            print(f"  Mean Precision@{k}: {avg_p:.4f}")
        print(f"  MAP (top-100): {map_score:.4f}")
    else:
        print("\nNo queries with relevance judgments processed for evaluation.")

Loaded 1400 documents, 225 queries, 1837 relevance pairs.
Vocabulary size: 7008 terms

Example query:
Q1: what similarity laws must be obeyed when constructing aeroelastic models of heated high speed aircraft .
Top 10 results (doc_id, score):
    13  0.2168
   184  0.2040
   486  0.1725
    12  0.1485
  1268  0.1255
    51  0.1127
   665  0.1063
   878  0.1062
   875  0.1059
   332  0.0962

Evaluation:
  Mean Precision@5: 0.3991
  Mean Precision@10: 0.2813
  MAP (top-100): 0.3674
