# HW1 布尔查询之BSBI与索引压缩

**李娅琦  2213603**

# 代码框架

本次实验具体包含的内容有：
1. [索引构建 (40%)](#索引构建与检索-(40%)) 使用BSBI方法模拟在内存不足的情况下的索引构建方式，并应用于布尔查询
2. [索引压缩 (30%)](#索引压缩-(30%)) 使用可变长编码对构建的索引进行压缩
3. [布尔检索 (10%)](#布尔联合检索-(10%)) 对空格分隔的单词查询进行联合（与）布尔检索
3. [实验报告 (10%)](#Report-(25%)) 描述你的代码并回答一些问题
4. [额外的编码方式 (10%)](#额外的编码方式-(10%)) 鼓励使用额外的编码方式对索引进行压缩 (例如, gamma-encoding)

# 索引的构建与检索

## 映射

### IdMap类

该类主要是实现`term-termID`以及`doc-docID`的转换，其中补充编写了函数`_get_str()`和`_get_id()`。

函数`_get_str()`将id转换成string，首先判断是否有id存在，如果存在则返回对应的string；

函数`_get_id()`将string转换为id，如果string已经有则直接返回string对应位置存放的id，否则将其加入到list中在返回对应的id值。

In [None]:
def _get_str(self, i):
    ### Begin your code
    if i < 0 or i >= len(self.id_to_str):
        raise IndexError("termID out of range.")
    return self.id_to_str[i]
    ### End your code
def _get_id(self, s):
    ### Begin your code
    if s in self.str_to_id:
      return self.str_to_id[s]
    else:
        new_id = len(self.id_to_str)
        self.str_to_id[s] = new_id
        self.id_to_str.append(s)
        return new_id
    ### End your code

## 索引

### BSBIIndex类

在该类中主要实现分块处理创建索引，其中补充编写了函数`parse_block()`和`invert_write()`。

函数`parse_block()`接受子目录路径作为参数，创建`td_pairs[]`用于存储索引并返回。主要思路如下：

1. 遍历每个子目录（即每个block）中的每个文件，使用`doc_id_map()`函数得到docID。
2. 每个文件按行遍历并通过分词得到token（即term），按照token使用`term_id_map()`函数得到termID。
3. 将所得的`termID-docID`对存储至td_pairs[]中。

In [None]:
def parse_block(self, block_dir_relative):##参数是子目录路径
    ### Begin your code
    td_pairs = []  ## 存储termID-docID对 
    block_dir_path = os.path.join(self.data_dir, block_dir_relative) ## block路径
    # 遍历block中的文件
    for file_name in os.listdir(block_dir_path):
        doc_id = self.doc_id_map[os.path.join(block_dir_relative, file_name)]
        with open(os.path.join(block_dir_path, file_name), 'r') as file:
            for line in file.readlines():
                tokens = line.split(' ')
                for token in tokens:
                    term_id = self.term_id_map[token.strip()]
                    td_pairs.append((term_id, doc_id))
    return td_pairs
    ### End your code

### 倒排表

实现InvertedIndexWriter类。和列表类似，该类提供了append()函数，但是倒排表不会存储在内存中而是直接写入到磁盘里。下面按照函数append()中给出的三个function来补充实现该函数：

1. 使用`postings_encoding`对postings_list进行编码，变成字节数组的形式。
2. 对于每个term构建`三元组（起始位置，文档数量，字节长度）`，并将三元组信息加入postings_dict中。
3. 使用write()函数将编码的字节流写入磁盘索引文件中。

In [None]:
def append(self, term, postings_list):
   ### Begin your code
   postings = self.postings_encoding.encode(sorted(postings_list)) ##编码
   if len(self.terms)!=0 :
      last_term = self.terms[-1] 
   else:
      last_term = None
   if last_term is not None:
      start_position = self.index_file.tell()  ##索引文件中的起始位置
   else:
      start_position = 0
   number_of_postings = len(postings_list)  ##文档数量
   length_in_bytes=len(postings)   ##字节长度 
   self.postings_dict[term] = (start_position, number_of_postings, length_in_bytes)
   self.terms.append(term)   
   self.index_file.write(postings)   ##将字节流附加到磁盘上的索引文件中
   ### End your code

实现invert_write类，将解析得到的td_pairs转换为倒排表并写入磁盘。其中`invert_write()`函数的实现思路如下：

1. 对于得到的td_pairs使用`lamda()函数`按照term进行排序。
2. 遍历排序好的td_pairs将termID相同的docID进行合并存放在postings_dict[term_id]中。
3. 将已经转换好的倒排表利用append()函数写入磁盘。

In [None]:
def invert_write(self, td_pairs, index):
    ### Begin your code
    td_pairs = sorted(td_pairs, key=lambda x: self.term_id_map[x[0]])
    postings_dict = {}  ##td_pairs转换为倒排表
    for term_id, doc_id in td_pairs:
        if term_id not in postings_dict:
            postings_dict[term_id] = []
        postings_dict[term_id].append(doc_id)
    for term_id, doc_ids in postings_dict.items():  ##写入磁盘
        index.append(term_id, list(set(doc_ids)))
    ### End your code

## 合并


通过构建`InvertedIndex`的子类`InvertedIndexIterator`迭代地从磁盘上每次读取文件的一个倒排列表。其中补充编写的`_next_()`函数思路如下：

1. 设置变量`current_position`和`index_file`分别为在文件中的位置以及对应的索引文件。
2. 通过当前在文件中的位置得到`termID`并利用三元组信息得到termID对应的`posting_list`并返回（即termID以及对应的文档编号组）。

In [None]:
def _initialization_hook(self):
    ### Begin your code
    self.current_position=0 ##开始时从文件头
    self.index_file = open(self.index_file_path,"rb+")
    ### End your code
    
def __next__(self):
    ### Begin your code
    if self.current_position>= len(self.terms):##判断位置范围
        raise StopIteration 
    ##termID--docsID
    termID = self.terms[self.current_position] 
    start_position, n_postings, length_in_bytes = self.postings_dict[termID]
    self.index_file.seek(start_position)
    postings_list = self.postings_encoding.decode(self.index_file.read(length_in_bytes))
    self.current_position += 1
    return termID, postings_list
    ### End your code

实现读取之后，接下来就要实现合并。根据已知的`heaqp.merge()`函数和`lambda()`函数实现`merge()`函数，完成倒排表的合并以及磁盘写入，主要思路如下（类似于归并排序）：

1. 使用 `heapq.merge()` 合并使用`lamda()`按termID排序后的索引。
2. 遍历合并后的索引列表，将新的 `termID-curr_postings` 元组添加到merged_index中，并更新 `last_term` 与`last_posting`，将已存在的`termID`对应的文档编号列表合并到 `last_posting` 列表中以确保每个termID被添加一次。
3. 将最后一个词添加到merged_index中。


In [None]:
def merge(self, indices, merged_index): #不能用查表方式
    ### Begin your code
    last_term = None
    last_posting = []
    for termID, curr_postings in heapq.merge(*indices, key=lambda x: self.term_id_map[x[0]]):
        if termID != last_term: ## 不同则append新的term
            if last_term:
                merged_index.append(last_term, list(set(last_posting)))
            last_term = termID
            last_posting = curr_postings
        else:## 相同
            last_posting += curr_postings 
    if last_term:
        merged_index.append(last_term,list(set(last_posting)))
    ### End your code

# 布尔联合检索

首先实现`InvertedIndex`的子类`InvertedIndexMapper`，找到对应terms在索引文件中位置并取出它的倒排记录表。其中`_get_postings_list()`实现思路如下：

根据三元组信息，打开对应文件并找到termID对应的倒排记录表的字节流，对其进行解码得到倒排记录表。

In [None]:
def _get_postings_list(self, term):
    ### Begin your code
    start_position, n_postings, length_in_bytes = self.postings_dict[term]
    with open(self.index_file_path, 'rb+') as index_file:
        index_file.seek(start_position)  ##起始位置
        postings_list_bytes = index_file.read(length_in_bytes)  ##读取postings列表
        postings_list = self.postings_encoding.decode(postings_list_bytes) ##解码
    return postings_list
    ### End your code

得到倒排记录表之后，实现`sorted_intersect()`函数，遍历两个有序列表并在线性时间内合并以求交集。主要思路与merge()函数和归并排序类似便不做介绍。

In [None]:
def sorted_intersect(list1, list2):
    ### Begin your code
    i, j = 0, 0
    intersection = []  #结果
    while i < len(list1) and j < len(list2):
        if list1[i] == list2[j]:
            intersection.append(list1[i])
            i += 1
            j += 1
        elif list1[i] < list2[j]:
            i += 1
        else:
            j += 1
    return intersection
    ### End your code

利用`sorted_intersect` 和 `InvertedIndexMapper`来实现`retrieve`函数，对于给定的包含由空格分隔tokens的字符串查询，返回包含查询中所有tokens的文档列表。主要思路如下：

1. 将给定的字符串`query`进行分词处理，得到tokens并使用`term_id_map`和`lambda()`得到对应的排序好的termID存于`terms`中
2. 使用`InvertedIndexMapper`类创建映射器对象mapper访问倒排索引获取termID对应的文档列表，使用 `sorted_intersect()` 函数依次计算两个列表的交集并保持列表排序
3. 使用 `map()`将文档ID映射回文档名称，使用`sorted()`确保结果列表是排序的并返回

In [None]:
def retrieve(self, query):
    if len(self.term_id_map) == 0 or len(self.doc_id_map) == 0:
        self.load()
    ### Begin your code     
    tokens=query.strip().split()
    terms = list(map(lambda x : self.term_id_map[x],tokens))
    if len(tokens) ==0:
        return []
    result=[]
    with InvertedIndexMapper(self.index_name, directory=self.output_dir, postings_encoding=self.postings_encoding) as mapper:
        result.extend(mapper[terms[0]])
        for i in range(1,len(terms)):
            result = sorted_intersect(result,mapper[terms[i]])
    return sorted(list(map(lambda x : self.doc_id_map[x],result)))
    ### End your code

# 索引压缩

通过[Chapter 5](https://nlp.stanford.edu/IR-book/pdf/05comp.pdf)以及网站我们了解到了gap-encoding和可变长字节编码，参考以实现CompressedPostings类。

其中gap-encoding是一种压缩技术，docID列表中，如果连续的文档ID之间的gap较小，那么这些差异可以用更少的位数来表示。

可变长字节编码的基本原理是将每个整数分成多个部分，每个部分称为一个“段”（chunk），每个段包含7位有效数据（因为一个字节有8位，其中1位用于表示后续是否还有字节）。

编码过程如下（解码逆过来就可以）：
- 分割整数：将整数分成多个7位的段
- 编码每个段：每个段的最高位（第8位）用于表示是否有后续的段，1表示还有，0表示结束
- 拼接段：将所有段拼接形成最终的编码

实现思路如下：

1. `encode()`函数：将列表中第一个docID编码为字节，然后遍历列表计算docID与前一个docID之间的差值（gap），并使用`_varint_encode()`函数将这些差值编码为可变长字节。

2. `decode()`函数：创建空列表存储解码后的docID，遍历字节序列使用`_varint_decode()`函数解码并将解码后的第一个整数添加到postings_list中，对于后续的整数将其与postings_list中的最后一个文档ID相加，得到实际的docID并添加到列表中。

3. `_varint_encode()`函数：不断地将整数与0x7F进行与操作来获取低7位，然后将结果左移7位，直到整数为0。如果整数不为0，则将最高位设置为1，表示后续还有字节。

4. `_varint_decode()`函数：不断地读取字节，并根据最高位是否为1来决定是否继续读取下一个字节，直到遇到最高位为0的字节为止。


In [None]:
class CompressedPostings:
    @staticmethod
    def _varint_encode(n): ##可变长字节编码
        encoded = bytearray()
        while True:
            seven_bits = n & 0x7F
            n >>= 7
            if n:
                encoded.append(seven_bits | 0x80)
            else:
                encoded.append(seven_bits)
                break
        return bytes(encoded)
    @staticmethod
    def _varint_decode(data, start_index): ##可变长字节解码
        result = 0
        shift = 0
        while True:
            b = data[start_index]
            start_index += 1
            result |= (b & 0x7F) << shift
            if not (b & 0x80):
                break
            shift += 7
        return result, start_index
    ### End your code
    @staticmethod
    def encode(postings_list):
        ### Begin your code
        if not postings_list:
            return b''
        encoded = bytearray()
        last_doc_id = postings_list[0]
        
        for doc_id in postings_list[1:]:
            gap = doc_id - last_doc_id
            encoded += CompressedPostings._varint_encode(gap)
            last_doc_id = doc_id
        encoded = CompressedPostings._varint_encode(postings_list[0]) + encoded
        return bytes(encoded)
        ### End your code
    @staticmethod
    def decode(encoded_postings_list):
        ### Begin your code
        postings_list = []
        index = 0
        # Decode the first docId
        doc_id, index = CompressedPostings._varint_decode(encoded_postings_list, index)
        postings_list.append(doc_id)
        
        while index < len(encoded_postings_list):
            gap, index = CompressedPostings._varint_decode(encoded_postings_list, index)
            doc_id += gap
            postings_list.append(doc_id)
        return postings_list
        ### End your code

# 额外的编码方式

通过补充`ECCompressedPostings`的`encode` 和 `decode`方法来实现一种额外的索引压缩方式**gamma-encoding**。

gamma编码的步骤如下(即`gamma_encode()`函数)：
1. 对于数字x分解成 x=2N + M
2. 对于N+1使用一元编码
3. 对于M使用比特宽度为N的二进制编码

其中一元编码是指：对于数字N，使用N-1个二进制1和末尾一个0表示，如数字3的一元编码为：110

解码(即`gamma_decode()`函数)就是将解码反过来，从编码的字节串读取信息，去相应数量的位重建原始的数值。

`encode()`函数: 遍历posting_list，对每个数字进行编码并将结果连接起来。
`decode()`函数: 逐个解码同时更新索引以跳过已解码的部分。

In [1]:
import struct
class ECCompressedPostings:
    #If you need any extra helper methods you can add them here 
    ### Begin your code
    @staticmethod
    def gamma_encode(num):
        if num == 0:
            return b'\x00'
        N = num.bit_length() - 1 
        M = num - (1 << N)
        unary_encoded = b'\x01' * N + b'\x00'
        num_bytes = (N + 7) // 8
        M_encoded = M.to_bytes(num_bytes, byteorder='big')
        if len(M_encoded) < num_bytes:
            M_encoded = b'\x00' * (num_bytes - len(M_encoded)) + M_encoded
        return unary_encoded + M_encoded[-num_bytes:]
    @staticmethod
    def gamma_decode(encoded_str):
        index = 0
        if encoded_str[index] == 0x00:
            return 0, 1
        while index < len(encoded_str) and encoded_str[index] == 0x01:
            index += 1
        N = index
        byte_length = (N + 7) // 8
        start_index = index + 1
        end_index = start_index + byte_length
        M_bytes = encoded_str[start_index:end_index]
        M = int.from_bytes(M_bytes, byteorder='big') & ((1 << N) - 1)
        return (1 << N) + M, end_index

    ### End your code

    @staticmethod
    def encode(postings_list):
        ### Begin your code
        if not postings_list:  # 特殊处理空列表
            return b''
        encoded = bytes()
        for num in postings_list:
            encoded += ECCompressedPostings.gamma_encode(num)
        return encoded
        ### End your code

        
    @staticmethod
    def decode(encoded_postings_list):
        ### Begin your code
        decoded_list = []
        index = 0
        while index < len(encoded_postings_list):
            num, length = ECCompressedPostings.gamma_decode(encoded_postings_list[index:])
            decoded_list.append(num)
            index += length
        return decoded_list
        ### End your code

# 总结

通过此次实验，更好的理解掌握了索引的构建与压缩以及布尔检索。在实验的过程中也遇到了挺多问题，比如变量太多时不时忘记,一些库函数的使用（像extend和sorted_intersect之类的）,分词时方法的使用（尝试了不同的分词方式）以及额外编码方式中gamma编码(因为本人在网上搜到不同的两个版本所以挺模糊的，最后查看ppt又结合网络成功理解，参考了（https://tonymazn.wordpress.com/2018/09/03/%E5%8E%8B%E7%BC%A9%E7%AE%97%E6%B3%95%E4%B9%8Belias-gamma-coding-elias-delta-coding/）其中还有delta编码但本次只实现了gamma编码)。