Key Formulas Implemented:

Without Learning:

RSV_d = Σ [log( (N - n_i + 0.5) / (n_i + 0.5) )]

With Learning:

RSV_d = Σ [log( ((r_i + 0.5)/(R - r_i + 0.5)) / ((n_i - r_i + 0.5)/(N - n_i - R + r_i + 0.5)) )]

# Classic BIR – With and without Learning Data
- Build the classic Binary Independence Retrieval (BIR) model using the smoothed
formula without learning data (i.e. no relevance feedback). For each query, compute the
RSV score for every document and return a ranking (best first).
Hint: Build the binary term–document matrix first

- Build the classic Binary Independence Retrieval (BIR) model using the smoothed
formula with learning data (given in the table below). For each query, compute the RSV
score for every document and return a ranking (best first) and give a short interpretation
of the result

Query | Relevant documents for the query
q1  D2, D4
q2 D2, D4
q3 D4, D1
q4 D2, D1
q5 D3, D6

In [53]:
import os
import numpy as np
import math
from collections import defaultdict
from nltk.stem import PorterStemmer

# 1️⃣ Preprocessing: tokenization + Porter stemming
ps = PorterStemmer()

def tokenize_and_stem(text):
    """
    Turn raw text into a list of Porter stems.

    The text is lowercased, commas and periods are treated as whitespace,
    the result is split on whitespace, single-character tokens are dropped
    and every remaining token is stemmed. Token order is preserved.
    """
    normalized = text.lower()
    for punctuation in (',', '.'):
        normalized = normalized.replace(punctuation, ' ')

    stems = []
    for word in normalized.split():
        if len(word) > 1:
            stems.append(ps.stem(word))
    return stems

# -------------------------------------------------------------
# 2️⃣ Construction de la matrice terme-document binaire
# -------------------------------------------------------------
class BIRModel:
    """Classic Binary Independence Retrieval (BIR) model.

    The model is built from a text file whose lines start with
    ``term doc_id`` (extra columns are ignored). It keeps a binary
    term-document matrix and scores documents with the smoothed BIR
    weights, either without relevance information or with a set of
    documents judged relevant for the query.
    """

    def __init__(self):
        self.vocabulary = []
        self.doc_ids = []
        self.W = None  # binary term-document matrix, one row per term
        self.N = 0     # number of documents
        self.term_doc_count = None  # n_i = number of documents containing term i
        self._term_index = {}  # term -> row of W, cache over self.vocabulary

    def load_term_doc_file(self, filepath):
        """Read ``filepath`` and build the vocabulary, W and n_i.

        Every line is split on whitespace; lines with fewer than two fields
        are skipped. The first field is the term and the second the document
        id. Terms and document ids are stored sorted.
        """
        term_docs = defaultdict(set)
        all_docs = set()

        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    term, doc_id = parts[0], parts[1]
                    all_docs.add(doc_id)
                    term_docs[term].add(doc_id)

        self.vocabulary = sorted(term_docs)
        self.doc_ids = sorted(all_docs)
        self.N = len(self.doc_ids)
        M = len(self.vocabulary)
        self._term_index = {term: i for i, term in enumerate(self.vocabulary)}
        doc_column = {doc_id: j for j, doc_id in enumerate(self.doc_ids)}

        self.W = np.zeros((M, self.N), dtype=int)
        self.term_doc_count = np.zeros(M, dtype=int)

        # Only touch the cells of documents that actually contain the term
        # instead of scanning every document for every term.
        for i, term in enumerate(self.vocabulary):
            docs = term_docs[term]
            for doc_id in docs:
                self.W[i, doc_column[doc_id]] = 1
            self.term_doc_count[i] = len(docs)

    def _row_of(self, term):
        """Return the row of W for ``term``, or None if it is out of vocabulary."""
        index = getattr(self, '_term_index', None)
        if index is None or len(index) != len(self.vocabulary):
            # The vocabulary was assigned without going through
            # load_term_doc_file; rebuild the lookup table.
            index = {t: i for i, t in enumerate(self.vocabulary)}
            self._term_index = index
        return index.get(term)

    # 3️⃣ RSV without learning data (smoothed)
    def compute_rsv_no_learning(self, query_tokens):
        """Score every document with RSV_d = sum log((N - n_i + 0.5) / (n_i + 0.5)).

        The sum runs over the distinct query tokens that are in the
        vocabulary and occur in the document. The query is binary, so a
        token repeated in ``query_tokens`` is counted once.

        Returns:
            (scores, found_terms): ``scores`` is an array aligned with
            ``self.doc_ids``; ``found_terms`` lists the distinct query tokens
            found in the vocabulary, in first-occurrence order.
        """
        scores = np.zeros(self.N)
        found_terms = []
        for t in dict.fromkeys(query_tokens):
            i = self._row_of(t)
            if i is None:
                continue
            ni = self.term_doc_count[i]
            weight = math.log((self.N - ni + 0.5) / (ni + 0.5))
            scores += weight * self.W[i, :]
            found_terms.append(t)
        return scores, found_terms

    # -------------------------------------------------------------
    # 4️⃣ RSV with learning data (relevance feedback)
    # -------------------------------------------------------------
    def compute_rsv_with_learning(self, query_tokens, relevant_docs):
        """Score every document with the smoothed relevance-feedback weights.

        weight_i = log( ((r_i + 0.5) / (R - r_i + 0.5))
                        / ((n_i - r_i + 0.5) / (N - n_i - R + r_i + 0.5)) )

        Only relevant documents that are part of the indexed collection
        contribute to R and r_i, and each is counted once. A judgment on an
        unknown document carries no term information, and counting it (or a
        duplicate) could make the last denominator non-positive. With no
        usable relevant document the weight reduces to the no-learning one.

        Returns:
            (scores, found_terms), as for ``compute_rsv_no_learning``.
        """
        scores = np.zeros(self.N)
        doc_column = {doc_id: j for j, doc_id in enumerate(self.doc_ids)}
        rel_indices = sorted({doc_column[d] for d in relevant_docs if d in doc_column})
        R = len(rel_indices)
        found_terms = []

        for t in dict.fromkeys(query_tokens):
            i = self._row_of(t)
            if i is None:
                continue
            ni = self.term_doc_count[i]
            ri = sum(self.W[i, idx] for idx in rel_indices)  # relevant docs containing the term
            # smoothed formula
            weight = math.log(
                ((ri + 0.5) / (R - ri + 0.5))
                / ((ni - ri + 0.5) / (self.N - ni - R + ri + 0.5))
            )
            scores += weight * self.W[i, :]
            found_terms.append(t)
        return scores, found_terms

    def _print_ranking(self, scores):
        """Print documents by decreasing score; ties keep ``doc_ids`` order."""
        ranking = sorted(zip(self.doc_ids, scores), key=lambda pair: pair[1], reverse=True)
        for doc, score in ranking:
            print(f"   {doc:<6} score={score:.6f}")

    # 5️⃣ Ranking and display
    def rank_query(self, query_text, relevant_docs=None):
        """Print the BIR ranking for ``query_text``.

        The no-learning ranking is always printed. The relevance-feedback
        ranking is printed as well when ``relevant_docs`` is non-empty.
        """
        print("*"*70)
        print(f"Query: '{query_text}'")

        query_tokens = tokenize_and_stem(query_text)

        # BIR without learning data
        scores_no, _ = self.compute_rsv_no_learning(query_tokens)
        print("\n BIR (no learning) ranking:")
        self._print_ranking(scores_no)

        # BIR with learning data
        if relevant_docs:
            scores_learn, _ = self.compute_rsv_with_learning(query_tokens, relevant_docs)
            print("\n BIR (with learning) ranking (using relevance set):")
            self._print_ranking(scores_learn)

# 6️⃣ Main
def main():
    """Run the classic BIR demo on the five sample queries.

    Loads the index file, then prints for every query the ranking without
    learning data and the ranking that uses the query's relevance set.
    Prints an error and returns when the index file does not exist.
    """
    tfidf_file = "results/inverted_index_weighted.txt"

    index_available = os.path.exists(tfidf_file)
    if not index_available:
        print(f"ERROR: file not found: {tfidf_file}")
        return

    bir = BIRModel()
    bir.load_term_doc_file(tfidf_file)

    # Queries, in the order q1..q5
    queries = [
        "large language models for information retrieval and ranking",
        "LLM for information retrieval and Ranking",
        "query Reformulation in information retrieval",
        "ranking Documents",
        "Optimizing recommendation systems with LLMs by leveraging item metadata"
    ]

    # Relevance sets used as learning data
    relevance_sets = {
        'q1': ['D2', 'D4'],
        'q2': ['D2', 'D4'],
        'q3': ['D4', 'D1'],
        'q4': ['D2', 'D1'],
        'q5': ['D3', 'D6']
    }

    for position, query_text in enumerate(queries):
        query_id = f'q{position + 1}'
        if query_id in relevance_sets:
            judged_relevant = relevance_sets[query_id]
        else:
            judged_relevant = None
        bir.rank_query(query_text, relevant_docs=judged_relevant)
        print("\n")

# Run the classic BIR demo only when this code is executed as a script.
if __name__ == "__main__":
    main()


**********************************************************************
Query: 'large language models for information retrieval and ranking'

 BIR (no learning) ranking:
   D1     score=-6.519275
   D2     score=-7.107061
   D4     score=-7.694848
   D3     score=-8.282635
   D5     score=-8.282635
   D6     score=-8.282635

 BIR (with learning) ranking (using relevance set):
   D1     score=-0.068764
   D2     score=-0.916062
   D4     score=-1.763360
   D3     score=-2.610658
   D5     score=-2.610658
   D6     score=-2.610658


**********************************************************************
Query: 'LLM for information retrieval and Ranking'

 BIR (no learning) ranking:
   D1     score=1.175573
   D2     score=-0.711496
   D4     score=-1.299283
   D3     score=-1.887070
   D5     score=-1.887070
   D6     score=-1.887070

 BIR (with learning) ranking (using relevance set):
   D1     score=1.694596
   D2     score=1.609438
   D4     score=0.762140
   D3     score=-0.085158
   D

# Extended BIR with and without learning data
Implement the Extended Binary Independence Retrieval (Extended BIR) model, which
generalizes the Classic BIR by incorporating term frequency information rather than
treating documents as purely binary.
You will:
- Use TF–IDF values as document term weights,
- Use Binary query vectors (1 if term appears in query, 0 otherwise),
- Use the same probabilistic weighting formulas given in the lecture notes,
- Implement two models: Extended BIR with and without learning data.

Key Formulas Implemented:

Without Learning:

RSV_d = Σ [w_dt × qtfi × log( (N - n_i + 0.5) / (n_i + 0.5) )]

With Learning:

RSV_d = Σ [w_dt × qtfi × log( ((r_i + 0.5)/(R - r_i + 0.5)) / ((n_i - r_i + 0.5)/(N - n_i - R + r_i + 0.5)) )]

In [None]:
import math
from collections import defaultdict

class ExtendedBIR:
    """Shared base of the Extended BIR models.

    Loads a weighted inverted index and precomputes a document -> term ->
    weight table that the scoring subclasses use as ``w_dt``.
    """

    def __init__(self, inverted_index_file):
        (self.term_doc_matrix,
         self.doc_terms,
         self.doc_lengths,
         self.term_doc_freq) = self.parse_weighted_index(inverted_index_file)
        self.all_docs = list(self.doc_terms)
        self.all_terms = list(self.term_doc_matrix)
        self.N = len(self.all_docs)

        # Precompute TF-IDF weights
        self.tfidf_matrix = self.compute_tfidf_matrix()

    def parse_weighted_index(self, filename):
        """
        Parse the weighted inverted index with format: term doc_id freq weight

        Lines with fewer than four whitespace-separated fields are skipped.
        As a side effect the per-posting weights are stored in
        ``self.term_weights`` (term -> doc_id -> weight).

        Returns:
            (term_doc_matrix, doc_terms, doc_lengths, term_doc_freq) where
            - term_doc_matrix: term -> doc_id -> freq
            - doc_terms: doc_id -> set of terms
            - doc_lengths: doc_id -> number of distinct terms in the document
            - term_doc_freq: term -> number of distinct documents (n_i)
        """
        term_doc_matrix = defaultdict(dict)
        doc_terms = defaultdict(set)
        doc_lengths = defaultdict(int)
        term_doc_freq = defaultdict(int)
        term_weights = defaultdict(dict)

        # Explicit encoding so parsing does not depend on the platform locale.
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.split()
                if len(parts) < 4:
                    continue
                term = parts[0]
                doc_id = parts[1]
                freq = int(parts[2])
                weight = float(parts[3])

                term_doc_matrix[term][doc_id] = freq
                term_weights[term][doc_id] = weight
                doc_terms[doc_id].add(term)

        # Derive the counts from the distinct postings rather than from the
        # number of lines, so a repeated (term, doc) line cannot push n_i
        # above N or inflate a document length.
        for term, postings in term_doc_matrix.items():
            term_doc_freq[term] = len(postings)
        for doc_id, terms in doc_terms.items():
            doc_lengths[doc_id] = len(terms)

        self.term_weights = term_weights
        return term_doc_matrix, doc_terms, doc_lengths, term_doc_freq

    def compute_tfidf_matrix(self):
        """
        Compute TF-IDF weights for all term-document pairs

        The result maps doc_id -> term -> weight and is dense: a term that
        does not occur in a document is stored with weight 0.
        """
        tfidf_matrix = defaultdict(dict)

        for term in self.all_terms:
            doc_freq = self.term_doc_freq[term]
            idf = math.log(self.N / doc_freq) if doc_freq > 0 else 0
            postings = self.term_doc_matrix[term]
            weights = self.term_weights[term]

            for doc in self.all_docs:
                if doc in postings:
                    # The weight column of the index is used as the TF
                    # component, falling back to the raw frequency.
                    # NOTE(review): if that column already holds a TF-IDF
                    # value, the IDF is applied twice here; confirm the
                    # semantics of the index file.
                    tf = weights.get(doc, postings[doc])
                    tfidf_matrix[doc][term] = tf * idf
                else:
                    tfidf_matrix[doc][term] = 0

        return tfidf_matrix

    def get_document_vector(self, doc_id):
        """Get TF-IDF vector for a document"""
        return self.tfidf_matrix[doc_id]

    def get_query_weights(self, query_terms):
        """
        Get query term weights (qtfi)
        For binary queries, qtfi = 1 if term appears in query
        Can be extended for weighted queries

        Returns a dict with one entry per indexed term.
        """
        # Build the set once: membership is then O(1) per vocabulary term.
        present = set(query_terms)
        return {term: 1 if term in present else 0 for term in self.all_terms}

## Without learning data

In [None]:
class ExtendedBIRWithoutLearning(ExtendedBIR):
    """Extended BIR scoring without relevance information."""

    def __init__(self, inverted_index_file):
        super().__init__(inverted_index_file)

    def _known_query_terms(self, query_terms):
        """Return the distinct query terms present in the index, in order.

        The query vector is binary, so a term repeated in the query is kept
        once and therefore contributes once to the score.
        """
        return [t for t in dict.fromkeys(query_terms) if t in self.term_doc_matrix]

    def _log_ratio(self, term):
        """Return log((N - n_i + 0.5) / (n_i + 0.5)) for ``term``.

        A non-positive numerator or denominator is replaced by 0.5 so the
        logarithm is always defined. Scoring and display share this helper,
        so they always agree on the weight.
        """
        n_i = self.term_doc_freq.get(term, 0)
        numerator = self.N - n_i + 0.5
        denominator = n_i + 0.5

        # Avoid division by zero and negative values
        if denominator <= 0:
            denominator = 0.5
        if numerator <= 0:
            numerator = 0.5

        return math.log(numerator / denominator)

    def compute_rsv(self, query_terms):
        """
        Compute RSV using Extended BIR formula without learning data:
        RSV_d = sum_{t in q} [w_dt * qtfi * log( (N - n_i + 0.5) / (n_i + 0.5) )]

        Where:
        - w_dt: TF-IDF weight of term t in document d
        - qtfi: Query term weight (1 for binary queries)
        - N: Total number of documents
        - n_i: Number of documents containing term i

        Query terms that are not in the index are ignored.

        Returns:
            dict mapping every document id to its score.
        """
        query_weights = self.get_query_weights(query_terms)

        # The log ratio depends only on the term, so compute it once per
        # term instead of once per (document, term) pair.
        term_weights = {
            term: self._log_ratio(term)
            for term in self._known_query_terms(query_terms)
        }

        rsv_scores = {}
        for doc in self.all_docs:
            doc_vector = self.get_document_vector(doc)
            score = 0.0
            for term, log_ratio in term_weights.items():
                w_dt = doc_vector.get(term, 0)
                qtfi = query_weights.get(term, 0)
                # Extended BIR formula
                score += w_dt * qtfi * log_ratio
            rsv_scores[doc] = score

        return rsv_scores

    def rank_documents(self, query_terms):
        """Rank documents by RSV score"""
        rsv_scores = self.compute_rsv(query_terms)
        ranked_docs = sorted(rsv_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked_docs

    def display_scoring_breakdown(self, query_terms, top_k=3):
        """Display detailed scoring breakdown for interpretation

        Prints one row per distinct indexed query term (n_i, N - n_i, the
        smoothed log ratio and log(N / n_i)), then the ``top_k`` ranked
        documents with the contribution of each term to their score.
        """
        known_terms = self._known_query_terms(query_terms)

        print(f"\nSCORING BREAKDOWN for query: {query_terms}")
        print(f"{'Term':<10} {'n_i':<6} {'N-n_i':<8} {'log_ratio':<12} {'IDF-like':<10}")
        print("-" * 60)

        for term in known_terms:
            n_i = self.term_doc_freq.get(term, 0)
            log_ratio = self._log_ratio(term)
            idf = math.log(self.N / n_i) if n_i > 0 else 0

            print(f"{term:<10} {n_i:<6} {self.N-n_i:<8} {log_ratio:<12.4f} {idf:<10.4f}")

        # Show top documents and their term contributions
        ranked_results = self.rank_documents(query_terms)[:top_k]
        print(f"\nTOP {top_k} DOCUMENTS - TERM CONTRIBUTIONS:")
        print(f"{'Doc':<6} {'Total RSV':<12} {'Term Contributions':<40}")
        print("-" * 70)

        for doc, total_score in ranked_results:
            doc_vector = self.get_document_vector(doc)
            contributions = []
            for term in known_terms:
                w_dt = doc_vector.get(term, 0)
                qtfi = 1
                term_contrib = w_dt * qtfi * self._log_ratio(term)
                contributions.append(f"{term}:{term_contrib:.3f}")

            print(f"{doc:<6} {total_score:<12.4f} {', '.join(contributions):<40}")

## With learning data

In [None]:
class ExtendedBIRWithLearning(ExtendedBIR):
    """Extended BIR scoring that uses per-query relevance judgments.

    ``relevance_data`` maps a query id to a dict whose ``'relevant'`` entry
    is the collection of document ids judged relevant for that query.
    """

    def __init__(self, inverted_index_file, relevance_data):
        super().__init__(inverted_index_file)
        self.relevance_data = relevance_data

    @staticmethod
    def _positive_or_half(value):
        """Return ``value`` when it is positive, otherwise the smoothing constant 0.5."""
        return value if value > 0 else 0.5

    def _known_query_terms(self, query_terms):
        """Return the distinct query terms present in the index, in order.

        The query vector is binary, so a repeated term is kept once.
        """
        return [t for t in dict.fromkeys(query_terms) if t in self.term_doc_matrix]

    def _relevant_count(self, term, rel_docs):
        """Return r_i: how many documents of ``rel_docs`` contain ``term``."""
        postings = self.term_doc_matrix[term]
        return sum(1 for doc in rel_docs if doc in postings)

    def _no_learning_weight(self, term):
        """Return log((N - n_i + 0.5) / (n_i + 0.5)) with non-positive parts clamped."""
        n_i = self.term_doc_freq.get(term, 0)
        numerator = self._positive_or_half(self.N - n_i + 0.5)
        denominator = self._positive_or_half(n_i + 0.5)
        return math.log(numerator / denominator)

    def _learning_weight(self, term, rel_docs):
        """Return the smoothed relevance-feedback log ratio for ``term``.

        log( ((r_i + 0.5)/(R - r_i + 0.5)) / ((n_i - r_i + 0.5)/(N - n_i - R + r_i + 0.5)) )

        Every denominator-side quantity that is not positive is replaced by
        0.5, so the ratio stays positive and the logarithm is defined. This
        matters when ``rel_docs`` names documents outside the indexed
        collection, which can make N - n_i - R + r_i negative.
        """
        R = len(rel_docs)
        n_i = self.term_doc_freq.get(term, 0)
        r_i = self._relevant_count(term, rel_docs)

        # Numerator: (r_i + 0.5) / (R - r_i + 0.5)
        num_numerator = r_i + 0.5
        num_denominator = self._positive_or_half(R - r_i + 0.5)

        # Denominator: (n_i - r_i + 0.5) / (N - n_i - R + r_i + 0.5)
        den_numerator = self._positive_or_half(n_i - r_i + 0.5)
        den_denominator = self._positive_or_half(self.N - n_i - R + r_i + 0.5)

        probability_ratio = (num_numerator / num_denominator) / (den_numerator / den_denominator)
        return math.log(probability_ratio)

    def _score_documents(self, query_terms, term_weights):
        """Return doc_id -> sum over terms of w_dt * qtfi * term_weights[term]."""
        query_weights = self.get_query_weights(query_terms)
        rsv_scores = {}
        for doc in self.all_docs:
            doc_vector = self.get_document_vector(doc)
            score = 0.0
            for term, log_ratio in term_weights.items():
                w_dt = doc_vector.get(term, 0)
                qtfi = query_weights.get(term, 0)
                # Extended BIR formula
                score += w_dt * qtfi * log_ratio
            rsv_scores[doc] = score
        return rsv_scores

    def compute_rsv(self, query_id, query_terms):
        """
        Compute RSV using Extended BIR formula with learning data:
        RSV_d = sum_{t in q} [w_dt * qtfi * log( ((r_i + 0.5)/(R - r_i + 0.5)) / ((n_i - r_i + 0.5)/(N - n_i - R + r_i + 0.5)) )]

        Where:
        - w_dt: TF-IDF weight of term t in document d
        - qtfi: Query term weight (1 for binary queries)
        - r_i: Number of relevant documents containing term i
        - R: Total number of relevant documents for this query
        - n_i: Number of documents containing term i
        - N: Total number of documents

        When ``query_id`` has no relevance data, the documents are scored
        with the no-learning weight log((N - n_i + 0.5) / (n_i + 0.5)),
        computed from this instance's own index.

        Returns:
            dict mapping every document id to its score.
        """
        known_terms = self._known_query_terms(query_terms)

        if query_id not in self.relevance_data:
            # Fallback to the non-learning formula, using the index already
            # loaded by this instance rather than re-reading a fixed path.
            weights = {term: self._no_learning_weight(term) for term in known_terms}
            return self._score_documents(query_terms, weights)

        rel_docs = self.relevance_data[query_id]['relevant']

        # The weight depends only on the term, so compute it once per term.
        weights = {term: self._learning_weight(term, rel_docs) for term in known_terms}
        return self._score_documents(query_terms, weights)

    def rank_documents(self, query_id, query_terms):
        """Rank documents by RSV score"""
        rsv_scores = self.compute_rsv(query_id, query_terms)
        ranked_docs = sorted(rsv_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked_docs

    def display_relevance_analysis(self, query_id, query_terms):
        """Display relevance analysis for interpretation

        Prints the relevance statistics (r_i, n_i, R - r_i, n_i - r_i and
        the resulting log ratio) of every distinct indexed query term, then
        how each term weight changes between the no-learning and the
        learning formula. Prints a notice and returns when ``query_id`` has
        no relevance data.
        """
        if query_id not in self.relevance_data:
            print(f"No relevance data for {query_id}")
            return

        rel_docs = self.relevance_data[query_id]['relevant']
        R = len(rel_docs)
        known_terms = self._known_query_terms(query_terms)

        print(f"\nRELEVANCE ANALYSIS for {query_id}:")
        print(f"Relevant documents: {rel_docs} (R={R})")
        print(f"{'Term':<10} {'r_i':<6} {'n_i':<6} {'R-r_i':<8} {'n_i-r_i':<10} {'log_ratio':<12}")
        print("-" * 70)

        for term in known_terms:
            n_i = self.term_doc_freq.get(term, 0)
            r_i = self._relevant_count(term, rel_docs)
            log_ratio = self._learning_weight(term, rel_docs)

            print(f"{term:<10} {r_i:<6} {n_i:<6} {R-r_i:<8} {n_i-r_i:<10} {log_ratio:<12.4f}")

        # Show how relevance feedback affects scoring
        print(f"\nRELEVANCE FEEDBACK IMPACT:")
        for term in known_terms:
            # Without learning score component
            without_learning = self._no_learning_weight(term)

            # With learning score component
            with_learning = self._learning_weight(term, rel_docs)

            impact = with_learning - without_learning
            direction = "↑" if impact > 0 else "↓" if impact < 0 else "→"

            print(f"{term}: {without_learning:.4f} → {with_learning:.4f} ({direction} {impact:+.4f})")

## Implementation and comparison

In [None]:
def prepare_relevance_data():
    """Return the relevance judgments used as learning data.

    The result maps each query id ('q1'..'q5') to a dict with two sets of
    document ids: 'relevant' and 'non_relevant'. The non-relevant set is
    every document of the six-document collection that is not relevant.
    """
    collection = {'D1', 'D2', 'D3', 'D4', 'D5', 'D6'}
    relevant_by_query = (
        ('q1', {'D2', 'D4'}),
        ('q2', {'D2', 'D4'}),
        ('q3', {'D4', 'D1'}),
        ('q4', {'D2', 'D1'}),
        ('q5', {'D3', 'D6'}),
    )

    judgments = {}
    for query_id, relevant in relevant_by_query:
        judgments[query_id] = {
            'relevant': relevant,
            'non_relevant': collection - relevant,
        }
    return judgments

def main():
    """Compare Extended BIR with and without learning data on five queries.

    For every test query this prints the scoring breakdown of the
    no-learning model, the relevance analysis of the learning model, the
    two rankings side by side (at most six rows) and a performance summary.
    """
    # Prepare data
    relevance_data = prepare_relevance_data()

    # Initialize both models
    print("=== EXTENDED BIR MODEL - CORRECTED FORMULAS ===")
    print("With vs Without Learning Data\n")

    index_path = "results/inverted_index_weighted.txt"
    bir_without_learning = ExtendedBIRWithoutLearning(index_path)
    bir_with_learning = ExtendedBIRWithLearning(index_path, relevance_data)

    # Test queries
    test_queries = {
        'q1': ['10%', '175'],
        'q2': ['12%', '175'],
        'q3': ['12%', 'D6'],
        'q4': ['10%', 'D6'],
        'q5': ['D6', '1']
    }

    for query_id in test_queries:
        query_terms = test_queries[query_id]
        relevant = relevance_data[query_id]['relevant']

        print(f"\n{'='*100}")
        print(f"QUERY: {query_id} - Terms: {query_terms}")
        print(f"Relevant docs: {relevant}")
        print(f"{'='*100}")

        # Get rankings from both models
        ranked_without = bir_without_learning.rank_documents(query_terms)
        ranked_with = bir_with_learning.rank_documents(query_id, query_terms)

        # Display detailed analysis
        bir_without_learning.display_scoring_breakdown(query_terms)
        bir_with_learning.display_relevance_analysis(query_id, query_terms)

        # Display comparison
        print(f"\nRANKING COMPARISON:")
        print(f"{'Rank':<6} {'Without Learning':<20} {'RSV':<12} {'With Learning':<20} {'RSV':<12} {'Relevance':<15}")
        print("-" * 100)

        for rank, (doc_without, score_without) in enumerate(ranked_without[:6], start=1):
            # Placeholder row when the learning ranking is shorter.
            if rank <= len(ranked_with):
                doc_with, score_with = ranked_with[rank - 1]
            else:
                doc_with, score_with = ("-", 0)

            if doc_without in relevance_data[query_id]['relevant']:
                rel_without = "RELEVANT"
            else:
                rel_without = "non-rel"
            if doc_with in relevance_data[query_id]['relevant']:
                rel_with = "RELEVANT"
            else:
                rel_with = "non-rel"

            print(f"{rank:<6} {doc_without:<20} {score_without:<12.4f} {doc_with:<20} {score_with:<12.4f} {rel_without}/{rel_with}")

        # Performance metrics
        print(f"\nPERFORMANCE SUMMARY for {query_id}:")
        calculate_performance_metrics(query_id, ranked_without, ranked_with, relevance_data[query_id]['relevant'])

def calculate_performance_metrics(query_id, ranked_without, ranked_with, relevant_docs):
    """Print Precision@3 and average precision for both rankings.

    ``ranked_without`` and ``ranked_with`` are lists of (doc_id, score)
    pairs, best first; ``relevant_docs`` is the collection of relevant
    document ids. When both rankings contain a relevant document, the
    1-based position of the first one in each ranking is printed as well.
    ``query_id`` is accepted but not used.
    """

    def precision_at_k(ranked_list, relevant_docs, k=3):
        # Always divides by k, even when the ranking is shorter than k.
        hits = 0
        for doc, _ in ranked_list[:k]:
            if doc in relevant_docs:
                hits += 1
        return hits / k

    def average_precision(ranked_list, relevant_docs):
        precision_sum = 0.0
        hits = 0
        for rank, (doc, _) in enumerate(ranked_list, start=1):
            if doc in relevant_docs:
                hits += 1
                precision_sum += hits / rank
        if relevant_docs:
            return precision_sum / len(relevant_docs)
        return 0

    def first_relevant_position(ranked_list):
        # 1-based rank of the first relevant document, None when there is none.
        for rank, (doc, _) in enumerate(ranked_list, start=1):
            if doc in relevant_docs:
                return rank
        return None

    p3_without = precision_at_k(ranked_without, relevant_docs, 3)
    p3_with = precision_at_k(ranked_with, relevant_docs, 3)
    ap_without = average_precision(ranked_without, relevant_docs)
    ap_with = average_precision(ranked_with, relevant_docs)

    print(f"Precision@3:   {p3_without:.4f} (without) → {p3_with:.4f} (with) → {p3_with - p3_without:+.4f} change")
    print(f"Avg Precision: {ap_without:.4f} (without) → {ap_with:.4f} (with) → {ap_with - ap_without:+.4f} change")

    # Check if learning improved the ranking
    best_without = first_relevant_position(ranked_without)
    best_with = first_relevant_position(ranked_with)

    if best_with is not None and best_without is not None:
        print(f"Best relevant doc position: {best_without} → {best_with}")

# Run the Extended BIR comparison only when this code is executed as a script.
if __name__ == "__main__":
    main()