## 12 | 面向对象（下）：如何实现一个搜索引擎？

### “高大上”的搜索引擎

一个搜索引擎由搜索器、索引器、检索器和用户接口四个部分组成。

- 搜索器，通俗来讲就是我们常提到的爬虫（scrawler），它能在互联网上大量爬取各类网站的内容，送给索引器。
- 索引器拿到网页和内容后，会对内容进行处理，形成索引（index），存储于内部的数据库等待检索。
- 最后的用户接口很好理解，是指网页和 App 前端界面，例如百度和谷歌的搜索页面。用户通过用户接口，向搜索引擎发出询问（query），询问解析后送达检索器；
- 检索器高效检索后，再将结果返回给用户。

我们先来定义 SearchEngineBase 基类。

In [10]:
class SearchEngineBase(object):
    def __init__(self):
        pass

    def add_corpus(self, file_path):
        with open(file_path, 'r') as fin:
            text = fin.read()
        self.process_corpus(file_path, text)

    def process_corpus(self, id, text):
        raise Exception('process_corpus not implemented.')

    def search(self, query):
        raise Exception('search not implemented.')

def main(search_engine):
    for file_path in ['1.txt', '2.txt', '3.txt', '4.txt', '5.txt']:
        search_engine.add_corpus('./input/'+file_path)

    while True:
        query = input()
        if query == 'exit':
            break
            
        results = search_engine.search(query)
        print('found {} result(s):'.format(len(results)))
        for result in results:
            print(result)

SearchEngineBase 可以被继承，继承的类分别代表不同的算法引擎。
- 每一个引擎都应该实现 process_corpus() 和 search() 两个函数，对应我们刚刚提到的索引器和检索器。
- main() 函数提供搜索器和用户接口，于是一个简单的包装界面就有了。

具体来看这段代码，其中，
- add_corpus() 函数负责读取文件内容，将文件路径作为 ID，连同内容一起送到 process_corpus 中。
- process_corpus 需要对内容进行处理，然后文件路径为 ID ，将处理后的内容存下来。处理后的内容，就叫做索引（index）。
- search 则给定一个询问，处理询问，再通过索引检索，然后返回。

In [None]:
class SimpleEngine(SearchEngineBase):
    def __init__(self):
        super(SimpleEngine, self).__init__()
        self.__id_to_texts = {}

    def process_corpus(self, id, text):
        self.__id_to_texts[id] = text

    def search(self, query):
        results = []
        for id, text in self.__id_to_texts.items():
            if query in text:
                results.append(id)
        return results

search_engine = SimpleEngine()
main(search_engine)


# ########## 输出 ##########


# simple
# found 0 result(s):
# little
# found 2 result(s):
# 1.txt
# 2.txt

- SimpleEngine 实现了一个继承 SearchEngineBase 的子类，继承并实现了 process_corpus 和 search 接口，同时，也顺手继承了 add_corpus 函数（当然你想重写也是可行的），因此我们可以在 main() 函数中直接调取。
- 在我们新的构造函数中，self.__id_to_texts = {} 初始化了自己的私有变量，也就是这个用来存储文件名到文件内容的字典。process_corpus() 函数则非常直白地将文件内容插入到字典中。这里注意，ID 需要是唯一的，不然相同 ID 的新内容会覆盖掉旧的内容。
- search 直接枚举字典，从中找到要搜索的字符串。如果能够找到，则将 ID 放到结果列表中，最后返回。


这种实现方式简单，但显然是一种很低效的方式：
- 每次索引后需要占用大量空间，因为索引函数并没有做任何事情；【<font color=blue>self.__id_to_texts的value是整个文档的text</font>】
- 每次检索需要占用大量时间，因为所有索引库的文件都要被重新搜索一遍。如果把语料的信息量视为 n，那么这里的时间复杂度和空间复杂度都应该是 O(n) 级别的。
- 这里的 query 只能是一个词，或者是连起来的几个词。如果你想要搜索多个词，它们又分散在文章的不同位置，我们的简单引擎就无能为力了。

解决方案：
最直接的一个想法，就是把语料分词，看成一个个的词汇，这样就只需要对每篇文章存储它所有词汇的 set 即可。

## Bag of words 和 inverted index 

In [13]:
import re

class BOWEngine(SearchEngineBase):
    def __init__(self):
        super(BOWEngine, self).__init__()
        self.__id_to_words = {}

    def process_corpus(self, id, text):
        self.__id_to_words[id] = self.parse_text_to_words(text)

    def search(self, query):
        query_words = self.parse_text_to_words(query)
        results = []
        for id, words in self.__id_to_words.items():
            if self.query_match(query_words, words):
                results.append(id)
        return results
    
    @staticmethod
    def query_match(query_words, words):
        for query_word in query_words:
            if query_word not in words:
                return False
        return True

    @staticmethod
    def parse_text_to_words(text):
        # 使用正则表达式去除标点符号和换行符
        text = re.sub(r'[^\w ]', ' ', text)
        # 转为小写
        text = text.lower()
        # 生成所有单词的列表
        word_list = text.split(' ')
        # 去除空白单词
        word_list = filter(None, word_list)
        # 返回单词的 set
        return set(word_list)

search_engine = BOWEngine()
main(search_engine)


########## 输出 ##########


# i have a dream
# found 3 result(s):
# 1.txt
# 2.txt
# 3.txt
# freedom children
# found 1 result(s):
# 5.txt

i have a dream
found 3 result(s):
./input/1.txt
./input/2.txt
./input/3.txt
freedom children
found 1 result(s):
./input/5.txt
exit


我们先来理解一个概念，BOW Model

1. 假设一个文本，不考虑语法、句法、段落，也不考虑词汇出现的顺序，只将这个文本看成这些词汇的集合。于是相应的，我们把 id_to_texts 替换成 id_to_words

2. process_corpus() 函数调用类静态函数 parse_text_to_words，将文章打碎形成词袋，放入 set 之后再放到字典中

3. search() 函数则稍微复杂一些。这里我们假设，想得到的结果，是所有的搜索关键词都要出现在同一篇文章中我们需要同样打碎 query 得到一个 set，然后把 set 中的每一个词，和我们的索引中每一篇文章进行核对，看一下要找的词是否在其中。而这个过程由静态函数 query_match 负责。

注意：parse_text_to_words和query_match这两个函数都是没有状态的，**它们不涉及对象的私有变量**（没有 self 作为参数），相同的输入能够得到完全相同的输出结果。因此设置为静态，可以方便其他的类来使用。


缺陷：
1. 可是，即使这样做，每次查询时依然需要遍历所有 ID，虽然比起 Simple 模型已经节约了大量时间，但是互联网上有上亿个页面，每次都全部遍历的代价还是太大了
2. 再有，词袋模型并不考虑单词间的顺序，但有些人希望单词按顺序出现，或者希望搜索的单词在文中离得近一些，这种情况下词袋模型现任就无能为力了。


In [None]:

import re

class BOWInvertedIndexEngine(SearchEngineBase):
    def __init__(self):
        super(BOWInvertedIndexEngine, self).__init__()
        self.inverted_index = {}

    def process_corpus(self, id, text):
        words = self.parse_text_to_words(text)
        for word in words:
            if word not in self.inverted_index:
                self.inverted_index[word] = []
            self.inverted_index[word].append(id)

    def search(self, query):
        query_words = list(self.parse_text_to_words(query))
        query_words_index = list()
        for query_word in query_words:
            query_words_index.append(0)
        
        # 如果某一个查询单词的倒序索引为空，我们就立刻返回
        for query_word in query_words:
            if query_word not in self.inverted_index:
                return []
        
        result = []
        while True:
            
            # 首先，获得当前状态下所有倒序索引的 index
            current_ids = []
            
            for idx, query_word in enumerate(query_words):
                current_index = query_words_index[idx]
                current_inverted_list = self.inverted_index[query_word]
                
                # 已经遍历到了某一个倒序索引的末尾，结束 search
                if current_index >= len(current_inverted_list):
                    return result

                current_ids.append(current_inverted_list[current_index])

            # 然后，如果 current_ids 的所有元素都一样，那么表明这个单词在这个元素对应的文档中都出现了
            if all(x == current_ids[0] for x in current_ids):
                result.append(current_ids[0])
                query_words_index = [x + 1 for x in query_words_index]
                continue
            
            # 如果不是，我们就把最小的元素加一
            min_val = min(current_ids)
            min_val_pos = current_ids.index(min_val)
            query_words_index[min_val_pos] += 1

    @staticmethod
    def parse_text_to_words(text):
        # 使用正则表达式去除标点符号和换行符
        text = re.sub(r'[^\w ]', ' ', text)
        # 转为小写
        text = text.lower()
        # 生成所有单词的列表
        word_list = text.split(' ')
        # 去除空白单词
        word_list = filter(None, word_list)
        # 返回单词的 set
        return set(word_list)

search_engine = BOWInvertedIndexEngine()
main(search_engine)


########## 输出 ##########


# little
# found 2 result(s):
# 1.txt
# 2.txt
# little vicious
# found 1 result(s):
# 2.txt

新模型继续使用之前的接口，仍然只在 __init__()、process_corpus()和search()三个函数进行修改。

<font color=red>这其实也是大公司里团队协作的一种方式，在合理的分层设计后，每一层的逻辑只需要处理好分内的事情即可。在迭代升级我们的搜索引擎内核时， main 函数、用户接口没有任何改变。当然，如果公司招了新的前端工程师，要对用户接口部分进行修改，新人也不需要过分担心后台的事情，只要做好数据交互就可以了。</font>

1. 针对第一个问题，我们采用倒序索引
倒序索引，一如其名，也就是说这次反过来，我们保留的是 word -> id 的字典。于是情况就豁然开朗了，在 search 时，我们只需要把想要的 query_word 的几个倒序索引单独拎出来，然后从这几个列表中找共有的元素，那些共有的元素，即 ID，就是我们想要的查询结果。这样，我们就避免了将所有的 index 过一遍的尴尬。

process_corpus 建立倒序索引。注意，这里的代码都是非常精简的。在工业界领域，需要一个 unique ID 生成器，来对每一篇文章标记上不同的 ID，倒序索引也应该按照这个 unique_id 来进行排序。

至于 search() 函数，你大概了解它做的事情即可。它会根据 query_words 拿到所有的倒序索引，如果拿不到，就表示有的 query word 不存在于任何文章中，直接返回空；拿到之后，运行一个“合并 K 个有序数组”的算法，从中拿到我们想要的 ID，并返回。