### 1. Индексация
Реализуйте построение инвертированного индекса в памяти для коллекции из домашней работы номер 3. В каждом постинглисте также сохраните значение term-frequency.

In [40]:
from tqdm import tqdm
import math
from typing import Dict, List, Tuple
# Descriptive type aliases used in the annotations throughout this notebook.
ArticleName = str
Text = str
Term = str
TermCount = int
DocLen = int
# Placeholders: no concrete type is defined for these two names here.
CollectionData = None
RankingParams = None
# Per-posting relevance payload; here it is just the term count.
RelevInfo = TermCount
DocId = int
TermFreq = int
# A posting list: (document id, relevance info) pairs for a single term.
Posting = List[Tuple[DocId, RelevInfo]]

In [3]:
from multiprocessing import Pool
import parser1

def load_docs(selected_docs_fn: ArticleName, threads: int = 4) -> Dict[ArticleName, Text]:
    """Load the text of every article listed in ``selected_docs_fn``.

    The file is read as UTF-8 with one article name per line (surrounding
    whitespace is stripped). Each article is fetched through
    ``parser1.get_article_text`` in a pool of worker processes.

    Args:
        selected_docs_fn: path to the file with article names.
        threads: number of worker processes in the pool.

    Returns:
        A dict mapping article name to the value returned by
        ``parser1.get_article_text`` for that name.
    """
    docs = {}
    # Both the pool and the file are context-managed so they are released
    # even when a task raises or times out.
    with Pool(threads) as pool:
        with open(selected_docs_fn, encoding='utf8') as names_file:
            tasks = [
                (article_name, pool.apply_async(parser1.get_article_text, (article_name, )))
                for article_name in (line.strip() for line in tqdm(names_file))
            ]
        # Results are collected inside the pool's ``with`` block: leaving it
        # terminates the workers.
        for article_name, task in tqdm(tasks):
            docs[article_name] = task.get(10**6)
    return docs
    
# Load all selected articles using a pool of 32 worker processes.
docs = load_docs("./selected_docs.tsv", 32)
# Keep only the articles whose loaded text is non-empty.
fdocs = {k: v for k, v in docs.items() if len(v) > 0}
# Swap the names: from here on `docs` is the filtered collection and
# `fdocs` is the original, unfiltered one.
docs, fdocs = fdocs, docs

15229it [00:00, 25724.65it/s]
100%|███████████████████████████████████████████████████████████████████████████| 15229/15229 [01:50<00:00, 138.25it/s]


In [253]:
import string
import nltk
from collections import Counter
from collections import defaultdict 
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

# nltk.download('stopwords')

# English stop words held in a set; make_terms tests membership against it.
stop_words = set(stopwords.words('english'))
# Single Porter stemmer instance shared by every make_terms call.
porter = PorterStemmer()

def make_terms(text: Text) -> List[Term]:
    """Convert raw text into a list of stemmed index terms.

    The text is lower-cased, every character from ``string.punctuation`` is
    removed, and the result is split on whitespace. Words found in
    ``stop_words`` are dropped; the rest are stemmed with ``porter``.
    Duplicates and original order are preserved.
    """
    punctuation_remover = str.maketrans('', '', string.punctuation)
    words = text.lower().translate(punctuation_remover).split()
    terms = []
    for word in words:
        if word in stop_words:
            continue
        terms.append(porter.stem(word))
    return terms

def invert_index(docs: Dict[ArticleName, Text]) -> Tuple[Dict[Term, Posting], Dict[DocId, ArticleName], float, Dict[DocId, DocLen]]:
    """Build an in-memory inverted index with term counts.

    Document ids are assigned by enumerating ``docs`` in iteration order, so
    every posting list is sorted by ascending document id.

    Args:
        docs: mapping of article name to article text.

    Returns:
        A 4-tuple ``(inv_index, doc_id_to_article, mean_doc_len, doc_id_to_len)``:
        * ``inv_index`` - term -> list of ``(doc_id, count)`` pairs;
        * ``doc_id_to_article`` - document id -> article name;
        * ``mean_doc_len`` - average number of terms per document
          (``0.0`` for an empty collection);
        * ``doc_id_to_len`` - document id -> number of terms in the document.
    """
    inv_index = defaultdict(list)
    doc_id_to_article = {}
    doc_id_to_len = {}
    terms_total = 0
    for doc_id, (name, content) in enumerate(docs.items()):
        doc_id_to_article[doc_id] = name
        terms = make_terms(content)
        doc_id_to_len[doc_id] = len(terms)
        terms_total += len(terms)
        for term, count in Counter(terms).items():
            inv_index[term].append((doc_id, count))
    # Guard against an empty collection instead of dividing by zero.
    mean_doc_len = terms_total / len(docs) if docs else 0.0
    return inv_index, doc_id_to_article, mean_doc_len, doc_id_to_len

# Build the inverted index plus the per-collection statistics.
index, doc_id_to_article, mean_doc_len, doc_id_to_len = invert_index(docs)

# Tokenised form of every document, keyed by article name.
documents_terms = {k: make_terms(v) for k, v in docs.items()}
# Name-keyed index: term -> {article name -> occurrence count}.
term_index = defaultdict(lambda: defaultdict(int))

for name, content in documents_terms.items():
    for term in content:
        term_index[term][name] += 1
        
def tf(index, documents):
    """Compute normalised term frequencies.

    Args:
        index: mapping term -> {document key -> occurrence count}.
        documents: mapping document key -> sequence of the document's terms.

    Returns:
        A ``defaultdict(float)`` keyed by ``(term, document key)`` whose value
        is the occurrence count divided by the document length.
    """
    frequencies = defaultdict(float)
    frequencies.update(
        ((term, doc), occurrences * 1. / len(documents[doc]))
        for term, per_doc in index.items()
        for doc, occurrences in per_doc.items()
    )
    return frequencies

def idf(index, documents):
    """Compute the inverse document frequency of every term.

    Args:
        index: mapping term -> container with one entry per document that
            contains the term.
        documents: container of all documents; only its length is used.

    Returns:
        A dict term -> ``log(number of documents / number of documents
        containing the term)``.
    """
    total_docs = len(documents)
    weights = {}
    for term, value in index.items():
        weights[term] = math.log(total_docs * 1. / len(value))
    return weights

# Global IDF / TF tables built from the name-keyed term_index.
idfs = idf(term_index, documents_terms)
tfs = tf(term_index, documents_terms)

# NOTE(review): doc_id_to_article is keyed by document id (see invert_index),
# so in this loop `a` receives the document id and `doc_id` receives the
# article name; the resulting dict maps article name -> document id.
article_to_doc_id = {}
for a, doc_id in doc_id_to_article.items():
    article_to_doc_id[doc_id] = a

Сохраните полученный индекс на диске в бинарном формате. Формат должен позволять читать отсортированные по термам постинглисты, по одному за раз. Размер сохраненного индекса в байтах должен быть порядка 8*(сумму длин всех постинг листов). 

Отдельно сохраните на диск дополнительные данные о коллекции, которые пригодятся для поиска, например названия статей или среднюю длину документа. Размер дополнительных данных, должен быть пропорционален количеству документов коллекции.

In [248]:
import struct
import os
import pickle


def dump_index(index: Dict[Term, Posting], filename: str) -> None:
    """Write the inverted index to ``filename`` in a compact binary format.

    File layout (all integers use the native ``struct`` "i" format):
        * number of terms;
        * then, for every term in sorted order:
            - byte length of the encoded term, followed by the term bytes;
            - posting list length ``n``;
            - ``n`` document ids, followed by ``n`` term frequencies.

    Args:
        index: mapping term -> list of ``(doc_id, term_frequency)`` pairs.
        filename: path of the file to create or overwrite.

    Raises:
        ValueError: if a term or a posting list is too long for the signed
            32-bit length field.
    """
    # "i" is a signed 32-bit integer, so lengths must fit into 31 bits.
    max_int32 = 2**31 - 1
    with open(filename, 'wb') as f:
        f.write(struct.pack("i", len(index)))
        # Terms are written in sorted order so posting lists can be read back
        # sorted by term, one at a time. Terms are unique dict keys, so the
        # tuple comparison never reaches the posting lists.
        for term, posting in sorted(index.items()):
            bytes_term = term.encode()
            bytes_term_len = len(bytes_term)
            p_len = len(posting)
            if bytes_term_len > max_int32 or p_len > max_int32:
                raise ValueError(f"term {term!r} or its posting list is too long to serialise")
            # Each posting entry is a flat (doc_id, term_frequency) pair.
            doc_ids = [p[0] for p in posting]
            freqs = [p[1] for p in posting]

            f.write(struct.pack("i", bytes_term_len))
            f.write(struct.pack(f"{bytes_term_len}s", bytes_term))
            f.write(struct.pack("i", p_len))
            f.write(struct.pack(f"{p_len}i", *doc_ids))
            f.write(struct.pack(f"{p_len}i", *freqs))


# def dump_collectiondata(data: CollectionData, filename: str) -> None:
#     #
#     #
#     #
#     # Code here
#     #
#     #
#     #
#     #

# Serialise the in-memory index and report the resulting file size in bytes.
dump_index(index, "index.inv")
print("Index file size:", os.path.getsize("index.inv"))

# dump_collectiondata(collection_data, "index.data")
# print("Collection data file size:", os.path.getsize("index.data"))

Index file size: 15359637


### 2. Поиск
Для простоты реализации поиска, не требуется делать чтение постинглистов с диска по запросу - достаточно считать их с диска в память целиком. Также загрузите с диска дополнительные данные о коллекции.

In [260]:
def load_index(filename: str) -> Dict[Term, Posting]:
    """Read a whole inverted index from a binary file into memory.

    The file starts with the number of terms. Each term record holds the
    term's byte length, the term bytes, the posting list length ``n``,
    ``n`` document ids and ``n`` term frequencies. Integers are read as
    4-byte values with the native ``struct`` "i" format.

    Returns:
        A dict term -> list of ``(doc_id, term_frequency)`` tuples.
    """
    def read_int(stream):
        # One length/count field: 4 bytes unpacked with format "i".
        return struct.unpack("i", stream.read(4))[0]

    loaded = {}
    with open(filename, "rb") as f:
        terms_count = read_int(f)
        for _ in range(terms_count):
            term_len = read_int(f)
            raw_term = f.read(term_len)
            (term_bytes,) = struct.unpack(f"{term_len}s", raw_term)
            term = term_bytes.decode()

            posting_len = read_int(f)
            raw_doc_ids = f.read(posting_len * 4)
            raw_tfs = f.read(posting_len * 4)
            array_format = f"{posting_len}i"
            doc_ids = struct.unpack(array_format, raw_doc_ids)
            tfs = struct.unpack(array_format, raw_tfs)
            loaded[term] = list(zip(doc_ids, tfs))

    return loaded

    
# def load_collectiondata(filename: str) -> CollectionData:
#     #
#     #
#     #
#     # Code here
#     #
#     #
#     #
#     #

# Replace the in-memory index with the one read back from disk.
index = load_index("index.inv")
print("Number or terms in index:", len(index))
# collection_data = load_collectiondata("index.data")
# Collection data is not loaded from disk; it is a 1-tuple holding the
# doc id -> article name mapping built earlier in memory.
collection_data = (doc_id_to_article, )

Number or terms in index: 186165


Реализуйте поиск документов с ранжированием BM25 на основе инвертированного индекса в парадигме document-at-a-time, то есть через [слияние](https://en.wikipedia.org/wiki/Merge_algorithm) постинглистов. Функция поиска должна принимать число — ограничение на количество документов, возвращаемых поиском. Используемое количество дополнительной памяти должно быть пропорционально этому ограничению и никак не должно зависеть от размера постинглистов или размера коллекции.
Результаты поиска должны быть аналогичные тем, что были в домашней работе номер 3. 

In [227]:
from heapq import heapify, heappush, heappushpop, heappop

class MinHeap:
    """Size-bounded min-heap that retains the ``top_n`` largest elements.

    Once ``top_n`` elements are stored, adding another element pushes it and
    immediately evicts the smallest one, so the heap never grows further.
    """

    def __init__(self, top_n: int):
        self.length = top_n  # maximum number of stored elements
        self.h = []          # heap storage; an empty list is a valid heap

    def add(self, element: int):
        """Insert ``element``, evicting the smallest item when at capacity."""
        has_room = len(self.h) < self.length
        insert = heappush if has_room else heappushpop
        insert(self.h, element)

    def top(self):
        """Return the smallest stored element without removing it."""
        return self.h[0]

    def values(self):
        """Return the underlying list (heap order, not fully sorted)."""
        return self.h

    def pop(self):
        """Remove and return the smallest stored element."""
        return heappop(self.h)

    def __len__(self):
        return len(self.h)
    
def test_search(search):
    """Run a fixed set of sample queries and print the top results.

    Args:
        search: callable taking a query string and returning a sequence of
            ``(article_name, score)`` pairs; only the first five are printed.
    """
    # The list is bound to a name first: a line break directly after
    # ``for query in`` is a syntax error.
    sample_queries = [
        "coronovirus in belarus",
        "who won junior eurovision in 2005",
        "science about full-text search",
    ]
    for query in sample_queries:
        result = search(query)[:5]
        print(f"[{query}]")
        for article_name, score in result:
            print(f"{score:7.2f}  {article_name}")
        print("\n")

In [261]:
from copy import copy
from collections import deque

def search_indexed(query: Text, top_size: int, index: Dict[Term, Posting], collection_data: CollectionData, b, k1, k2, type) -> List[Tuple[ArticleName, float]]:
    """Document-at-a-time BM25 search over an inverted index.

    The posting lists of the query terms are merged by ascending document id
    with a small heap holding one entry per query term. Only a cursor per
    posting list is kept, so the extra memory is proportional to the number
    of query terms plus ``top_size``, not to the posting list sizes.

    Args:
        query: raw query text; it is tokenised with ``make_terms``.
        top_size: maximum number of documents to return.
        index: term -> posting list of ``(doc_id, term_frequency)`` pairs
            sorted by ascending ``doc_id``.
        collection_data: tuple whose first element maps doc id to article name.
        b, k1, k2: BM25 parameters.
        type: accepted so that a ranking-params dict can be unpacked into the
            call; it is not used by this function.

    Returns:
        ``(article_name, score)`` pairs sorted by descending score. When no
        query term is present in the index, ``top_size`` placeholder pairs
        ``('', 0)`` are returned.

    NOTE(review): IDF values, document lengths and the mean document length
    are read from the module-level ``idfs``, ``doc_id_to_len`` and
    ``mean_doc_len`` rather than from ``collection_data``.
    """
    query_terms = Counter(filter(lambda x: x in index, make_terms(query)))
    if len(query_terms) == 0:
        return [('', 0) for _ in range(top_size)]

    # Materialise the term order once; term ids index into these lists.
    terms = list(query_terms.keys())
    postings = [index[t] for t in terms]
    # Position of the next unread entry in each posting list. The lists are
    # only read, never copied or mutated.
    cursors = [0] * len(terms)

    posting_heap = MinHeap(len(terms))
    for term_id, posting in enumerate(postings):
        posting_heap.add((posting[0], term_id))

    top = MinHeap(top_size)
    prev_doc_id = posting_heap.top()[0][0]
    bm25 = 0
    while len(posting_heap) > 0:
        (doc_id, tf), term_id = posting_heap.pop()
        # Advance this term's cursor and refill the heap from the same list.
        cursors[term_id] += 1
        if cursors[term_id] < len(postings[term_id]):
            posting_heap.add((postings[term_id][cursors[term_id]], term_id))
        # A new document id means the previous document's score is complete.
        if doc_id != prev_doc_id:
            top.add((bm25, prev_doc_id))
            bm25 = 0

        K = k1 * ((1 - b) + b * doc_id_to_len[doc_id] * 1. / mean_doc_len)
        term = terms[term_id]
        bm25 += idfs[term] * (k1 + 1) * tf * 1. / (K + tf) * \
                (k2 + 1) * query_terms[term] * 1. / (k2 + query_terms[term])
        prev_doc_id = doc_id

    # Flush the score of the last document processed.
    top.add((bm25, prev_doc_id))

    return sorted([(collection_data[0][doc_id], bm25) for bm25, doc_id in top.values()], key = lambda x: -x[1])

# Demo run: BM25 with b = k1 = k2 = 1 on a shallow copy of the loaded index,
# asking search_indexed for the top 5 documents per query.
ranking_params = {'type': '', "b" : 1, "k1" : 1, "k2" : 1}
test_search(lambda x: search_indexed(x, 5, dict(index), collection_data, **ranking_params))

[coronovirus in belarus]
  10.13  Time_in_Belarus
   9.80  COVID-19_pandemic_in_Belarus
   9.75  Daugava_River
   9.72  Bug_River
   9.38  Eurasian_Union


[who won junior eurovision in 2005]
  20.65  Junior_Eurovision_Song_Contest_2019
  19.01  Junior_Eurovision_Song_Contest_2015
  18.61  Junior_Eurovision_Song_Contest_2004
  18.54  Junior_Eurovision_Song_Contest_2014
  15.16  Blue_(group)


[science about full-text search]
  23.33  Information_retrieval
  12.48  On-Line_Encyclopedia_of_Integer_Sequences
  11.79  Scientist
  10.98  Citizen_science
  10.69  Binary_search




In [None]:
# from statistics import mean 

# def search_bm25(query, b, k1, k2):
# #     query_terms = defaultdict(int)
# #     query = make_terms(query)

#     query_terms = Counter(filter(lambda x: x in index, make_terms(query)))

#     documents_range = []
#     for name, content in documents_terms.items():
# #         print(content)
#         if len(content) == 0:
#             continue
        
#         K = k1 * ((1 - b) + b * (len(content) * 1. / mean_doc_len))
#         result = 0.
#         for term in query_terms.keys():
#             tf = tfs[(term, name)]
#             result += idfs[term] * (k1 + 1) * tf * 1. / (K + tf) * \
#                 (k2 + 1) * query_terms[term] * 1. / (k2 + query_terms[term])
#         documents_range.append((name, result))
        
#     return sorted(documents_range, key=lambda x: -x[1])

# test_search(lambda x: search_bm25(x, **ranking_params))

Реализуйте static pruning до 50 элементов для каждого постинглиста. Сравните качество и скорость работы нового индекса с предыдущим.

In [264]:
def prune(index: Dict[Term, Posting], top_size: int = 50) -> Dict[Term, Posting]:
    """Statically prune every posting list to its ``top_size`` strongest entries.

    For each term the entries with the highest term frequency are kept (ties
    keep their original relative order), and the survivors are re-sorted by
    ascending document id.

    Returns:
        A new dict term -> pruned posting list; ``index`` is not modified.
    """
    def keep_strongest(posting):
        by_frequency_desc = sorted(posting, key=lambda entry: -entry[1])
        survivors = by_frequency_desc[:top_size]
        survivors.sort(key=lambda entry: entry[0])
        return survivors

    pruned = {}
    for term, posting in index.items():
        pruned[term] = keep_strongest(posting)
    return pruned

# Build the pruned index (at most 50 entries per posting list).
pruned_index = prune(index, 50)
# Sanity check: every pruned posting list must be strictly increasing in
# document id.
for term, posting in pruned_index.items():
    prev_doc_id = -1
    for doc_id, freq in posting:
        assert doc_id > prev_doc_id
        prev_doc_id = doc_id

In [265]:
# Same demo queries as before, now answered from the pruned index.
test_search(lambda x: search_indexed(x, 5, pruned_index, collection_data, **ranking_params))

[coronovirus in belarus]
  19.39  Time_in_Belarus
  18.70  COVID-19_pandemic_in_Belarus
  16.55  Byelorussian_Soviet_Socialist_Republic
  16.31  Eurasian_Union
  15.42  Daugava_River


[who won junior eurovision in 2005]
  31.93  Junior_Eurovision_Song_Contest_2019
  26.18  Junior_Eurovision_Song_Contest_2004
  25.15  Junior_Eurovision_Song_Contest_2015
  24.32  Junior_Eurovision_Song_Contest_2014
  23.20  Eurovision:_Europe_Shine_a_Light


[science about full-text search]
  23.75  Information_retrieval
  18.27  Google_Search
  15.83  Binary_search
  14.38  Ask.com
  13.94  Aliweb




Сравните качество и скорость работы нового алгоритма поиска с предыдущим.

In [304]:
def load_queries(queries_fn: ArticleName) -> List[Tuple[Text, ArticleName]]:
    """Read ``(query, expected article)`` pairs from a tab-separated file.

    Each line is stripped of trailing whitespace and split on the first tab
    into the query text and the expected article name.

    Args:
        queries_fn: path to the queries file.

    Returns:
        A list of ``(query, answer)`` tuples in file order.

    Raises:
        ValueError: if a line contains no tab (the two-way unpacking fails).
    """
    # The file is read as UTF-8, matching how the article list is read, and
    # is closed deterministically by the context manager.
    with open(queries_fn, encoding='utf8') as queries_file:
        queries = []
        for line in queries_file:
            query, answer = line.rstrip().split('\t', 1)
            queries.append((query, answer))
    return queries

# Keep only the queries whose expected article is present in `docs`.
queries = list(filter(lambda x: x[1] in docs, load_queries("./queries.tsv")))
# Double-check the filter: every remaining answer must be a known article.
for query, answer in queries:
    assert answer in docs

def search(query, ranking_params = None):
    """Dispatch a query to the full or the pruned index.

    Args:
        query: raw query text.
        ranking_params: dict with a ``"type"`` key (``"bm25"`` or
            ``"bm25_pruned"``) plus the BM25 parameters; the whole dict is
            forwarded to ``search_indexed`` as keyword arguments.

    Returns:
        The result of ``search_indexed`` with a limit of 10 documents.

    Raises:
        ValueError: if ``ranking_params`` is missing or its type is unknown.
            Previously the missing case failed with an opaque ``TypeError``
            and an unknown type silently returned ``None``.
    """
    if ranking_params is None:
        raise ValueError("ranking_params is required and must contain a 'type' key")
    search_type = ranking_params["type"]
    if search_type == "bm25":
        return search_indexed(query, 10, index, collection_data, **ranking_params)
    if search_type == "bm25_pruned":
        return search_indexed(query, 10, pruned_index, collection_data, **ranking_params)
    raise ValueError(f"unknown ranking type: {search_type!r}")

def run(
    title,
    queries: List[Tuple[Text, ArticleName]],
    collection_data: CollectionData,
    ranking_params: RankingParams) -> None:
    """Evaluate ``search`` on labelled queries and print quality metrics.

    For every ``(query, answer)`` pair the top 10 results are inspected and
    the rank of the expected article is recorded. The metrics printed are
    Accuracy (answer at rank 1), Accuracy10 (answer within the top 10) and
    RR (mean reciprocal rank), each averaged over the processed queries.

    Args:
        title: heading printed above the final metrics.
        queries: list of ``(query, expected article name)`` pairs.
        collection_data: not used by this function.
        ranking_params: parameters forwarded to ``search``.
    """
    accuracy = 0.0
    accuracy10 = 0.0
    rr = 0.0
    processed = 0
    with tqdm(queries) as progress:
        for query, answer in progress:
            result = search(query, ranking_params)[:10]

            # 1-based rank of the expected article, or None if it is absent.
            rank = next(
                (position for position, (article_name, score) in enumerate(result, start=1)
                 if article_name == answer),
                None,
            )

            if rank is not None:
                accuracy += (rank == 1)
                accuracy10 += (rank <= 10)
                rr += 1.0 / rank

            processed += 1
            progress.set_description(f'Acc: {accuracy/processed:0.2f}, Acc10: {accuracy10/processed:0.2f}, RR: {rr/processed:0.2f}')
    # With no queries all sums are zero; divide by 1 so the report prints
    # zeros instead of raising ZeroDivisionError.
    total = max(processed, 1)
    print(f'{title}\n  Accuracy: {accuracy/total:0.2f}\n  Accuracy10: {accuracy10/total:0.2f}\n  RR: {rr/total:0.2f}')

# Evaluate the full index first ...
ranking_params = {'type': 'bm25', 'b': 0.625, 'k1': 4, 'k2': 1} 
run("BM25 index merge", queries, documents_terms, ranking_params)

# ... then the pruned index with the same BM25 parameters.
ranking_params['type'] = "bm25_pruned"
run("BM25 pruned", queries, documents_terms, ranking_params)

Acc: 0.28, Acc10: 0.56, RR: 0.36: 100%|██████████████████████████████████████████████| 199/199 [00:04<00:00, 45.06it/s]
Acc: 0.24, Acc10: 0.48, RR: 0.32:  25%|███████████▌                                  | 50/199 [00:00<00:00, 499.92it/s]

BM25 index merge
  Accuracy: 0.28
  Accuracy10: 0.56
  RR: 0.36


Acc: 0.25, Acc10: 0.46, RR: 0.33: 100%|█████████████████████████████████████████████| 199/199 [00:00<00:00, 479.52it/s]

BM25 pruned
  Accuracy: 0.25
  Accuracy10: 0.46
  RR: 0.33





Из lab3:  
BM25{'type': 'bm25', 'b': 0.625, 'k1': 4, 'k2': 1}  
Accuracy: 0.28  
Accuracy10: 0.57  
RR: 0.36

## ~Дополнительно~
### Сжатие индекса (+1 балл)
Реализуйте кодирование чисел алгоритмом VarInt.

In [301]:
import struct 
import io


class baseline_coder:
    """Fixed-width posting codec using the ``struct`` format ``'II'``.

    The methods take no ``self``; they are called on the class itself, e.g.
    ``baseline_coder.encode(stream, posting)``.
    """

    def encode(output_stream, posting):
        """Write every ``(doc_id, freq)`` pair of ``posting`` to the stream."""
        record = struct.Struct('II')
        for doc_id, freq in posting:
            output_stream.write(record.pack(doc_id, freq))

    def decode(input_stream):
        """Read ``(doc_id, freq)`` tuples until the stream returns no data."""
        record = struct.Struct('II')
        pairs = []
        # iter() with a sentinel stops as soon as read() returns b''.
        for chunk in iter(lambda: input_stream.read(record.size), b''):
            pairs.append(record.unpack(chunk))
        return pairs


class varint_coder:
    """VarInt (base-128) codec for non-negative integers.

    Every byte carries 7 payload bits, least-significant group first. The
    high bit (0x80) is set on all bytes of a number except its last one.
    """

    @staticmethod
    def encode_num(number):
        """Return the VarInt encoding of one non-negative integer as bytes.

        Raises:
            ValueError: if ``number`` is negative. Right-shifting a negative
                int never reaches zero, so the loop would not terminate.
        """
        if number < 0:
            raise ValueError(f"VarInt cannot encode a negative number: {number}")
        buf = bytearray()
        while True:
            towrite = number & 0x7f
            number >>= 7
            if number != 0:
                buf.append(towrite | 0x80)
            else:
                buf.append(towrite)
                break
        return bytes(buf)

    @staticmethod
    def decode_num(input_stream):
        """Read a single VarInt from ``input_stream``.

        Returns:
            ``(bytes_consumed, value)``, both ints.

        Raises:
            EOFError: if the stream ends before the final byte of the number.
        """
        shift = 0
        result = 0
        while True:
            chunk = input_stream.read(1)
            if not chunk:
                raise EOFError("stream ended in the middle of a VarInt")
            byte = chunk[0]
            result |= (byte & 0x7f) << shift
            shift += 7
            if byte & 0x80 == 0:
                break
        # Integer division keeps the byte count an int.
        return shift // 7, result

    @staticmethod
    def decode(input_stream):
        """Decode every VarInt from the current position to the end of stream.

        The remaining bytes are read in one call and decoded in memory, so
        any binary stream with ``read()`` works.

        Raises:
            EOFError: if the data ends in the middle of a number.
        """
        res = []
        result = 0
        shift = 0
        for byte in input_stream.read():
            result |= (byte & 0x7f) << shift
            if byte & 0x80:
                shift += 7
            else:
                res.append(result)
                result = 0
                shift = 0
        # A non-zero shift means the last byte still had its continuation bit.
        if shift:
            raise EOFError("stream ended in the middle of a VarInt")
        return res

    @staticmethod
    def encode(output_stream, posting):
        """Write the VarInt encoding of every number in ``posting``."""
        output_stream.write(b''.join(varint_coder.encode_num(num) for num in posting))

# Round-trip smoke test: encode a few numbers, print the raw bytes, decode
# them back and check that the original list is recovered.
output = io.BytesIO()
varint_coder.encode(output, [1, 2, 3, 300, 20000])
print(output.getvalue())

posting = varint_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == [1, 2, 3, 300, 20000]

b'\x01\x02\x03\xac\x02\xa0\x9c\x01'
[1, 2, 3, 300, 20000]


Сравните эффективность разных вариантов кодирования постинглистов:
 - Базовый вариант (4 байта на число)
 - Какой-нибудь алгоритм сжатия общего назначения (lz4/zstd/brotli/gzip)
 - VarInt
 - Delta-кодирование + Какой-нибудь алгоритм сжатия общего назначения 
 - Delta-кодирование + VarInt

In [302]:
def test_encoded_size(coder, index):
    """Measure the total encoded size of an index and verify round-trips.

    Every non-empty posting list is encoded with ``coder.encode`` into its
    own in-memory buffer, decoded back with ``coder.decode`` and compared to
    the original. The summed size is printed in MB under the coder's name.

    Args:
        coder: object exposing ``encode(stream, posting)``,
            ``decode(stream)`` and ``__name__``.
        index: mapping term -> posting list.
    """
    total_size = 0
    for _, posting in tqdm(index.items()):
        if len(posting) > 0:
            sink = io.BytesIO()
            coder.encode(sink, posting)
            encoded = sink.getvalue()
            total_size += len(encoded)
            decoded_posting = coder.decode(io.BytesIO(encoded))
            assert decoded_posting == posting, f"{decoded_posting} != {posting}"
    print(f"{coder.__name__}: {total_size/1024/1024} MB")
    
# Reference measurement: fixed-width encoding of the whole index.
test_encoded_size(baseline_coder, index)

100%|██████████████████████████████████████████████████████████████████████| 186165/186165 [00:01<00:00, 115628.63it/s]

baseline_coder: 11.728889465332031 MB





In [290]:
class varint_posting_coder:
    """VarInt codec for posting lists of ``(doc_id, freq)`` pairs.

    All document ids are written first, followed by all frequencies, so the
    decoder splits the decoded numbers into two equal halves.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Write the doc ids, then the frequencies of ``posting`` as VarInts.

        An empty posting writes nothing instead of raising ``IndexError``.
        """
        unzipped_posting = list(zip(*posting))
        if not unzipped_posting:
            return
        varint_coder.encode(output_stream, unzipped_posting[0])
        varint_coder.encode(output_stream, unzipped_posting[1])

    @staticmethod
    def decode(input_stream):
        """Decode a stream written by ``encode`` back into a list of pairs.

        Raises:
            ValueError: if the stream holds an odd number of values and so
                cannot be split into matching id and frequency halves.
        """
        decoded = varint_coder.decode(input_stream)
        if len(decoded) % 2 != 0:
            raise ValueError("encoded posting holds an odd number of values")
        half = len(decoded) // 2
        return list(zip(decoded[:half], decoded[half:]))
    
# Measure the index size with plain VarInt encoding.
test_encoded_size(varint_posting_coder, index)

100%|███████████████████████████████████████████████████████████████████████| 186165/186165 [00:06<00:00, 28236.65it/s]

varint_posting_coder: 4.381299018859863 MB





In [280]:
import lz4.frame

class lz4_coder:
    """Posting codec: ``baseline_coder`` output compressed as one LZ4 frame.

    The methods take no ``self`` and are called on the class itself.
    """

    def encode(output_stream, posting):
        """Baseline-encode ``posting`` and write its LZ4-compressed bytes."""
        raw = io.BytesIO()
        baseline_coder.encode(raw, posting)
        compressed = lz4.frame.compress(raw.getvalue())
        output_stream.write(compressed)

    def decode(input_stream):
        """Decompress the stream's whole buffer and baseline-decode it."""
        raw = io.BytesIO(lz4.frame.decompress(input_stream.getvalue()))
        return baseline_coder.decode(raw)
    
# Measure the index size with fixed-width encoding plus LZ4 compression.
test_encoded_size(lz4_coder, index)

100%|███████████████████████████████████████████████████████████████████████| 186165/186165 [00:02<00:00, 71988.96it/s]

lz4_coder: 11.713878631591797 MB





In [303]:
def delta_encode(posting):
    """Delta-encode the document ids of a posting list.

    The first document id is stored unchanged; each following id is replaced
    by ``doc_id - previous_doc_id - 1``. Frequencies are copied as they are.
    ``-1`` serves as the "no previous id yet" marker.

    Returns:
        A new list of ``(stored_id, freq)`` tuples.
    """
    encoded = []
    previous = -1
    for doc_id, freq in posting:
        stored = doc_id if previous == -1 else doc_id - previous - 1
        encoded.append((stored, freq))
        previous = doc_id
    return encoded

def delta_decode(delta_posting):
    """Restore absolute document ids from a delta-encoded posting list.

    The first stored value is taken as the document id itself; every later
    id is rebuilt as ``stored + previous_doc_id + 1``. Frequencies are copied
    as they are. ``-1`` serves as the "no previous id yet" marker.

    Returns:
        A new list of ``(doc_id, freq)`` tuples.
    """
    restored = []
    current = -1
    for stored, freq in delta_posting:
        current = stored if current == -1 else stored + current + 1
        restored.append((current, freq))
    return restored

class delta_coder:
    """Posting codec: delta-encoded document ids passed through ``lz4_coder``.

    The methods take no ``self`` and are called on the class itself.
    """

    def encode(output_stream, posting):
        """Delta-encode ``posting`` and write it with ``lz4_coder``."""
        lz4_coder.encode(output_stream, delta_encode(posting))

    def decode(input_stream):
        """Read with ``lz4_coder`` and restore the absolute document ids."""
        return delta_decode(lz4_coder.decode(input_stream))

# Measure the index size with delta encoding followed by LZ4 compression.
test_encoded_size(delta_coder, index)

100%|███████████████████████████████████████████████████████████████████████| 186165/186165 [00:03<00:00, 54354.32it/s]

delta_coder: 11.745101928710938 MB





In [293]:
class delta_varint_coder:
    """Posting codec: delta-encoded document ids written as VarInts.

    The methods take no ``self`` and are called on the class itself.
    """

    def encode(output_stream, posting):
        """Delta-encode ``posting`` and write it with ``varint_posting_coder``."""
        varint_posting_coder.encode(output_stream, delta_encode(posting))

    def decode(input_stream):
        """Read with ``varint_posting_coder`` and restore absolute doc ids."""
        return delta_decode(varint_posting_coder.decode(input_stream))
    
# Measure the index size with delta encoding followed by VarInt encoding.
test_encoded_size(delta_varint_coder, index)

100%|███████████████████████████████████████████████████████████████████████| 186165/186165 [00:05<00:00, 32483.68it/s]

delta_varint_coder: 3.4431543350219727 MB



