In [1]:
import os
import json

def getFpDict(count):
    """Build an in-memory "index of the index" over the posting files.

    Scans ``Data/Posting1.txt`` .. ``Data/Posting{count}.txt`` line by line and
    records, for every term, the file position at which its line starts. A
    line's term is the text before its first ``-``.

    Args:
        count: Number of posting files to scan (files are numbered from 1).

    Returns:
        dict mapping ``term -> {posting file number -> position}``, where
        position is the ``fp.tell()`` value taken just before the term's line
        was read (usable with ``fp.seek()``). If a term occurs more than once
        in the same file, the last occurrence wins.

    Raises:
        OSError: If one of the posting files cannot be opened.
    """
    fpDict = {}
    for i in range(1, count + 1):
        # Read-only access is sufficient; "r+" would fail on read-only files.
        with open(f'Data/Posting{i}.txt', "r") as fp:
            while True:
                # tell() is taken before readline() so it marks the line start.
                fpPos = fp.tell()
                line = fp.readline()
                if not line:
                    # readline() returns '' only at end of file.
                    break
                if not line.strip():
                    # Blank lines carry no term; do not index them.
                    continue
                term = line.split('-', 1)[0]
                fpDict.setdefault(term, {})[i] = fpPos
    return fpDict

def getTermInfo(term:str, fpDict:dict):
    """Load and merge the posting records of ``term`` from all posting files.

    For every posting file listed for ``term`` in ``fpDict``, seeks to the
    stored position, reads one line of the form ``<term>-<json>`` and parses
    the JSON part. Records from several files are merged into one dict.

    Args:
        term: The term to look up.
        fpDict: Mapping ``term -> {posting file number -> position}`` as
            produced by ``getFpDict``.

    Returns:
        The merged record (``df`` summed, ``tf`` dicts combined, ``docIds``
        lists concatenated), or ``None`` if ``term`` is not in ``fpDict``.

    Raises:
        OSError: If a posting file cannot be opened.
        json.JSONDecodeError: If the stored payload is not valid JSON.
    """
    if term not in fpDict:
        return None
    termInfo = None
    for count, offset in fpDict[term].items():
        # Read-only access is sufficient; "r+" would fail on read-only files.
        with open(f'Data/Posting{count}.txt', "r") as fp:
            fp.seek(offset)
            # Split on the FIRST '-' only: the JSON payload itself may contain
            # '-' characters, which a plain split('-') would cut at.
            payload = fp.readline().strip().split('-', 1)[1]
        tInfo = json.loads(payload)
        if not termInfo:
            # First record seen for this term (an empty record is replaced too).
            termInfo = tInfo
        else:
            # Merge this file's record into the accumulated one.
            termInfo['df'] += tInfo['df']
            termInfo['tf'].update(tInfo['tf'])
            termInfo['docIds'].extend(tInfo['docIds'])
    return termInfo


def getFpIdDict():
    """Index ``docId.txt`` by document id.

    Reads ``docId.txt`` line by line; the first whitespace-separated field of
    each line is parsed as the integer document id and mapped to the file
    position at which that line starts.

    Returns:
        dict mapping ``int docId -> position`` (the ``fp.tell()`` value taken
        just before the line was read, usable with ``fp.seek()``).

    Raises:
        OSError: If ``docId.txt`` cannot be opened.
        ValueError: If the first field of a non-blank line is not an integer.
    """
    fpDict = {}
    # Read-only access is sufficient; "r+" would fail on a read-only file.
    with open('docId.txt', "r") as fp:
        while True:
            # tell() is taken before readline() so it marks the line start.
            fpPos = fp.tell()
            line = fp.readline()
            if not line:
                # readline() returns '' only at end of file.
                break
            fields = line.split()
            if not fields:
                # Blank line (e.g. trailing newline at EOF): nothing to index.
                continue
            fpDict[int(fields[0])] = fpPos
    return fpDict

def getUrl(docId, fpDict) -> str:
    """Return the second whitespace-separated field of a document's line.

    Seeks to ``fpDict[docId]`` in ``docId.txt`` and reads one line; judging by
    the function name, the second field is the document's URL.

    Args:
        docId: Key into ``fpDict``.
        fpDict: Mapping ``docId -> position`` in ``docId.txt`` (as produced by
            ``getFpIdDict``).

    Raises:
        KeyError: If ``docId`` is not in ``fpDict``.
        OSError: If ``docId.txt`` cannot be opened.
    """
    # Read-only access is sufficient; "r+" would fail on a read-only file.
    with open('docId.txt', "r") as fp:
        fp.seek(fpDict[docId])
        return fp.readline().split()[1]
    
def getDocLen(docId, fpDict) -> int:
    """Return the third whitespace-separated field of a document's line as int.

    Seeks to ``fpDict[docId]`` in ``docId.txt`` and reads one line; judging by
    the function name, the third field is the document's length.

    Args:
        docId: Key into ``fpDict``.
        fpDict: Mapping ``docId -> position`` in ``docId.txt`` (as produced by
            ``getFpIdDict``).

    Raises:
        KeyError: If ``docId`` is not in ``fpDict``.
        OSError: If ``docId.txt`` cannot be opened.
        ValueError: If the third field is not an integer.
    """
    # Read-only access is sufficient; "r+" would fail on a read-only file.
    with open('docId.txt', "r") as fp:
        fp.seek(fpDict[docId])
        return int(fp.readline().split()[2])

In [2]:
import math
def calculate_idf(N, df):
    """Return the smoothed inverse document frequency ``ln((N + 0.1) / (df + 0.1))``.

    Both arguments are offset by 0.1 before dividing, so ``df == 0`` does not
    cause a division by zero.
    """
    smoothedTotal = N + 0.1
    smoothedDf = df + 0.1
    return math.log(smoothedTotal / smoothedDf)

In [3]:
from nltk.stem import PorterStemmer
import re

def stemQuery(query:str) -> list:
    """Tokenize a query string and stem every token.

    The query is stripped and lower-cased, split on every character outside
    ``a-z0-9``, empty pieces are dropped, and each remaining piece is passed
    through a ``PorterStemmer``.

    Returns:
        The stemmed tokens in query order (an empty list for a blank query).
    """
    stemmer = PorterStemmer()
    text = query.strip()
    if text == '':
        return list()
    pieces = re.split('[^a-z0-9]', text.lower())
    return [stemmer.stem(piece) for piece in pieces if piece != '']

def getDocIds(queryWords:list, fpDict) -> set[int]:
    """Return the ids of documents that contain every indexed query term.

    Intersects the ``docIds`` lists of all query terms found in the index.
    Terms for which ``getTermInfo`` returns nothing are ignored.

    Args:
        queryWords: Query terms (already stemmed by the caller).
        fpDict: Term index passed through to ``getTermInfo``.

    Returns:
        The set of doc ids shared by all indexed terms; an empty set when no
        term is indexed or when the terms have no document in common.
    """
    # None means "no indexed term seen yet". Using an empty set for that (as
    # before) made an empty intersection look unstarted, so the next term's
    # postings replaced it and non-matching queries returned wrong documents.
    matched = None
    for word in queryWords:
        termInfo = getTermInfo(word, fpDict)
        if not termInfo:
            continue
        docSet = set(termInfo['docIds'])
        matched = docSet if matched is None else matched & docSet
        if not matched:
            # The intersection can only shrink; no need to read more postings.
            break
    return matched if matched is not None else set()

def getIdfDict(queryList:list[str], fpDict, N = 55382) -> dict:
    """Compute the idf of every query term.

    Args:
        queryList: Query terms (already stemmed by the caller).
        fpDict: Term index passed through to ``getTermInfo``.
        N: Value passed as ``N`` to ``calculate_idf`` (presumably the number
            of documents in the collection; confirm the default against the
            indexed corpus).

    Returns:
        dict mapping ``term -> calculate_idf(N, df)``. A term that is not in
        the index is given ``df = 0`` instead of raising a TypeError on the
        ``None`` returned by ``getTermInfo``.
    """
    idfDict = {}
    for word in queryList:
        termInfo = getTermInfo(word, fpDict)
        # Unindexed term: document frequency 0 (calculate_idf smooths df).
        wordDf = termInfo['df'] if termInfo else 0
        idfDict[word] = calculate_idf(N, wordDf)
    return idfDict

def getInvertedTf(term:str, docIdList:list[int], fpDict, idDict) -> dict:
    """Compute length-normalized term frequencies of ``term`` for given docs.

    Args:
        term: The term to look up.
        docIdList: Document ids to report on.
        fpDict: Term index passed through to ``getTermInfo``.
        idDict: Document index passed through to ``getDocLen``.

    Returns:
        ``{term: {'doc<docId>': tf / docLen, ...}}`` with one entry for each
        requested document that appears in the term's ``tf`` record (keyed
        there by ``str(docId)``). A term that is not in the index yields
        ``{term: {}}`` instead of raising a TypeError on the ``None`` returned
        by ``getTermInfo``.
    """
    termInfo = getTermInfo(term, fpDict)
    # Unindexed term: no frequencies to report.
    tfDict = termInfo['tf'] if termInfo else {}
    perDoc = {}
    for docId in docIdList:
        docIdStr = str(docId)  # the tf record is keyed by the id as a string
        if docIdStr in tfDict:
            perDoc[f'doc{docId}'] = tfDict[docIdStr] / getDocLen(docId, idDict)
    return {term: perDoc}

def getInvertedTfDict(queryList:list, docList:list[int], fpDict, idDict) -> dict:
    """Collect the ``getInvertedTf`` results of all query terms in one dict.

    Calls ``getInvertedTf`` once per term, in query order, and merges the
    returned dicts; a later result overwrites an earlier one under the same key.
    """
    perTerm = (getInvertedTf(queryTerm, docList, fpDict, idDict) for queryTerm in queryList)
    return {key: value for partial in perTerm for key, value in partial.items()}

In [4]:
# Create the index of the index before querying.
# Number of Data/Posting{i}.txt files to scan (was a bare literal 3).
NUM_POSTING_FILES = 3
fpDict = getFpDict(NUM_POSTING_FILES)  # term -> {posting file number -> line position}
idDict = getFpIdDict()                 # docId -> line position in docId.txt

In [105]:
import time
# Time one end-to-end query. perf_counter() is a monotonic, high-resolution
# clock intended for measuring intervals; time.time() is wall-clock time and
# can jump if the system clock is adjusted.
start = time.perf_counter()

qWords = stemQuery("fox dog and") # ['fox', 'dog', 'and']
docIds = getDocIds(qWords,fpDict) # {6047, 7852, 22732, 25237, 25274, 32946, 37727, 42188, 46631, 49958}
idf_dict = getIdfDict(qWords,fpDict) # {'fox': 6.229746822993642, 'dog': 4.945407031631708, 'and': 0.5274289094373557}
inverted_index_tfs = getInvertedTfDict(qWords, docIds, fpDict, idDict)

end = time.perf_counter()

print(round((end - start) * 1000),'ms') #Time in milliseconds

67 ms
