In [1]:
%load_ext autoreload
%autoreload 2

# BIR retrieval with inverted files

## Helper functions for the BIRModel

### Tokenizer & Set of Words

In [2]:
from utils import analyzer

# Demo of the analyzer: the same sentence is reduced to its set of distinct
# words, first with stopword removal and then without it.
print(analyzer.set_of_words("this is a simple test for this function", remove_stopwords = True))
print(analyzer.set_of_words("this is a simple test for this function", remove_stopwords = False))

{'test', 'simple', 'function'}
{'for', 'test', 'is', 'simple', 'this', 'function', 'a'}


### Feedback class

In [3]:
from typing import Callable

class Feedback:
    """
        Records relevance feedback for documents.

        Two sets are kept: `assessed` (every document that was judged) and
        `relevant` (the judged documents that were found relevant). The
        judgement itself is delegated to `assessment_func(doc_id) -> bool`;
        without such a function every assessed document counts as relevant.
    """

    def __init__(self, assessment_func: Callable[[int], bool] = None):
        self.assessment_func = assessment_func
        self.clear()

    def clear(self):
        """Forget all judgements collected so far."""
        self.relevant = set()
        self.assessed = set()

    def is_initial_step(self) -> bool:
        """Return True as long as no document has been assessed."""
        return not self.assessed

    def assess(self, doc_id: int) -> None:
        """Judge `doc_id` and remember the outcome."""
        self.assessed.add(doc_id)
        judge = self.assessment_func
        if judge and not judge(doc_id):
            # judged, but rejected by the assessment function
            return
        self.relevant.add(doc_id)

    def is_relevant(self, doc_id: int) -> bool:
        """Return True if `doc_id` was assessed as relevant."""
        return doc_id in self.relevant

    def is_assessed(self, doc_id: int) -> bool:
        """Return True if `doc_id` was assessed at all."""
        return doc_id in self.assessed

    def is_not_relevant(self, doc_id: int) -> bool:
        """Return True if `doc_id` was assessed and found not relevant."""
        return self.is_assessed(doc_id) and not self.is_relevant(doc_id)

### TopKList class

In [4]:
from heapq import heappop, heappush

class TopKList:
    """
        Maintains a list of top-k documents ordered by descending score
        (ties are broken by ascending doc_id, then by insertion order).

        Parameters:
        - `k`: maximum number of results produced by iteration
        - `term_weights`: optional list of tuples (term, weight) describing the
          weights used by the retrieval model; exposed as `term_weights`,
          `terms` (list of terms) and `weights` (dict term -> weight). These
          attributes always exist and are empty if no weights were given.
        - `predicate`: optional function predicate(doc_id: int) -> bool;
          documents for which it returns a falsy value are skipped.

        Implements the iter() interface. Results are produced lazily from a
        heap and cached in `results`, so the list can be iterated repeatedly.
        Each result is a dict with the keys `id`, `score` and `rank`.
    """
    def __init__(self, k: int, term_weights: list[tuple[str,float]] = None, predicate: Callable[[int], bool] = None):
        self.docs_heap = []
        self.k = k
        self.predicate = predicate
        self.results = []
        # insertion counter, used as tie-breaker in the heap entries
        self._n_added = 0
        # always define the weight attributes, also for None or an empty list,
        # so that callers can read them without checking for their existence
        self.term_weights = term_weights or []
        self.terms = [term for term, _ in self.term_weights]
        self.weights = dict(self.term_weights)

    def add(self, doc_id: int, score: float):
        """Add a document with its score to the candidates."""
        self._n_added += 1
        # The counter guarantees that the comparison of two heap entries never
        # reaches the (non-comparable) dict at the end of the tuple.
        heappush(self.docs_heap, (-score, doc_id, self._n_added, {'id': doc_id, 'score': score}))
        # optional (infrequent) pruning if heap grows too large

    def __iter__(self):
        # do we already have the results?
        yield from self.results
        # produce more results (if necessary and available)
        rank = len(self.results)
        while rank < self.k and self.docs_heap:
            entry = heappop(self.docs_heap)[-1]
            if self.predicate is None or self.predicate(entry['id']):
                rank += 1
                entry['rank'] = rank
                self.results.append(entry)
                yield entry

## BIR Model Implementation

### The Base Retriever Class

* `n_docs: int`: number of documents added to index
* `documents dict[int, dict{id, vector}]`: collection of documents as dictionary with doc_id as key. Each document is a dictionary with the properties from the dataset and additional properties for the retrieval:
  - `id` holds the document id as generated when loading the document; corresponds to the key in documents
  - `vector` holds the set of distinct terms of the document (set of words; term frequencies are not stored)
* `vocabulary: dict[term, int]`: vocabulary of collection with terms as keys and document frequencies as values
* `index: dict[term, list[int]]`: inverted index mapping terms to postings. Postings contain doc_id sorted by doc_id

In [5]:
class BIRModel:
    """
        Base class of the BIR model. It owns the document collection, the
        vocabulary (term -> document frequency) and the inverted index
        (term -> list of doc ids); the document-at-a-time (DAAT) and
        term-at-a-time (TAAT) models inherit from it.
    """
    def __init__(self, collection: list[dict] = None, remove_stopwords: bool = True):
        self.remove_stopwords = remove_stopwords
        self.build_index(collection if collection else [])

    def add_document(self, doc: dict):
        """
            Add a single document to the index. The document receives the next
            running id (stored in doc['id']) and its set of terms (stored in
            doc['vector']).
        """
        # ids are assigned in insertion order, hence postings stay sorted
        self.n_docs += 1
        doc_id = self.n_docs
        doc['id'] = doc_id
        self.documents[doc_id] = doc

        # all string-valued properties of the document form its text
        text = ' '.join(value for value in doc.values() if type(value) is str)
        terms = analyzer.set_of_words(text, remove_stopwords=self.remove_stopwords)
        doc['vector'] = terms

        # update document frequency and posting list of every term
        for term in terms:
            self.vocabulary[term] = self.vocabulary.get(term, 0) + 1
            if term not in self.index:
                self.index[term] = []
            self.index[term].append(doc_id)

    def build_index(self, collection: list[dict]):
        """Reset all index structures and (re-)index the given collection."""
        self.documents = {}
        self.vocabulary = {}
        self.index = {}
        self.n_docs = 0
        for doc in collection:
            self.add_document(doc)

### Calculation of c_j-weights
Two variants for initial step and feedback step.

In [6]:
import math

class BIRModel(BIRModel):
    def cj_weight(self, term: str, feedback: Feedback):
        """
            Return the c_j weight log(rj / (1 - rj) * (1 - nj) / nj) of `term`.

            Initial step (no feedback yet): rj = 0.5 and nj is estimated from
            the document frequency of the term in the whole collection.
            Feedback step: rj and nj are estimated from the assessed documents
            (relevant and non-relevant ones, respectively). The terms +0.5 and
            +1 in the estimates keep both values strictly between 0 and 1.
        """
        postings = self.index[term]
        if feedback.is_initial_step():
            rj = 0.5
            nj = (len(postings) + 0.5) / (len(self.documents) + 1)
        else:
            # postings as set, to count overlaps by set intersection
            with_term = set(postings)
            # L: assessed relevant documents, lj: those containing the term
            L = len(feedback.relevant)
            lj = len(feedback.relevant & with_term)
            # K: assessed documents, kj: those containing the term
            K = len(feedback.assessed)
            kj = len(feedback.assessed & with_term)
            rj = (lj + 0.5) / (L + 1)
            nj = (kj - lj + 0.5) / (K - L + 1)
        relevant_odds = rj / (1 - rj)
        return math.log(relevant_odds * (1 - nj) / nj)

### Term & document filtering with options
Pruning of terms and documents based on the following settings:
- `PRUNE_NEGATIVE_WEIGHTS: bool = False`, set this property to True to remove terms with negative weights
- `PRUNE_WEIGHT_THRESHOLD: bool | float = False`, set this property to a threshold value to remove terms whose absolute weight is not larger than this value
- `PRUNE_TOPK: bool | int = False`, set this property to select top-k weights based on absolute values
- `PRUNE_NON_RELEVANT: bool = False`, set this property to true to prune non-relevant documents from result list

In [7]:
class BIRModel(BIRModel):
    # True: drop terms with negative weights
    PRUNE_NEGATIVE_WEIGHTS = False

    # threshold value: keep only terms whose absolute weight is larger than it
    PRUNE_WEIGHT_THRESHOLD  = False

    # number k: keep only the k terms with the largest absolute weights
    PRUNE_TOPK = False

    # True: drop documents assessed as non-relevant from the result list
    PRUNE_NON_RELEVANT = False

    def filter_terms(self, terms: set[str], feedback: Feedback) -> list[tuple[str,float]]:
        """
            Compute the c_j weights of the query terms and apply the term
            pruning options. Terms not in the vocabulary are dropped first.
            Returns a list of tuples (term, weight).
        """
        weighted = [
            (term, self.cj_weight(term, feedback))
            for term in terms
            if term in self.vocabulary
        ]
        if self.PRUNE_NEGATIVE_WEIGHTS:
            weighted = [pair for pair in weighted if pair[1] >= 0]
        if self.PRUNE_WEIGHT_THRESHOLD:
            weighted = [pair for pair in weighted if abs(pair[1]) > self.PRUNE_WEIGHT_THRESHOLD]
        if self.PRUNE_TOPK:
            # largest absolute weight first; ties: shorter posting list, then term
            def importance(pair):
                term, weight = pair
                return (-abs(weight), len(self.index[term]), term)
            weighted = sorted(weighted, key=importance)[:self.PRUNE_TOPK]
        return weighted

### Document-at-a-time (DAAT) for BIR Model
The implementation of DAAT for the BIR model uses sorted postings and processes postings in ascending order of the document IDs (see Or-implementation of Boolean model).

In [8]:
class BIRModel_DAAT(BIRModel):
    """
        Implements the DAAT model for the BIR model using inverted index method.
    """
    def search(self, query: str, k: int, feedback: Feedback, predicate: Callable[[int], bool] = None, selected_docs: set[int] = None) -> TopKList:
        """
            Evaluate `query` document-at-a-time and return the TopKList with
            the scored documents.

            - `k`: number of results the returned TopKList produces
            - `feedback`: feedback used for the c_j weights and (with
              PRUNE_NON_RELEVANT) to drop documents assessed as non-relevant
            - `predicate`: optional filter on doc ids, applied by the TopKList
            - `selected_docs`: optional collection of doc ids; if given, only
              these documents are scored
        """
        # NOTE(review): the query is analyzed with the default stopword setting
        # of set_of_words, not with self.remove_stopwords - confirm intended.
        query_vector = analyzer.set_of_words(query)

        # filter terms and obtain c_j-weights for terms in order of their importance
        term_weights = self.filter_terms(query_vector, feedback)
        weights = [weight for _, weight in term_weights]

        # one cursor per term over its posting list, plus the current posting of each cursor
        cursors = [iter(self.index[term]) for term, _ in term_weights]
        heads = [next(cursor, None) for cursor in cursors]

        # callers may pass a list; use a set for constant-time membership tests
        allowed = None if selected_docs is None else set(selected_docs)

        # keep track of all retrieved documents and their score
        topk = TopKList(k, term_weights, predicate)

        # merge the posting lists: always process the smallest pending doc id
        while True:
            pending = [head for head in heads if head is not None]
            if not pending:
                break
            smallest = min(pending)
            # skip documents assessed as non-relevant (if pruning is enabled)
            # and documents outside of selected_docs (if given)
            pruned = self.PRUNE_NON_RELEVANT and feedback.is_not_relevant(smallest)
            if not pruned and (allowed is None or smallest in allowed):
                # score = sum of the weights of all terms contained in the document
                score = sum(weight for weight, head in zip(weights, heads) if head == smallest)
                topk.add(smallest, score)
            # advance every cursor positioned on the processed document; compare
            # by value (==), identity of int objects is not guaranteed
            for i, head in enumerate(heads):
                if head == smallest:
                    heads[i] = next(cursors[i], None)

        # finished, return topk for result iteration
        return topk

### Term-at-a-time for BIR Model

In [9]:
class BIRModel_TAAT(BIRModel):
    """
        Implements the TAAT model for the BIR model using inverted index method.
    """
    def search(self, query: str, k: int, feedback: Feedback, predicate: Callable[[int], bool] = None, selected_docs: set[int] = None) -> TopKList:
        """
            Evaluate `query` term-at-a-time and return the TopKList with the
            scored documents.

            - `k`: number of results the returned TopKList produces
            - `feedback`: feedback used for the c_j weights and (with
              PRUNE_NON_RELEVANT) to drop documents assessed as non-relevant
            - `predicate`: optional filter on doc ids, applied by the TopKList
            - `selected_docs`: optional collection of doc ids; if given, only
              these documents are scored
        """
        # NOTE(review): the query is analyzed with the default stopword setting
        # of set_of_words, not with self.remove_stopwords - confirm intended.
        query_vector = analyzer.set_of_words(query)

        # filter terms and obtain c_j-weights for terms in order of their importance
        term_weights = self.filter_terms(query_vector, feedback)

        # hoist loop invariants; callers may pass a list for selected_docs,
        # a set gives constant-time membership tests per posting
        prune_non_relevant = self.PRUNE_NON_RELEVANT
        allowed = None if selected_docs is None else set(selected_docs)

        # accumulate the score of each document term by term
        doc_scores = {}
        for term, weight in term_weights:
            for doc_id in self.index[term]:
                # skip documents assessed as non-relevant (if pruning is enabled)
                if prune_non_relevant and feedback.is_not_relevant(doc_id):
                    continue
                # skip documents outside of selected_docs (if given)
                if allowed is not None and doc_id not in allowed:
                    continue
                doc_scores[doc_id] = doc_scores.get(doc_id, 0) + weight

        # we do not need a full sort of doc_scores, but can use the heap in TopKList
        topk = TopKList(k, term_weights, predicate)
        for doc_id, score in doc_scores.items():
            topk.add(doc_id, score)

        # finished, return topk for result iteration
        return topk

In [None]:
def search_TAAT(query, k, feedback, predicate=None):
    """
        Simplified, free-standing variant of the term-at-a-time search.

        - `query`: query text
        - `k`: number of results the returned TopKList produces
        - `feedback`: feedback; documents assessed as non-relevant are omitted
        - `predicate`: optional filter on doc ids, applied by the TopKList

        NOTE(review): `filter_terms` and `index` are used as free (global)
        names; they are not defined at module level in the visible part of
        this notebook - presumably they stand for the corresponding members
        of a retriever; confirm before running this cell.
    """
    query_vector = analyzer.set_of_words(query)

    # filter terms and obtain c_j-weights
    term_weights = filter_terms(query_vector, feedback)
    doc_scores = {}

    # iterate over terms and fetch postings
    for term, weight in term_weights:
        for posting in index[term]:
            # check feedback, omit if assessed and not relevant
            if feedback.is_assessed(posting) and not feedback.is_relevant(posting):
                continue
            # add weight to score of document
            doc_scores[posting] = doc_scores.get(posting, 0) + weight

    # we do not need a full sort of doc_scores, but can use the heap in TopKList
    topk = TopKList(k, term_weights, predicate)
    for doc_id, score in doc_scores.items():
        topk.add(doc_id, score)

    # finished, return topk for result iteration
    return topk

### Loading the data

In [10]:
import ipywidgets as widgets
# Dropdowns to choose the retrieval implementation and the dataset; their
# values are read when the retriever is created and the data is loaded.
opt_implementation = widgets.Dropdown(
    options=['document-at-a-time', 'term-at-a-time'],
)
opt_dataset = widgets.Dropdown(
    options=['random', 'imdb movies'],
)
display(opt_implementation, opt_dataset)

Dropdown(options=('document-at-a-time', 'term-at-a-time'), value='document-at-a-time')

Dropdown(options=('random', 'imdb movies'), value='random')

In [11]:
from utils import table
from datasets import random as random_docs, imdb as imdb_docs
import random

# select the implementation of the retrieval model
if opt_implementation.value == 'document-at-a-time':
    retriever = BIRModel_DAAT()
else:
    retriever = BIRModel_TAAT()

# select the dataset and define feedback function, queries, predicates, and selections
#   assessments: name -> function(doc_id) -> bool, simulates the relevance judgement of a user
#   queries:     predefined query texts
#   predicates:  name -> function(doc_id) -> bool, filters the result list
#   selections:  name -> set of doc ids to which the search is restricted
# NOTE: doc ids are assigned by the retriever starting at 1, hence the first
# n documents of a collection have the ids 1..n.
if opt_dataset.value == 'random':
    collection = random_docs
    assessments = {
        'random': lambda doc_id: random.random() < 0.8,
        'id < 20': lambda doc_id: doc_id < 20,
    }
    queries = [
        'cat dog',
        'horse bird',
        'cat dog horse bird'
    ]
    predicates = {
        'even doc ids': lambda doc_id: doc_id % 2 == 0,
        'odd doc ids': lambda doc_id: doc_id % 2 == 1,
    }
    selections = {
        'doc<10': set(range(1, 10)),
    }
elif opt_dataset.value == 'imdb movies':
    collection = imdb_docs
    assessments = {
        # ids 1..100 (a test with "< 100" would miss document 100)
        'top-100': lambda doc_id: doc_id <= 100,
        'star in title': lambda doc_id: 'star' in retriever.documents[doc_id]['title'].lower(),
        'morgan in actor': lambda doc_id: 'morgan' in retriever.documents[doc_id]['actors'].lower(),
        'comedy in genre': lambda doc_id: 'comedy' in retriever.documents[doc_id]['genre'].lower(),
    }
    queries = [
        'star wars',
        'drama morgan freeman',
        'comedy'
    ]
    predicates = {
        'year < 1990': lambda doc_id: retriever.documents[doc_id]['year'] < 1990,
        'year >= 1990': lambda doc_id: retriever.documents[doc_id]['year'] >= 1990,
    }
    selections = {
        # ids 1..n (range(n) would yield 0..n-1 and miss document n)
        'top-100': set(range(1, 101)),
        'top-250': set(range(1, 251)),
    }
else:
    raise ValueError("to be implemented")

# build index
retriever.build_index(collection.load())

In [12]:
# preview of the documents in the collection
table.print(
    list(map(collection.format, retriever.documents.values())),
    collection.headers(),
    max_rows = 10,
)
# vocabulary with document frequencies and postings, most frequent terms first
table.print(
    sorted(
        [[term, df, retriever.index[term]] for term, df in retriever.vocabulary.items()],
        key=lambda row: row[1],
        reverse=True,
    ),
    ['term', 'df', 'posting'],
    max_rows=20,
)

# size of the collection and of the index
print(f'{len(retriever.documents)} documents in collection')
print(f'{len(retriever.vocabulary)} distinct terms in collection')
print('{count} postings'.format(count=sum(map(len, retriever.index.values()))))

|   id | title                           |   year |   runtime |   rating | genre                 | actors                          | summary                                                                                               |
|------|---------------------------------|--------|-----------|----------|-----------------------|---------------------------------|-------------------------------------------------------------------------------------------------------|
|    1 | The Shawshank Redemption        |   1994 |       142 |      9.3 | Drama                 | Tim Robbins Morgan Freeman Bob… | Two imprisoned men bond over a number of years, finding solace and eventual redemption through acts … |
|    2 | The Godfather                   |   1972 |       175 |      9.2 | Crime Drama           | Marlon Brando Al Pacino James … | An organized crime dynasty's aging patriarch transfers control of his clandestine empire to his relu… |
|    3 | The Dark Knight                 |   2008 | 

### Pretty printing functions

In [13]:
def print_feedback(feedback: Feedback, func: str, text: str = 'feedback'):
    """
        Print all assessed documents on one line: relevant ones first (marked
        with '+'), followed by the non-relevant ones (marked with '-'), each
        group in ascending order of the doc id. `func` is the name of the
        assessment shown in parentheses, `text` the label at the line start.
    """
    def relevant_first(doc_id):
        return (not feedback.is_relevant(doc_id), doc_id)

    marked = []
    for doc_id in sorted(feedback.assessed, key=relevant_first):
        sign = '+' if feedback.is_relevant(doc_id) else '-'
        marked.append(sign + str(doc_id))
    info = ", ".join(marked)
    print(f'{text} ({func}): {info}')

def print_topk(topk: TopKList, feedback: Feedback, k: int):
    """
        Print at most `k` entries of `topk` as a table. Besides the formatted
        document, each row shows the feedback state ('+' relevant, '-' assessed
        but not relevant, ' ' not assessed), the rank and the rounded score.
    """
    rows = []
    for hit in topk:
        if len(rows) >= k:
            break
        doc_id = hit['id']
        if feedback.is_relevant(doc_id):
            marker = '+'
        elif feedback.is_assessed(doc_id):
            marker = '-'
        else:
            marker = ' '
        extra_columns = [marker, hit['rank'], round(hit['score'], 2)]
        rows.append(collection.format(retriever.documents[doc_id], extra_columns))
    table.print(rows, collection.headers('rel', 'rank', 'score'), max_rows=k)

### Provide feedback

In [14]:
def add_feedback(feedback, topk, n_feedback):
    """
        Assess up to `n_feedback` documents that have not been assessed yet.
        Documents are taken from the result list `topk` in ranking order
        first; if this does not suffice, the remaining assessments go to
        unassessed documents of the collection in the order of
        retriever.documents.
    """
    remaining = n_feedback

    # first choice: documents from the result list
    for hit in topk:
        if remaining <= 0:
            return
        if not feedback.is_assessed(hit['id']):
            feedback.assess(hit['id'])
            remaining -= 1

    # fill up with arbitrary documents not assessed so far
    for doc_id in retriever.documents.keys():
        if feedback.is_assessed(doc_id):
            continue
        if remaining <= 0:
            return
        feedback.assess(doc_id)
        remaining -= 1

### Search with feedback iterations

In [15]:
from IPython.display import clear_output
from functools import reduce

# feedback state shared by the callbacks below: cleared by on_start,
# extended with new assessments on every call of run_query
feedback = Feedback()

def run_query(query: str, k: int, assessment: str, predicate: str, selection: str, n_feedback: int):
    """
        Run one feedback iteration: print the feedback collected so far,
        search, assess up to `n_feedback` further documents, and print the
        results, the term weights (largest first) and the updated feedback.

        `assessment`, `predicate` and `selection` are keys of the dictionaries
        assessments, predicates and selections; unknown keys (e.g. '<none>')
        disable the respective option. The result list is also stored in the
        global variable `topk`.
    """
    global topk
    feedback.assessment_func = assessments.get(assessment)
    print_feedback(feedback, assessment)
    print()
    topk = retriever.search(
        query,
        k,
        feedback=feedback,
        predicate=predicates.get(predicate),
        selected_docs=selections.get(selection),
    )
    add_feedback(feedback, topk, n_feedback)
    print_topk(topk, feedback, k)
    # the result list may carry no weights at all if no query term is left
    # after filtering; print nothing in this case instead of failing
    weights = getattr(topk, 'weights', {})
    for term, weight in sorted(weights.items(), key=lambda item: -item[1]):
        print(term.rjust(16), weight)
    print_feedback(feedback, assessment, "\nnext feedback")

def on_next(btn):
    """
        Click handler of the 'next' button: copy the pruning options from the
        checkboxes to the retriever, optionally expand the query with all
        terms of the documents assessed as relevant, and run the next
        feedback iteration with the output captured in `out`.
    """
    # pruning options: checkbox off -> option disabled, on -> fixed setting
    prune_small = opt_small.value
    keep_topk = opt_topk.value
    retriever.PRUNE_NEGATIVE_WEIGHTS = opt_neg.value
    retriever.PRUNE_WEIGHT_THRESHOLD = 0.5 if prune_small else prune_small
    retriever.PRUNE_TOPK = 10 if keep_topk else keep_topk
    retriever.PRUNE_NON_RELEVANT = opt_nonrel.value

    query_text = query.value
    if opt_expand.value:
        # union of the term sets of all relevant documents
        expansion = set()
        for doc_id in feedback.relevant:
            expansion = expansion | retriever.documents[doc_id]['vector']
        query_text = query_text + ' ' + ' '.join(expansion)

    with out:
        clear_output()
        print(query_text)
        run_query(query_text, 20, assessment.value, predicate.value, selection.value, n_feedback.value)

def on_start(btn):
    """
        Click handler of the 'start' button: discard all feedback collected
        so far and run the first iteration via on_next.
    """
    feedback.clear()
    on_next(btn)

# buttons: 'start' resets the feedback and runs the query, 'next' runs
# another iteration that keeps the feedback collected so far
btn_start = widgets.Button(description=' start', icon='play')
btn_start.on_click(on_start)
btn_next = widgets.Button(description=' next', icon='step-forward')
btn_next.on_click(on_next)
buttons = widgets.HBox([btn_start, btn_next])

# query left side: query text, assessment function, number of documents
# assessed per iteration, result predicate and document selection; the entry
# '<none>' is not a key of the respective dictionary and disables the option
query=widgets.Dropdown(description='query',options=list(queries))
assessment=widgets.Dropdown(description='assessment',options=['<none>'] + list(assessments.keys()))
n_feedback=widgets.IntSlider(description='feedback', min=5, max=50, step=5, value=5)
predicate=widgets.Dropdown(description='predicate',options=['<none>'] + list(predicates.keys()))
selection=widgets.Dropdown(description='selection',options=['<none>'] + list(selections.keys()))
left = widgets.VBox([query, assessment, n_feedback, predicate, selection])

# options right side: checkboxes read by on_next to configure the pruning
# options of the retriever and the query expansion
opt_neg = widgets.Checkbox(value=False, description='prune negative weights')
opt_small = widgets.Checkbox(value=False, description='prune small weights (abs < 0.5)')
opt_topk = widgets.Checkbox(value=False, description='keep top 10 weights')
opt_nonrel = widgets.Checkbox(value=False, description='prune non relevant documents')
opt_expand = widgets.Checkbox(value=False, description='expand query with feedback')
right = widgets.VBox([opt_neg, opt_small, opt_topk, opt_nonrel, opt_expand])

# display the dialog object
display(widgets.VBox([buttons, widgets.HBox([left, right], layout={'margin': '20px'})]))

# capture output with this widget; on_next prints into it, so it only has
# to exist before the first button click
out = widgets.Output(layout={'border': '1px solid #eeeeee', 'height': '500px', 'overflow': 'auto', 'padding': '0px 0px 0px 10px'})
display(out)

VBox(children=(HBox(children=(Button(description=' start', icon='play', style=ButtonStyle()), Button(descripti…

Output(layout=Layout(border_bottom='1px solid #eeeeee', border_left='1px solid #eeeeee', border_right='1px sol…

### What's next?