# Information Retrieval Lab – Vector Space Model (lnc.ltc)

This notebook implements a complete Vector Space Model pipeline, including:
- Corpus loading from a provided ZIP file
- Tokenization, normalization (lowercasing, punctuation removal), stemming (NLTK Porter), and optional lemmatization
- Inverted index with dictionary and postings (df, [(docID, tf), ...])
- Document vector weighting lnc (log tf + cosine norm, no idf)
- Query vector weighting ltc (log tf + idf + cosine norm)
- Cosine similarity and ranked retrieval (top-10)
- Soundex-based spelling fallback for out-of-vocabulary query terms
- Execution of test queries with readable output

Run the cells in order; each section includes an explanation and prints its progress.

## 1) Imports

In [2]:
%pip install nltk


In [3]:
# Import Required Libraries
import os
import re
import math
import json
import zipfile
from pathlib import Path
from collections import defaultdict, Counter
from typing import List, Tuple, Dict

# NLTK for stemming/lemmatization
import nltk
from nltk.stem import PorterStemmer, WordNetLemmatizer

# Ensure needed NLTK resources (quietly)
try:
    nltk.data.find('corpora/wordnet')
except LookupError:
    nltk.download('wordnet', quiet=True)
try:
    nltk.data.find('corpora/omw-1.4')
except LookupError:
    nltk.download('omw-1.4', quiet=True)

ps = PorterStemmer()
lemmatizer = WordNetLemmatizer()

# Globals populated later
DOCS: Dict[int, str] = {}              # docID -> raw text
DOCID_TO_PATH: Dict[int, str] = {}     # docID -> relative file path/name
PATH_TO_DOCID: Dict[str, int] = {}     # path -> docID
VOCAB_DF: Dict[str, int] = {}          # term -> document frequency
POSTINGS: Dict[str, List[Tuple[int, int]]] = {}  # term -> list of (docID, raw_tf)
DOC_LN_FACTORS: Dict[int, float] = {}  # docID -> l2 norm for lnc
N_DOCS: int = 0

print("Imports complete. NLTK ready.")

Imports complete. NLTK ready.


## 2) Unzip corpus and load documents

- Unzips the provided `Corpus-*.zip` into a `corpus/` folder next to this notebook.
- Loads all `.txt` files into memory.
- Assigns incremental docIDs (starting at 0) and keeps mappings both ways.

In [25]:
# Locate the ZIP file (assumes a single corpus zip next to this notebook)
workspace_dir = Path.cwd()
zip_candidates = sorted(workspace_dir.glob('Corpus-*.zip'))
if not zip_candidates:
    # Also try the parent folder in case the notebook was launched from a subfolder
    zip_candidates = sorted(workspace_dir.parent.glob('Corpus-*.zip'))

if not zip_candidates:
    raise FileNotFoundError("Corpus ZIP not found. Ensure a file named 'Corpus-*.zip' exists next to this notebook.")

corpus_zip = zip_candidates[0]
corpus_dir = workspace_dir / 'corpus'
corpus_dir.mkdir(exist_ok=True)

print(f"Unzipping: {corpus_zip} -> {corpus_dir}")
with zipfile.ZipFile(corpus_zip, 'r') as zf:
    zf.extractall(corpus_dir)

# Recursively collect .txt files (some zips create a top-level folder)
text_files = sorted([p for p in corpus_dir.rglob('*.txt')])
if not text_files:
    raise RuntimeError("No .txt files found after unzipping. Check the archive structure.")

# Load into memory and create ID mappings
DOCS.clear(); DOCID_TO_PATH.clear(); PATH_TO_DOCID.clear()
doc_id = 0
for path in text_files:
    try:
        text = path.read_text(encoding='utf-8', errors='ignore')
    except Exception:
        text = path.read_text(errors='ignore')  # fallback
    DOCS[doc_id] = text
    rel = str(path.relative_to(corpus_dir))
    DOCID_TO_PATH[doc_id] = rel
    PATH_TO_DOCID[rel] = doc_id
    doc_id += 1

N_DOCS = len(DOCS)
print(f"Loaded {N_DOCS} documents.")
print("Example mapping:")
for i in range(min(3, N_DOCS)):
    print(f"  docID={i} -> {DOCID_TO_PATH[i]}")

Unzipping: e:\COLLEGE MATERIAL\SEMESTER 7\Information Retrieval\Assignment_Vector_Space_Model\Corpus-20230203T210935Z-001.zip -> e:\COLLEGE MATERIAL\SEMESTER 7\Information Retrieval\Assignment_Vector_Space_Model\corpus
Loaded 41 documents.
Example mapping:
  docID=0 -> Corpus\Adobe.txt
  docID=1 -> Corpus\Amazon.txt
  docID=2 -> Corpus\apple.txt


## 3) Preprocessing functions (tokenize, normalize, stem/lemmatize)

- Lowercase text, remove punctuation (keep alphanumerics), and split into tokens.
- Apply Porter stemming; lemmatization can optionally be enabled via `use_lemma`.
- Reuse these for both documents and queries.
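
The normalization step on its own can be sketched with the standard library only. Stemming is left out here, and both the helper name `toy_normalize` and the choice to replace punctuation with a space are assumptions made for illustration.

```python
import re

def toy_normalize(text):
    # Lowercase, replace every run of characters other than a-z and 0-9
    # with a single space, then split on whitespace
    return re.sub(r"[^a-z0-9]+", " ", text.lower()).split()

print(toy_normalize("Running runs, runner's RUN!"))
# ['running', 'runs', 'runner', 's', 'run']
```

Under that assumption an apostrophe splits a word in two, so `runner's` yields a separate `s` token.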

In [26]:
# Reconstructed from the description above and from the
# `preprocess(text, use_lemma=...)` calls in later cells. The regex, the
# lemmatize-then-stem order, and the sample sentence are assumptions.
_non_alnum = re.compile(r"[^a-z0-9]+")

def preprocess(text: str, use_lemma: bool = False) -> List[str]:
    # Lowercase, replace non-alphanumeric runs with a space, split into tokens
    tokens = _non_alnum.sub(" ", text.lower()).split()
    if use_lemma:
        # Optional WordNet lemmatization (default noun POS) before stemming
        tokens = [lemmatizer.lemmatize(t) for t in tokens]
    # Porter stemming
    return [ps.stem(t) for t in tokens]

# Quick sanity check on an illustrative sentence
print(preprocess("Running runs, runner's RUN!"))

['run', 'run', 'runner', 's', 'run']


## 4) Build inverted index (dictionary + postings)

- For each document: preprocess, count term frequencies.
- Dictionary maps term -> df and postings list of (docID, raw_tf).
- Progress prints are shown for batches of documents.
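
As a minimal sketch of this structure, the following builds a dictionary and postings lists for three made-up one-line documents. Whitespace splitting stands in for the notebook's preprocessing, and the names `toy_docs`, `toy_postings`, and `toy_df` are hypothetical.

```python
from collections import Counter, defaultdict

# Three made-up documents keyed by docID (illustrative data, not from the corpus)
toy_docs = {
    0: "apple banana apple",
    1: "banana cherry",
    2: "apple cherry cherry cherry",
}

toy_postings = defaultdict(list)  # term -> [(docID, raw_tf), ...]
for doc_id in sorted(toy_docs):
    # Whitespace split stands in for the real preprocessing step
    tf = Counter(toy_docs[doc_id].split())
    for term, freq in tf.items():
        toy_postings[term].append((doc_id, freq))

# Document frequency is the length of each postings list
toy_df = {term: len(plist) for term, plist in toy_postings.items()}

for term in sorted(toy_postings):
    print(term, toy_df[term], toy_postings[term])
```

Because documents are visited in ascending docID order, each postings list comes out already sorted by docID.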

In [27]:
def build_index(use_lemma: bool = False):
    global VOCAB_DF, POSTINGS
    VOCAB_DF = {}
    postings_dd = defaultdict(list)

    print("Building inverted index...")
    for i in range(N_DOCS):
        tokens = preprocess(DOCS[i], use_lemma=use_lemma)
        tf = Counter(tokens)
        for term, freq in tf.items():
            postings_dd[term].append((i, freq))
        if (i + 1) % 50 == 0 or i == N_DOCS - 1:
            print(f"  processed {i+1}/{N_DOCS} docs")

    # finalize structures
    POSTINGS.clear()
    for term, plist in postings_dd.items():
        # sort postings by docID to keep order stable
        plist.sort(key=lambda x: x[0])
        POSTINGS[term] = plist
        VOCAB_DF[term] = len(plist)

    print(f"Index built: |V|={len(VOCAB_DF)}, postings terms={len(POSTINGS)}")

# Build index now
build_index(use_lemma=False)

Building inverted index...
  processed 41/41 docs
Index built: |V|=4250, postings terms=4250


## 5) Document vector normalization (lnc)

- For each document term with raw_tf > 0: weight = 1 + log10(raw_tf)
- Compute L2 norm per document and store in `DOC_LN_FACTORS`.
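
A small worked example of lnc weighting for a single document may help. The term frequencies below are made up so that the logarithms come out as round numbers, and `toy_tf`, `log_w`, and `lnc` are hypothetical names.

```python
import math

# Made-up raw term frequencies for one document (illustrative values)
toy_tf = {"index": 1, "query": 10, "vector": 100}

# l: logarithmic tf; n: no idf, so the weight is just 1 + log10(tf)
log_w = {t: 1.0 + math.log10(tf) for t, tf in toy_tf.items()}

# c: cosine normalization divides by the L2 norm of the log-weighted vector
norm = math.sqrt(sum(w * w for w in log_w.values()))
lnc = {t: w / norm for t, w in log_w.items()}

print(log_w)           # weights 1, 2 and 3
print(round(norm, 4))  # sqrt(1 + 4 + 9) = sqrt(14)
print({t: round(w, 4) for t, w in lnc.items()})
```

After normalization the squared weights sum to 1, which is what lets the ranking step use a plain dot product as the cosine similarity.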

In [28]:
def compute_doc_lengths():
    global DOC_LN_FACTORS
    DOC_LN_FACTORS = {i: 0.0 for i in range(N_DOCS)}

    # accumulate squared weights per doc
    accum = defaultdict(float)
    for term, plist in POSTINGS.items():
        for doc_id, raw_tf in plist:
            if raw_tf > 0:
                w = 1.0 + math.log10(raw_tf)
                accum[doc_id] += w * w

    for doc_id in range(N_DOCS):
        DOC_LN_FACTORS[doc_id] = math.sqrt(accum.get(doc_id, 0.0)) or 1.0  # avoid div by zero

    print("Computed document length normalization factors.")

compute_doc_lengths()

Computed document length normalization factors.


## 6) Query processing + Soundex fallback

- Preprocess queries the same way as documents.
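- If a term is not in the dictionary, replace it with the dictionary term that shares its Soundex code and has the highest df (ties broken alphabetically); if no term shares the code, keep the query term unchanged.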
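
The fallback can be sketched on a made-up dictionary as follows. `toy_soundex`, `toy_df`, and `toy_fallback` are hypothetical names, and the sketch assumes the standard American Soundex convention in which H and W are skipped while vowels and Y separate repeated codes.

```python
from collections import defaultdict

# Standard American Soundex digit groups
CODES = {}
for letters, digit in [("bfpv", "1"), ("cgjkqsxz", "2"), ("dt", "3"),
                       ("l", "4"), ("mn", "5"), ("r", "6")]:
    for ch in letters:
        CODES[ch] = digit

def toy_soundex(word):
    letters = [c for c in word.lower() if "a" <= c <= "z"]
    if not letters:
        return "0000"
    digits = []
    prev = CODES.get(letters[0], "")
    for ch in letters[1:]:
        if ch in "hw":
            continue  # H and W do not separate letters with the same code
        code = CODES.get(ch, "")  # vowels and Y have no code and reset `prev`
        if code and code != prev:
            digits.append(code)
        prev = code
    return (letters[0].upper() + "".join(digits) + "000")[:4]

# Made-up dictionary with document frequencies (illustrative values)
toy_df = {"robert": 3, "rupert": 5, "zomato": 1}

by_code = defaultdict(set)
for term in toy_df:
    by_code[toy_soundex(term)].add(term)

def toy_fallback(term):
    if term in toy_df:
        return term
    cands = by_code.get(toy_soundex(term), set())
    # Highest df first, then alphabetical order
    return min(cands, key=lambda c: (-toy_df[c], c)) if cands else term

print(toy_soundex("Robert"), toy_soundex("Rupert"))  # R163 R163
print(toy_fallback("rupurt"))                        # rupert
print(toy_fallback("zomatto"))                       # zomato
```

`robert` and `rupert` share the code R163, so the misspelling `rupurt` resolves to whichever of the two has the higher df in the toy dictionary.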

In [29]:
# Soundex implementation (American Soundex)
_soundex_map = {
    'b': '1', 'f': '1', 'p': '1', 'v': '1',
    'c': '2', 'g': '2', 'j': '2', 'k': '2', 'q': '2', 's': '2', 'x': '2', 'z': '2',
    'd': '3', 't': '3',
    'l': '4',
    'm': '5', 'n': '5',
    'r': '6'
}

def soundex(word: str) -> str:
    if not word:
        return "0000"
    word = re.sub(r"[^a-zA-Z]", "", word).lower()
    if not word:
        return "0000"
    first = word[0].upper()
    digits = []
    prev = _soundex_map.get(word[0], '')
    for ch in word[1:]:
        if ch in 'hw':
            # H and W are skipped and do not separate letters with the same code
            continue
        # Vowels and Y have no code and act as separators between repeated codes
        code = _soundex_map.get(ch, '')
        if code != '' and code != prev:
            digits.append(code)
        prev = code
    code = first + ''.join(digits)
    code = (code + '000')[:4]
    return code

# Build a code->terms mapping for dictionary words (on demand)
_sdx_to_terms = None

def build_soundex_index():
    global _sdx_to_terms
    _sdx_to_terms = defaultdict(set)
    for term in VOCAB_DF.keys():
        _sdx_to_terms[soundex(term)].add(term)


def preprocess_query(text: str) -> List[str]:
    return preprocess(text, use_lemma=False)


def expand_oov_terms_with_soundex(q_terms: List[str]) -> List[str]:
    # For terms not present, try to find a single best soundex match by highest df (then lexicographic)
    global _sdx_to_terms
    if _sdx_to_terms is None:
        build_soundex_index()

    expanded = []
    replacements = {}
    for t in q_terms:
        if t in VOCAB_DF:
            expanded.append(t)
        else:
            sdx = soundex(t)
            cands = list(_sdx_to_terms.get(sdx, []))
            if cands:
                best = sorted(cands, key=lambda c: (-VOCAB_DF.get(c, 0), c))[0]
                expanded.append(best)
                replacements[t] = best
            else:
                expanded.append(t)  # keep as-is; no matches
    if replacements:
        print("Soundex replacements:", ", ".join(f"{k}->{v}" for k, v in replacements.items()))
    return expanded

print("Soundex utilities ready.")

Soundex utilities ready.


## 7) Similarity computation and ranked retrieval (lnc.ltc)

- Query weights: ltc (log tf × idf, where idf = log10(N/df), followed by cosine normalization)
- Document weights: lnc (log tf + cosine normalization, no idf)
- Cosine similarity using postings lists for efficiency.
- Return top-10 sorted by score desc, then docID asc.
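
The scoring scheme can be traced end to end on a tiny made-up index. This is a sketch under stated assumptions rather than the notebook's own function: `toy_postings`, `toy_n_docs`, and `toy_rank` are hypothetical, and the query is passed in as already-preprocessed terms.

```python
import math
from collections import Counter, defaultdict

# Made-up index over N = 4 documents: term -> [(docID, raw_tf), ...]
toy_n_docs = 4
toy_postings = {
    "pizza": [(0, 3), (2, 1)],
    "order": [(0, 1), (1, 2), (2, 1), (3, 1)],
    "drama": [(3, 4)],
}

def log_tf(tf):
    return 1.0 + math.log10(tf)

# lnc document norms: L2 norm of the log-tf weights in each document
sq = defaultdict(float)
for plist in toy_postings.values():
    for doc_id, tf in plist:
        sq[doc_id] += log_tf(tf) ** 2
doc_norm = {d: math.sqrt(s) for d, s in sq.items()}

def toy_rank(query_terms):
    # ltc query weights: log tf times idf, then cosine normalization
    q_w = {}
    for term, tf in Counter(query_terms).items():
        if term in toy_postings:
            idf = math.log10(toy_n_docs / len(toy_postings[term]))
            q_w[term] = log_tf(tf) * idf
    q_norm = math.sqrt(sum(w * w for w in q_w.values())) or 1.0
    scores = defaultdict(float)
    for term, w in q_w.items():
        for doc_id, tf in toy_postings[term]:
            scores[doc_id] += (w / q_norm) * (log_tf(tf) / doc_norm[doc_id])
    # Sort by score descending, then docID ascending
    return sorted(scores.items(), key=lambda x: (-x[1], x[0]))

print(toy_rank(["pizza", "order"]))
```

In this toy index `order` occurs in all four documents, so its idf is log10(4/4) = 0 and it adds nothing to any score. Documents 1 and 3 therefore tie at zero and are ordered by docID.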

In [30]:
def rank_query(query: str, top_k: int = 10) -> List[Tuple[int, float]]:
    # Preprocess + Soundex expansion for OOV
    q_tokens = preprocess_query(query)
    q_terms = expand_oov_terms_with_soundex(q_tokens)

    # Query term frequencies and ltc weights
    q_tf = Counter(q_terms)
    q_weights = {}
    for term, raw_tf in q_tf.items():
        if raw_tf <= 0:
            continue
        tfw = 1.0 + math.log10(raw_tf)
        df = VOCAB_DF.get(term)
        if not df:
            # for terms not in vocab (e.g., no soundex match resolved), skip from vector
            continue
        idf = math.log10(N_DOCS / df) if df > 0 else 0.0
        q_weights[term] = tfw * idf

    # Normalize query vector
    q_norm = math.sqrt(sum(w * w for w in q_weights.values())) or 1.0
    for t in list(q_weights.keys()):
        q_weights[t] /= q_norm

    # Accumulate cosine scores using postings
    scores = defaultdict(float)
    for term, qw in q_weights.items():
        plist = POSTINGS.get(term, [])
        for doc_id, raw_tf in plist:
            if raw_tf > 0:
                dw_tf = 1.0 + math.log10(raw_tf)  # lnc weight numerator
                denom = DOC_LN_FACTORS.get(doc_id, 1.0)
                if denom == 0.0:
                    denom = 1.0
                dw = dw_tf / denom
                scores[doc_id] += qw * dw

    # Prepare top-k results sorted by score desc, then docID asc
    results = sorted(scores.items(), key=lambda x: (-x[1], x[0]))[:top_k]
    return results

print("Ranking function ready.")

Ranking function ready.


## 8) Run test cases and display outputs

- Run example queries taken from the provided PDF (two are included here).
- Show top-10 results with similarity scores and filenames.
- Tie-breaking: score desc, then docID asc.

In [33]:
def print_results(results: List[Tuple[int, float]], header: str = None):
    if header:
        print(header)
    for rank, (doc_id, score) in enumerate(results, 1):
        print(f"{rank:2d}. docID={doc_id:4d}  score={score:.6f}  file={DOCID_TO_PATH.get(doc_id, '?')}")

# Example test queries 
queries = [
    "Developing your Zomato business account and profile is a great way to boost your restaurant’s online reputation",
    "Warwickshire, came from an ancient family and was the heiress to some land",
]

for i, q in enumerate(queries, 1):
    print("\n" + "="*80)
    print(f"Q{i}: {q}")
    res = rank_query(q, top_k=10)
    print_results(res, header="Top-10 results:")


Q1: Developing your Zomato business account and profile is a great way to boost your restaurant’s online reputation
Top-10 results:
 1. docID=  40  score=0.199932  file=Corpus\zomato.txt
 2. docID=  33  score=0.117688  file=Corpus\swiggy.txt
 3. docID=  13  score=0.068714  file=Corpus\instagram.txt
 4. docID=  16  score=0.064887  file=Corpus\messenger.txt
 5. docID=  39  score=0.061525  file=Corpus\youtube.txt
 6. docID=   8  score=0.057806  file=Corpus\Discord.txt
 7. docID=   4  score=0.053515  file=Corpus\bing.txt
 8. docID=  25  score=0.050172  file=Corpus\reddit.txt
 9. docID=  29  score=0.045595  file=Corpus\skype.txt
10. docID=  10  score=0.041589  file=Corpus\google.txt

Q2: Warwickshire, came from an ancient family and was the heiress to some land
Top-10 results:
 1. docID=  28  score=0.108578  file=Corpus\shakespeare.txt
 2. docID=  15  score=0.027817  file=Corpus\levis.txt
 3. docID=  10  score=0.022757  file=Corpus\google.txt
 4. docID=   0  score=0.022277  file=Corpus\Ado

## 9) Wrap up and optionally save results

- Save last query results to a CSV file in the workspace for your records.

In [34]:
# Re-run the last query in `queries` and save its top-10 results to CSV
try:
    import csv
    last_q = queries[-1]
    last_results = rank_query(last_q, top_k=10)
    out_path = Path('vsm_results.csv')
    with out_path.open('w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['rank', 'docID', 'score', 'filename'])
        for rank, (doc_id, score) in enumerate(last_results, 1):
            writer.writerow([rank, doc_id, f"{score:.6f}", DOCID_TO_PATH.get(doc_id, '?')])
    print(f"Saved top-10 results of last query to: {out_path.resolve()}")
except Exception as e:
    print(f"Could not save results: {e}")

Saved top-10 results of last query to: E:\COLLEGE MATERIAL\SEMESTER 7\Information Retrieval\Assignment_Vector_Space_Model\vsm_results.csv
