In [1]:
import os
import re

import Spider

def highlight(text: str, keyword: str) -> str:
    """Wrap every case-insensitive occurrence of ``keyword`` in ``text`` with ``*``.

    Each match keeps the casing it has in ``text``; only the surrounding
    ``*`` markers are added.

    Args:
        text: The string to decorate.
        keyword: The term to highlight. It is matched literally (regex
            metacharacters are escaped) and case-insensitively.

    Returns:
        ``text`` with all matches wrapped as ``*match*``. ``text`` is returned
        unchanged when ``keyword`` is empty or does not occur.
    """
    if not keyword:
        # An empty pattern matches at every position and would scatter
        # '**' through the whole text.
        return text
    pattern = re.compile(re.escape(keyword), re.IGNORECASE)
    # Matching on the original text (instead of slicing by an index computed
    # on text.lower()) keeps offsets correct and catches every casing variant.
    return pattern.sub(lambda match: f'*{match.group(0)}*', text)

def score(item, keyword: str, title_weight: int = 5, content_weight: int = 3):
    """Compute a relevance score for one document record.

    The score is the weighted number of case-insensitive, non-overlapping
    occurrences of ``keyword`` in the title (``item[1]``) and in the content
    (``item[2]``).

    Args:
        item: Indexable document record; index 1 is read as the title and
            index 2 as the content, both strings.
        keyword: The search term.
        title_weight: Points per occurrence in the title (default 5).
        content_weight: Points per occurrence in the content (default 3).

    Returns:
        ``title_hits * title_weight + content_hits * content_weight``, or 0
        when ``keyword`` is empty.
    """
    needle = keyword.lower()
    if not needle:
        # str.count('') returns len(s) + 1, which would rank documents by
        # their length rather than by relevance.
        return 0
    title_hits = item[1].lower().count(needle)
    content_hits = item[2].lower().count(needle)
    return title_hits * title_weight + content_hits * content_weight

In [2]:
class MySearcherC5V1:
    """Keyword searcher over the loaded documents, with a per-keyword cache.

    Instance state (set in ``__init__``):
      docs  -- list of document records; index 1 is read as the title and
               index 2 as the content.
      cache -- dict mapping a lower-cased keyword to its sorted result list.
    """

    def __init__(self, scale: int = 1):
        """Load the documents and start with an empty cache.

        Args:
            scale: When greater than 1, the document list is repeated this
                many times to simulate a larger corpus for speed tests.
        """
        self.docs = list()
        self.load_data()
        if scale > 1:
            self.docs *= scale  # replicate the corpus to benchmark search speed
        self.cache = dict()

    def load_data(self, data_file_name='./news_list.pkl'):
        """Populate ``self.docs`` from ``data_file_name`` via the Spider module.

        When the file does not exist, ``Spider.pickle_save`` is called first.
        """
        if not os.path.exists(data_file_name):
            # NOTE(review): presumably this crawls and writes the data file;
            # confirm against the Spider module.
            Spider.pickle_save(data_file_name)
        # NOTE(review): the helper name suggests unpickling; only load files
        # from a trusted source.
        self.docs = Spider.pickle_load(data_file_name)

    def search(self, keyword):
        """Return the documents matching ``keyword``, best score first.

        Matching is case-insensitive. Results are cached per lower-cased
        keyword, and the returned list is the cached object itself, so callers
        should not mutate it.

        Args:
            keyword: The search term.

        Returns:
            A list of ``[doc_index, score, title]`` entries sorted by score in
            descending order.
        """
        keyword = keyword.lower()
        cached = self.cache.get(keyword)
        if cached is not None:
            return cached

        matches = list()
        for doc_index, item in enumerate(self.docs):
            # Test the fields separately: concatenating title and content
            # would also match a keyword straddling the boundary between
            # them, producing a hit whose score is 0.
            if keyword in item[1].lower() or keyword in item[2].lower():
                matches.append([doc_index, score(item, keyword), item[1]])
        matches.sort(key=lambda entry: entry[1], reverse=True)
        self.cache[keyword] = matches
        return matches

    def render_search_result(self, keyword):
        """Print one line per hit: rank, score and the highlighted title."""
        for rank, item in enumerate(self.search(keyword), start=1):
            print(f'{rank}[{item[1]}] {highlight(self.docs[item[0]][1], keyword)}')

In [3]:
class MySearcherC5V2(MySearcherC5V1):
    """Searcher that pre-warms its cache when it is constructed."""

    # Hand-picked keywords searched during warm-up when none are supplied.
    DEFAULT_VOCAB = ('华为', '手机', 'tiktok')

    def __init__(self, scale=1, vocab=None):
        """Load the documents, then warm the cache.

        Args:
            scale: Corpus replication factor, forwarded to the parent class.
            vocab: Optional iterable of keyword strings to pre-search.
                Defaults to ``DEFAULT_VOCAB``.
        """
        super().__init__(scale)
        # Stored as a set so each keyword is searched once.
        self.vocab = set(self.DEFAULT_VOCAB if vocab is None else vocab)
        self.build_cache()

    def build_cache(self):
        """Search every keyword in ``self.vocab`` once so its result is cached
        before the first user query."""
        for word in self.vocab:
            self.search(word)

In [4]:
%%time
# Build V1 searchers over the corpus replicated 1x, 10x, 100x and 1000x.
# V1 construction only loads and replicates the documents; nothing is searched here.
searcher_1x_v1 = MySearcherC5V1()
searcher_10x_v1 = MySearcherC5V1(scale=10)
searcher_100x_v1 = MySearcherC5V1(scale=100)
searcher_1000x_v1 = MySearcherC5V1(scale=1000)

Wall time: 412 ms


In [5]:
%%time
# Same four scales with V2, whose constructor also searches every warm-up
# keyword once, so the construction time includes those full scans.
searcher_1x_v2 = MySearcherC5V2(scale=1)
searcher_10x_v2 = MySearcherC5V2(scale=10)
searcher_100x_v2 = MySearcherC5V2(scale=100)
searcher_1000x_v2 = MySearcherC5V2(scale=1000)

Wall time: 14 s


In [6]:
# How search time changes as the amount of data grows.
# These V1 instances have an empty cache, so each call below is a full scan.
%time r = searcher_1x_v1.search('华为')
%time r = searcher_10x_v1.search('华为')
%time r = searcher_100x_v1.search('华为')
%time r = searcher_1000x_v1.search('华为')

# A keyword not searched on this instance before, so it is also a full scan.
%time r = searcher_10x_v1.search('苹果')

Wall time: 3.99 ms
Wall time: 33.9 ms
Wall time: 363 ms
Wall time: 3.87 s
Wall time: 41.9 ms


In [7]:
# How search time changes as the number of searches grows.
# '华为' was already searched on searcher_10x_v1 in the previous cell, so
# every call here is answered from the cache.
%time for i in range(10):r = searcher_10x_v1.search('华为')
%time for i in range(100):r = searcher_10x_v1.search('华为')
# With the cache in place, repeated queries take far less time.

Wall time: 0 ns
Wall time: 0 ns


In [8]:
import jieba

class MySearcherC5V3(MySearcherC5V2):
    """Searcher whose warm-up keywords are the tokens of the documents themselves."""

    def build_cache(self):
        """Warm the cache with every distinct token jieba finds in the documents.

        Each document's title and content are segmented in jieba's full mode
        (``cut_all=True``). Tokens are lower-cased and collected into a set,
        then each is searched once.
        """
        tokens = set()
        for doc in self.docs:
            for word in jieba.cut(doc[1] + ' ' + doc[2], cut_all=True):
                # Skip empty or whitespace-only tokens: searching '' matches
                # every document and would cache the whole corpus under a
                # meaningless key.
                if word.strip():
                    tokens.add(word.lower())
        for word in tokens:
            self.search(word)

In [9]:
%%time
# Building a V3 searcher tokenises every document and searches each token,
# so this cell measures the full warm-up cost at 1x scale.
searcher_1x_v3 = MySearcherC5V3()


Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\10633\AppData\Local\Temp\jieba.cache
Loading model cost 0.605 seconds.
Prefix dict has been built successfully.


Wall time: 1min 29s
