To construct a positional index for preprocessed documents and implement the SPIMI (Single-Pass In-Memory Indexing) algorithm, we'll need to follow these steps:

Read and preprocess the documents (tokenization, stop word removal, and stemming).
Construct the positional index using the SPIMI algorithm.

In [None]:
# install the required libraries 
!pip install python-docx nltk

In [1]:
import os
from docx import Document
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from collections import defaultdict, Counter
import string
import nltk
import re

# Download NLTK data
# -- punctuation library
nltk.download('punkt')
# -- stop word library
nltk.download('stopwords')

stop_words = set(stopwords.words('english'))
# Initialize the Porter Stemmer
stemmer = PorterStemmer()

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\usha_\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\usha_\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [3]:
# Preprocess the document
def preprocess(text):    
    # Tokenization
    tokens = word_tokenize(text)
    # Convert to lower case
    tokens = [token.lower() for token in tokens]
    # Remove punctuation
    tokens = [token for token in tokens if token not in string.punctuation]

    for each in range(len(tokens)):
        tokens[each] = re.sub(r'\d+', '', tokens[each])  # Remove numbers
        tokens[each] = re.sub(r'[^\w\s]', '', tokens[each])  # Remove special characters
    
    # Remove stop words
    # stop_words = set(stopwords.words('english'))
    tokens = [token for token in tokens if token not in stop_words]
    
    # Stemming
    # stemmer = PorterStemmer()
    tokens = [stemmer.stem(token) for token in tokens]
    return tokens

In [13]:
# Read DOCX files from directory
def read_multiple_docx(directory):
    documents_content = {}
    for filename in os.listdir(directory["docx"]):
        if filename.endswith('.docx'):
            file_path = os.path.join(directory["docx"], filename)
            doc = Document(file_path)
            content = []
            for para in doc.paragraphs:
                content.append(para.text)
            documents_content[filename] = preprocess('\n'.join(content))
    return documents_content

In [15]:
# Kashyap
# SPIMI Algorithm for Index Construction
def spimi_invert(token_stream, block_size=100000):
    dictionary = defaultdict(list)
    block_id = 0

    for token, doc_id, pos in token_stream:
        dictionary[token].append((doc_id, pos))
        if len(dictionary) >= block_size:
            write_block_to_disk(dictionary, block_id)
            dictionary = defaultdict(list)
            block_id += 1
    
    if dictionary:
        write_block_to_disk(dictionary, block_id)

    return block_id

def write_block_to_disk(dictionary, block_id):
    with open(f'block_{block_id}.txt', 'w') as file:
        for term, postings in sorted(dictionary.items()):
            file.write(f'{term}: {postings}\n')

def merge_blocks(num_blocks):
    final_index = defaultdict(list)

    for block_id in range(num_blocks+1):
        with open(f'block_{block_id}.txt', 'r') as file:
            for line in file:
                term, postings = line.strip().split(': ')
                postings = eval(postings)
                final_index[term].extend(postings)

    return final_index

'./BBC Sport/text'

In [17]:
# Usage example

doc_path = { "docx" : "./BBC_Sport/docs", 
             "txt" : "./BBC Sport/text"}

# txt_path = "./Assignment-1/BBC Sport/docs"
directory_path = './Assignment-1/BBC Sport/docs'

print (f"1. Read dataset")
documents = read_multiple_docx(doc_path)
# print (documents)
print (f"2. Pre-processing Complete.")

# Create token stream
token_stream = []
for doc_id, (filename, tokens) in enumerate(documents.items()):
    for pos, token in enumerate(tokens):
        token_stream.append((token, doc_id, pos))

# print (type(token_stream))
# print(token_stream)

# Construct positional index using SPIMI
num_blocks = spimi_invert(token_stream)
print (type(num_blocks))
print (num_blocks)
positional_index = merge_blocks(num_blocks)

# Print positional index
for term, postings in sorted(positional_index.items()):
    print(f'{term}: {postings}')


1. Read dataset
2. Pre-processing Complete.
<class 'int'>
0
: [(0, 18), (0, 28), (0, 29), (0, 30), (0, 31), (1, 21), (1, 23), (2, 28), (2, 29), (2, 30), (2, 31), (2, 32), (2, 33), (3, 27), (5, 15), (5, 31)]
aaa: [(5, 7)]
achiev: [(4, 14)]
aliv: [(2, 12)]
although: [(5, 23)]
andi: [(2, 15)]
athlet: [(3, 19), (5, 18)]
aussi: [(0, 3)]
australia: [(1, 30)]
australian: [(0, 15), (1, 4), (1, 9), (2, 9)]
award: [(4, 28)]
back: [(2, 2)]
battl: [(0, 2)]
beat: [(0, 10), (4, 30)]
believ: [(3, 28)]
boss: [(3, 12), (3, 21)]
britain: [(4, 16)]
campbel: [(4, 23)]
career: [(3, 5)]
challeng: [(5, 26)]
champion: [(0, 19)]
claim: [(0, 20)]
clear: [(3, 14)]
close: [(0, 34)]
come: [(2, 27)]
comment: [(1, 34)]
countri: [(3, 25)]
court: [(0, 39), (1, 2), (1, 11)]
critic: [(1, 14)]
darren: [(4, 22)]
davenport: [(0, 12)]
defend: [(1, 8)]
devonish: [(4, 25)]
domin: [(5, 19)]
dream: [(2, 8)]
drug: [(3, 16)]
face: [(2, 22)]
faster: [(1, 28)]
favourit: [(2, 21), (4, 32)]
feder: [(3, 20)]
fight: [(2, 1)]
final: [(2

In [19]:
def intersect (ltup1, ltup2):
    set1 = set(ltup1)
    set2 = set(ltup2)
    intersection = set1 & set2
    return intersection
    
def union (ltup1, ltup2):
    set1 = set(ltup1)
    set2 = set(ltup2)
    union = set1 | set2
    return union

# a complement b
def a_not_in_b (ltup1, ltup2):
    set1 = set(ltup1)
    set2 = set(ltup2)
    not_in_set1 = set2 - set1
    return not_in_set1

# b complement a 
def b_not_in_a (ltup1, ltup2):
    set1 = set(ltup1)
    set2 = set(ltup2)
    not_in_set2 = set1 - set2
    return not_in_set2




In [None]:

# def intersect(postings1, postings2):
#     """Intersect two postings lists."""
#     i, j = 0, 0
#     result = []
#     while i < len(postings1) and j < len(postings2):
#         if postings1[i] == postings2[j]:
#             result.append(postings1[i])
#             i += 1
#             j += 1
#         elif postings1[i] < postings2[j]:
#             i += 1
#         else:
#             j += 1
#     return result

# def union(postings1, postings2):
#     """Union two postings lists."""
#     i, j = 0, 0
#     result = []
#     while i < len(postings1) and j < len(postings2):
#         if postings1[i] == postings2[j]:
#             result.append(postings1[i])
#             i += 1
#             j += 1
#         elif postings1[i] < postings2[j]:
#             result.append(postings1[i])
#             i += 1
#         else:
#             result.append(postings2[j])
#             j += 1
#     result.extend(postings1[i:])
#     result.extend(postings2[j:])
#     return result

# def difference(postings1, postings2):
#     """Difference between two postings lists."""
#     i, j = 0, 0
#     result = []
#     while i < len(postings1) and j < len(postings2):
#         if postings1[i] == postings2[j]:
#             i += 1
#             j += 1
#         elif postings1[i] < postings2[j]:
#             result.append(postings1[i])
#             i += 1
#         else:
#             j += 1
#     result.extend(postings1[i:])
#     return result

In [43]:
def get_postings(keys, index):
    """Retrieve postings list for a term from the positional index."""
    extracted = {key: index[key] for key in keys if key in index}
    return extracted

    

def evaluate_boolean_query(query, index):
    """
    Evaluate a Boolean query on the positional index.
    
    Parameters:
    query (str): Boolean query in CNF
    index (dict): Positional index
    
    Returns:
    list: List of document IDs satisfying the query
    """
    # Split the query into terms and operators
    terms = re.findall(r'\b\w+\b', query.lower())
    operators = re.findall(r'AND|OR|NOT', query.upper())
    print (f"terms: {terms}")
    print (f"operators : {operators}")

    # Preprocess terms
    terms = [stemmer.stem(term) for term in terms if term not in stop_words]
    
    # Retrieve postings lists for each term
    postings_lists = get_postings(terms, index)
    print (f"postings_lists: {postings_lists}")  
        
    # Evaluate the query
    if not operators:        
        return postings_lists.get(terms[0], [])

    result = postings_lists[terms[0]]
    i = 1

    while i < len(terms):
        operator = operators[i - 1]
        if operator == 'AND':
            # print (f"param1:{result}, param2:{postings_lists[terms[i]]}")
            result = intersect(result, postings_lists[terms[i]])                        
        elif operator == 'OR':
            result = union(result, postings_lists[terms[i]])            
        elif operator == 'NOT':
            result = complement(result, postings_lists[terms[i]])            
        i += 1

    return result

# Demo
print ()
query = "will AND title"
result = evaluate_boolean_query(query, positional_index)
print(f"Documents matching the query '{query}': {result}")
print ()
query = "win OR title"
result = evaluate_boolean_query(query, positional_index)
print(f"Documents matching the query '{query}': {result}")
print ()
query = "win OR hewitt"
result = evaluate_boolean_query(query, positional_index)
print(f"Documents matching the query '{query}': {result}")
print()
query = "win AND hewitt"
result = evaluate_boolean_query(query, positional_index)
print(f"Documents matching the query '{query}': {result}")


terms: ['will', 'and', 'title']
operators : ['AND']
postings_lists: {'titl': [(0, 4), (0, 17), (0, 24), (2, 11), (5, 3), (5, 9)]}
Documents matching the query 'will AND title': [(0, 4), (0, 17), (0, 24), (2, 11), (5, 3), (5, 9)]

terms: ['win', 'or', 'title']
operators : ['OR']
postings_lists: {'win': [(0, 13), (2, 14), (5, 1)], 'titl': [(0, 4), (0, 17), (0, 24), (2, 11), (5, 3), (5, 9)]}
Documents matching the query 'win OR title': {(2, 14), (0, 4), (0, 13), (5, 1), (5, 3), (5, 9), (0, 24), (2, 11), (0, 17)}

terms: ['win', 'or', 'hewitt']
operators : ['OR']
postings_lists: {'win': [(0, 13), (2, 14), (5, 1)], 'hewitt': [(1, 18), (1, 19), (2, 0), (2, 6)]}
Documents matching the query 'win OR hewitt': {(2, 14), (1, 18), (2, 6), (0, 13), (1, 19), (2, 0), (5, 1)}

terms: ['win', 'and', 'hewitt']
operators : ['AND']
postings_lists: {'win': [(0, 13), (2, 14), (5, 1)], 'hewitt': [(1, 18), (1, 19), (2, 0), (2, 6)]}
Documents matching the query 'win AND hewitt': set()
