### 文件路径

In [1]:
from pathlib import Path

 
# Make sure the cache directory exists before any later cell writes into it.
Path("cache").mkdir(exist_ok=True)

 
# Folder passed to load_documents(); it is scanned for "<id>.txt" article files.
ARTICLE_DIR = "article"
# JSON cache of the tokenized documents (default cache_path of tokenize_serial).
TOKENIZED_PATH = "cache/tokenized_docs.json"
# JSON cache of the vocabulary list written/read by InvertedIndex.build.
WORD_LIST_PATH = "cache/word_list.json"
# Serialized FAISS index written/read by InvertedIndex.build.
FAISS_INDEX_PATH = "cache/faiss.index"
# SQLite database holding one float32 vector blob per word.
SQLITE_VECTOR_DB = "cache/word_vectors.db"


### 文章加载+分词（串行处理+缓存）

In [2]:
import os, json, re, jieba 
from tqdm import tqdm   
import time

# 加载所有文档，文件名为ID
def load_documents(folder_path):
    """Read every ``.txt`` file in ``folder_path`` into a dict keyed by integer ID.

    The document ID is the file name with ``.txt`` removed, converted with
    ``int``; a ``.txt`` file whose name is not an integer raises ``ValueError``.
    Files are visited in sorted (lexicographic) file-name order.

    Args:
        folder_path: directory containing the article files.

    Returns:
        dict mapping ``int`` document ID to the file's full text (UTF-8 decoded).
    """
    suffix = ".txt"
    txt_names = [name for name in sorted(os.listdir(folder_path)) if name.endswith(suffix)]

    loaded = {}
    for name in txt_names:
        # Derive the ID first so a badly named file fails before it is opened.
        key = int(name.replace(suffix, ""))
        full_path = os.path.join(folder_path, name)
        with open(full_path, 'r', encoding='utf-8') as handle:
            loaded[key] = handle.read()
    return loaded

# 分词 + 清洗
def jieba_cut_serial(text):
    """Segment ``text`` with jieba and drop tokens that carry no content.

    After stripping surrounding whitespace, a token is discarded when it is
    at most one character long, consists only of digits, or consists only of
    characters that are neither word characters nor in the CJK range
    U+4E00..U+9FA5 (i.e. punctuation/symbol runs).

    Args:
        text: raw document text.

    Returns:
        list of the surviving tokens, in their original order.
    """
    symbol_only = re.compile(r"^[^\w\u4e00-\u9fa5]+$")

    def is_content(token):
        # Mirrors the three rejection rules: too short, numeric, symbols only.
        return len(token) > 1 and not token.isdigit() and symbol_only.match(token) is None

    stripped = [piece.strip() for piece in jieba.cut(text)]
    return [token for token in stripped if is_content(token)]

# 主逻辑
def tokenize_serial(documents, cache_path=TOKENIZED_PATH):
    """Tokenize every document one after another, caching the result as JSON.

    If ``cache_path`` already exists it is loaded and returned without looking
    at ``documents``. Otherwise each text is passed through
    ``jieba_cut_serial`` and the result is written to ``cache_path``.

    Args:
        documents: dict mapping document ID to raw text.
        cache_path: location of the JSON cache file.

    Returns:
        dict mapping document ID to its list of tokens.
        NOTE: JSON object keys are always strings, so a result loaded from the
        cache has ``str`` keys, whereas a freshly computed result keeps the
        keys of ``documents`` unchanged. Callers should normalise the key type
        themselves (e.g. with ``int(doc_id)``).
    """
    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding='utf-8') as f:
            print("检测到缓存文件，正在加载分词结果...")
            return json.load(f)

    print("正在进行分词...")
    tokenized = {}

    start_time = time.time()
    # Iterate the mapping directly: unpacking zip(*documents.items()) raised
    # ValueError when the corpus was empty.
    for doc_id, text in tqdm(documents.items(), total=len(documents), desc="分词进度"):
        tokenized[doc_id] = jieba_cut_serial(text)
    elapsed = time.time() - start_time

    # An empty result is not cached: an empty cache file would otherwise be
    # returned forever, even after articles are added.
    if tokenized:
        # Write to a sibling temp file and rename, so an interrupted dump
        # cannot leave a truncated cache that breaks json.load on the next run.
        tmp_path = cache_path + ".tmp"
        with open(tmp_path, "w", encoding='utf-8') as f:
            json.dump(tokenized, f, ensure_ascii=False)
        os.replace(tmp_path, cache_path)

    print(f"分词完成，共{len(tokenized)}篇文档，耗时{elapsed:.2f}秒")
    return tokenized

# Load the raw articles, then tokenize them (or reuse the JSON cache if present).
# NOTE: when the cache is hit, tokenized_docs has string keys (JSON round-trip).
documents = load_documents(ARTICLE_DIR) 
tokenized_docs = tokenize_serial(documents)


检测到缓存文件，正在加载分词结果...


### BERT模型加载

In [3]:
from transformers import BertTokenizer, BertModel
import torch 

class ChineseWordEncoder:
    """Encode a single word into a vector with a pretrained BERT-style model.

    The vector is the final-layer hidden state of the first token ([CLS]).

    Args:
        model_name: Hugging Face model identifier passed to ``from_pretrained``.
        device: torch device string the model and inputs are moved to.
            Defaults to ``"cuda"`` (the previous hard-coded behaviour); pass
            ``"cpu"`` to run on a machine without a GPU.
    """

    def __init__(self, model_name="hfl/chinese-roberta-wwm-ext", device="cuda"):
        # Remember the device so get_vector() sends inputs to the same place
        # as the model instead of repeating a hard-coded string.
        self.device = device
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        # Load the pretrained weights and move them to the target device.
        self.model = BertModel.from_pretrained(model_name).to(self.device)
        # Evaluation mode.
        self.model.eval()

    def get_vector(self, word):
        """Return the [CLS] hidden state for ``word`` as a 1-D NumPy array.

        Args:
            word: the text to encode.

        Returns:
            NumPy array of shape ``(hidden_dim,)`` living on the CPU.
        """
        # Tokenize into model inputs (PyTorch tensors). truncation=True with
        # max_length=10 caps the token sequence length as a safety limit.
        inputs = self.tokenizer(
            word, return_tensors="pt", truncation=True, max_length=10
        ).to(self.device)
        # No gradients are needed for inference; skipping them saves time and memory.
        with torch.no_grad():
            outputs = self.model(**inputs)
        # last_hidden_state is (batch=1, sequence_len, hidden_dim); position 0
        # along the sequence axis is the [CLS] token.
        cls_vector = outputs.last_hidden_state[:, 0, :]

        # (1, hidden_dim) -> (hidden_dim,), then move to CPU and convert to NumPy.
        return cls_vector.squeeze(0).cpu().numpy()

# Create the shared encoder once; later cells pass this object to the index.
encoder = ChineseWordEncoder()
print("BERT模型已加载到GPU")


  from .autonotebook import tqdm as notebook_tqdm


BERT模型已加载到GPU


### SQLite数据库有关方法

In [4]:
import sqlite3
import numpy as np

# 初始化 SQLite 数据库
def init_sqlite_vector_db(db_path=SQLITE_VECTOR_DB):
    """Create the SQLite file and the ``word_vectors`` table if they are missing.

    The table has two columns: ``word`` (TEXT primary key) and ``vector`` (BLOB).

    Args:
        db_path: path of the SQLite database file.
    """
    # os.makedirs("") raises, so only create a parent directory when the
    # path actually has one (a bare file name lives in the current directory).
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''
        CREATE TABLE IF NOT EXISTS word_vectors (
            word TEXT PRIMARY KEY,
            vector BLOB
        )
    ''')
        conn.commit()
    finally:
        # Close the handle even if the statement fails.
        conn.close()

# 写入词向量
def save_vector_to_sqlite(word, vector, db_path=SQLITE_VECTOR_DB):
    """Insert or overwrite the vector stored for ``word``.

    The vector is converted to float32 and stored as its raw bytes.

    Args:
        word: primary key of the row.
        vector: NumPy array (or array-like) of numbers.
        db_path: path of the SQLite database file.
    """
    vec_bytes = np.asarray(vector).astype("float32").tobytes()
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "INSERT OR REPLACE INTO word_vectors (word, vector) VALUES (?, ?)",
            (word, vec_bytes),
        )
        conn.commit()
    finally:
        # Close the handle even if the insert fails.
        conn.close()

# 读取词向量
def load_vector_from_sqlite(word, db_path=SQLITE_VECTOR_DB):
    """Fetch the stored vector for ``word``.

    Args:
        word: primary key to look up.
        db_path: path of the SQLite database file.

    Returns:
        1-D float32 NumPy array decoded from the stored blob, or ``None``
        when the word has no row.
    """
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT vector FROM word_vectors WHERE word = ?", (word,)
        ).fetchone()
    finally:
        # Close the handle even if the query fails (e.g. missing table).
        conn.close()
    if row is None:
        return None
    return np.frombuffer(row[0], dtype="float32")


### 倒排索引 + 跳表交集 + SQLite + FAISS

In [5]:
import faiss 
from collections import defaultdict

class InvertedIndex:
    """Inverted index over tokenized documents plus a FAISS index of word vectors.

    Attributes:
        index: maps each token to the sorted list of integer document IDs
            that contain it.
        word_list: vocabulary; entry ``i`` is the word whose vector is row
            ``i`` of ``faiss_index``.
        faiss_index: FAISS index over the word vectors; ``None`` until
            ``build`` has run.
    """

    def __init__(self):
        self.index = defaultdict(list)
        self.word_list = []
        self.faiss_index = None

    def _build_postings(self, tokenized_docs):
        """Rebuild ``self.index`` from scratch and return the vocabulary set."""
        # Start from an empty mapping so calling build() twice on the same
        # object does not append every document ID a second time.
        self.index = defaultdict(list)
        for doc_id, tokens in tokenized_docs.items():
            # set() so a document is listed once per token; int() because IDs
            # loaded from the JSON cache arrive as strings.
            for token in set(tokens):
                self.index[token].append(int(doc_id))
        for postings in self.index.values():
            postings.sort()
        return set(self.index)

    def build(self, tokenized_docs, encoder, db_path=SQLITE_VECTOR_DB):
        """Build the inverted index and the word-vector FAISS index.

        The inverted index is always rebuilt from ``tokenized_docs``. The
        vocabulary list and FAISS index are loaded from ``WORD_LIST_PATH`` and
        ``FAISS_INDEX_PATH`` when both files exist; otherwise every word is
        encoded (reusing vectors already stored in SQLite), the FAISS index is
        built, and both files are written.

        Args:
            tokenized_docs: dict mapping document ID (int or numeric str) to
                its list of tokens.
            encoder: object with a ``get_vector(word)`` method returning a
                NumPy vector.
            db_path: SQLite file used as the per-word vector cache.

        Raises:
            ValueError: if there is no cache and ``tokenized_docs`` yields an
                empty vocabulary.
        """
        vocab = self._build_postings(tokenized_docs)

        # Fast path: reuse the cached vocabulary and FAISS index.
        if os.path.exists(WORD_LIST_PATH) and os.path.exists(FAISS_INDEX_PATH):
            with open(WORD_LIST_PATH, "r", encoding='utf-8') as f:
                self.word_list = json.load(f)
            self.faiss_index = faiss.read_index(FAISS_INDEX_PATH)
            print(f"已从缓存加载 word_list 和 faiss.index，词数：{len(self.word_list)}")
            return

        init_sqlite_vector_db(db_path)

        # 1. Vocabulary. (The previous code read the non-existent attribute
        #    self.vocab here and raised AttributeError on every uncached build.)
        #    Sorting makes the row order of the FAISS index reproducible.
        self.word_list = sorted(vocab)
        if not self.word_list:
            raise ValueError("词表为空，无法构建 FAISS 索引")

        # 2. Collect one vector per word, encoding only words not yet in SQLite.
        vecs = []
        for word in self.word_list:
            vec = load_vector_from_sqlite(word, db_path)
            if vec is None:
                try:
                    vec = encoder.get_vector(word)
                except Exception as exc:
                    # Best effort: keep building with a zero vector, but say so
                    # instead of hiding the failure.
                    print(f"警告：词 {word!r} 编码失败（{exc}），使用零向量代替")
                    vec = np.zeros(768, dtype="float32")
                # Outside the try block so a database error is not mistaken
                # for an encoding failure.
                save_vector_to_sqlite(word, vec, db_path)
            vecs.append(vec)

        # 3. FAISS index (exact L2 search) over all word vectors.
        vecs = np.asarray(vecs, dtype="float32")
        self.faiss_index = faiss.IndexFlatL2(vecs.shape[1])
        self.faiss_index.add(vecs)

        # 4. Persist the vocabulary and the FAISS index for the next run.
        with open(WORD_LIST_PATH, "w", encoding='utf-8') as f:
            json.dump(self.word_list, f)
        faiss.write_index(self.faiss_index, FAISS_INDEX_PATH)

        print(f"倒排索引构建完成，共索引词数：{len(self.word_list)}，词向量缓存于SQLite。")

    def intersect_with_skip(self, list1, list2):
        """Intersect two ascending lists, jumping ahead by sqrt(len) when possible.

        Args:
            list1: ascending list of document IDs.
            list2: ascending list of document IDs.

        Returns:
            New ascending list of the IDs present in both inputs.
        """
        if not list1 or not list2:
            return []

        result = []
        i, j = 0, 0
        len1, len2 = len(list1), len(list2)
        # Skip distance is floor(sqrt(length)), but never 0.
        skip1 = int(len1 ** 0.5) or 1
        skip2 = int(len2 ** 0.5) or 1

        while i < len1 and j < len2:
            if list1[i] == list2[j]:
                result.append(list1[i])
                i += 1
                j += 1
            elif list1[i] < list2[j]:
                # Take the skip only if it does not jump past the other
                # list's current value; otherwise advance by one.
                next_i = i + skip1
                if next_i < len1 and list1[next_i] <= list2[j]:
                    i = next_i
                else:
                    i += 1
            else:
                next_j = j + skip2
                if next_j < len2 and list2[next_j] <= list1[i]:
                    j = next_j
                else:
                    j += 1

        return result

    def top_k_similar(self, query_vec, query_word=None, k=2):
        """Return up to ``k`` nearest vocabulary words to ``query_vec``.

        Args:
            query_vec: 1-D vector with the same dimension as the FAISS index.
            query_word: if given, this word is excluded from the result.
            k: maximum number of words to return.

        Returns:
            list of ``(word, distance)`` tuples in the order FAISS returned
            them (nearest first), where distance is the value reported by the
            FAISS search.
        """
        if k <= 0:
            return []
        query = np.asarray([query_vec], dtype="float32")
        # Ask for one extra neighbour because the query word itself may be
        # among the hits and gets dropped below.
        distances, indices = self.faiss_index.search(query, k + 1)
        candidates = []
        for i, d in zip(indices[0], distances[0]):
            # FAISS pads with label -1 when the index holds fewer vectors than
            # requested; word_list[-1] would silently return the last word.
            if i < 0:
                continue
            word = self.word_list[i]
            if word == query_word:
                continue
            candidates.append((word, d))
            if len(candidates) == k:
                break
        return candidates

    def search(self, word, encoder, topk=2):
        """Find documents containing ``word`` and all of its ``topk`` nearest words.

        Args:
            word: query word.
            encoder: object with a ``get_vector(word)`` method.
            topk: number of similar words that must also occur in a document.

        Returns:
            Tuple ``(final_result, docs_word, top_similar, elapsed)``:
            the intersected document IDs, the documents containing ``word``,
            the ``(similar_word, distance)`` list, and elapsed seconds.
            If encoding fails the result is ``([], [], [], 0.0)``; if fewer
            than ``topk`` similar words are found, ``final_result`` is empty
            and nothing is printed.
        """
        start_time = time.time()
        docs_word = self.index.get(word, [])
        try:
            word_vec = encoder.get_vector(word)
        except Exception:
            return [], [], [], 0.0

        top_similar = self.top_k_similar(word_vec, word, k=topk)
        docs_similars = [self.index.get(sim_word, []) for sim_word, _ in top_similar]
        if len(docs_similars) < topk:
            return [], docs_word, top_similar, time.time() - start_time

        # Intersect shortest lists first so intermediate results stay small.
        # All lists are folded in, rather than exactly three, so topk values
        # other than 2 are handled correctly.
        all_lists = sorted([docs_word] + docs_similars, key=len)
        final_result = list(all_lists[0])
        for postings in all_lists[1:]:
            final_result = self.intersect_with_skip(final_result, postings)
            if not final_result:
                break
        elapsed = time.time() - start_time

        print(f"\n=== 🔎查询关键词：{word} ===")
        print(f"查询词包含文档数: {len(docs_word)}，示例:{docs_word[:10]}")
        print(f"Top{topk}相似词:")
        for i, ((sim_word, dist), sim_docs) in enumerate(zip(top_similar, docs_similars), 1):
            print(f" {i}. {sim_word} (L2距离{dist:.2f})，出现于{len(sim_docs)}篇文档，示例:{sim_docs[:10]}")
        print(f"交集文档数: {len(final_result)}，示例:{final_result[:10]}")
        print(f"查询耗时: {elapsed:.3f}秒")

        return final_result, docs_word, top_similar, elapsed



### 测试样例

In [6]:
# Build the index (or load the cached vocabulary and FAISS index), then run a
# handful of sample queries.
index = InvertedIndex()
index.build(tokenized_docs, encoder)

query_words = ["互联网", "经济", "美国", "消费", "军队"]
# Maps each query word to the intersected document IDs returned by search().
results = {}

for query in query_words:
    # search() prints its own report; only the intersection is kept here
    # (base_docs, top2 and used_time are not used afterwards in this cell).
    result_docs, base_docs, top2, used_time = index.search(query, encoder)
    results[query] = result_docs


已从缓存加载 word_list 和 faiss.index，词数：88370

=== 🔎查询关键词：互联网 ===
查询词包含文档数: 439，示例:[8, 93, 106, 122, 126, 163, 180, 221, 235, 251]
Top2相似词:
 1. 互联网络 (L2距离28.31)，出现于2篇文档，示例:[15433, 19630]
 2. 互联网站 (L2距离28.79)，出现于1篇文档，示例:[9514]
交集文档数: 0，示例:[]
查询耗时: 0.573秒

=== 🔎查询关键词：经济 ===
查询词包含文档数: 3040，示例:[2, 5, 13, 28, 29, 30, 32, 44, 47, 65]
Top2相似词:
 1. 非经济 (L2距离40.02)，出现于1篇文档，示例:[17892]
 2. 经济账 (L2距离40.52)，出现于2篇文档，示例:[10483, 18906]
交集文档数: 0，示例:[]
查询耗时: 0.033秒

=== 🔎查询关键词：美国 ===
查询词包含文档数: 1752，示例:[28, 39, 62, 67, 77, 110, 126, 136, 140, 152]
Top2相似词:
 1. 美等国 (L2距离36.43)，出现于1篇文档，示例:[29]
 2. 美两国 (L2距离39.21)，出现于4篇文档，示例:[4373, 16508, 18173, 18192]
交集文档数: 0，示例:[]
查询耗时: 0.035秒

=== 🔎查询关键词：消费 ===
查询词包含文档数: 265，示例:[90, 140, 250, 311, 381, 395, 415, 513, 538, 547]
Top2相似词:
 1. 消费观 (L2距离43.97)，出现于1篇文档，示例:[10196]
 2. 消费量 (L2距离44.27)，出现于8篇文档，示例:[3433, 4317, 10131, 10196, 11781, 11782, 17302, 18685]
交集文档数: 0，示例:[]
查询耗时: 0.035秒

=== 🔎查询关键词：军队 ===
查询词包含文档数: 396，示例:[2, 5, 91, 106, 144, 156, 178, 308, 453, 528]
Top2相似词:


### 功能扩展：复杂布尔表达式查询

In [7]:
# Operator precedence for the boolean query parser; a larger number binds tighter.
PRECEDENCE = {'NOT': 3, 'AND': 2, 'OR': 1}

def tokenize(expr):
    """Split a boolean query string into tokens.

    Parentheses become tokens of their own even when they touch a
    neighbouring word; everything else is split on whitespace.

    Args:
        expr: query such as ``"NOT a AND (b OR c)"``.

    Returns:
        list of token strings.
    """
    # Surround each parenthesis with spaces so whitespace splitting isolates it.
    padded = "".join(f" {ch} " if ch in "()" else ch for ch in expr)
    return padded.split()

def infix_to_postfix(tokens):
    """Convert an infix boolean expression to postfix (shunting-yard).

    Operators are recognised case-insensitively via ``PRECEDENCE`` and are
    emitted in upper case; every other token is emitted unchanged as an
    operand. ``AND``/``OR`` are left-associative; the unary prefix operator
    ``NOT`` is right-associative, so ``NOT NOT a`` becomes ``a NOT NOT``.

    Args:
        tokens: list of tokens as produced by ``tokenize``.

    Returns:
        list of tokens in postfix order.

    Raises:
        ValueError: if parentheses are unbalanced.
    """
    output = []
    stack = []
    for token in tokens:
        if token == '(':
            stack.append(token)
        elif token == ')':
            while stack and stack[-1] != '(':
                output.append(stack.pop())
            if not stack:
                # Previously this fell through to an IndexError from pop().
                raise ValueError("括号不匹配：多余的 ')'")
            stack.pop()  # discard the matching '('
        elif token.upper() in PRECEDENCE:
            op = token.upper()
            prec = PRECEDENCE[op]
            while stack and stack[-1] != '(':
                top_prec = PRECEDENCE.get(stack[-1].upper(), 0)
                # Pop operators that bind tighter. On a tie, pop only for
                # left-associative operators: popping a pending NOT when
                # another NOT arrives would emit it before its operand.
                if top_prec > prec or (top_prec == prec and op != 'NOT'):
                    output.append(stack.pop())
                else:
                    break
            stack.append(op)
        else:
            output.append(token)
    while stack:
        top = stack.pop()
        if top == '(':
            # Previously an unmatched '(' leaked into the output as an operand.
            raise ValueError("括号不匹配：缺少 ')'")
        output.append(top)
    return output

def eval_postfix(postfix_tokens, index, all_ids):
    """Evaluate a postfix boolean query against an inverted index.

    Args:
        postfix_tokens: tokens in postfix order (see ``infix_to_postfix``).
            ``NOT``/``AND``/``OR`` are matched case-insensitively; any other
            token is a search term.
        index: object whose ``index`` attribute maps a term to an iterable of
            document IDs.
        all_ids: set of every document ID; ``NOT x`` is ``all_ids - x``.

    Returns:
        set of matching document IDs.

    Raises:
        ValueError: if an operator lacks operands or the expression does not
            reduce to exactly one result.
    """
    stack = []
    for token in postfix_tokens:
        op = token.upper()
        if op == 'NOT':
            if not stack:
                raise ValueError("栈为空，NOT 缺少操作数")
            operand = stack.pop()
            stack.append(all_ids - operand)
        elif op in ('AND', 'OR'):
            if len(stack) < 2:
                # Same error type as NOT instead of a bare IndexError from pop().
                raise ValueError(f"{op} 缺少操作数")
            b = stack.pop()
            a = stack.pop()
            stack.append(a & b if op == 'AND' else a | b)
        else:
            # Look the term up exactly as written: upper-casing it first made
            # lowercase or mixed-case terms (e.g. "apple", "iPhone")
            # unfindable. The upper-cased form is kept as a fallback so
            # queries that matched before still match.
            postings = index.index.get(token)
            if postings is None:
                postings = index.index.get(op, [])
            stack.append(set(postings))
    if len(stack) != 1:
        raise ValueError("后缀表达式求值错误！检查表达式")
    return stack[0]


In [8]:
# Sample boolean queries to run against the inverted index.
exprs = ["NOT 经济 AND (互联网 AND 消费) " ,
         "经济 AND 互联网 AND 消费",
         "互联网 AND 消费"
]
# Build the universe of document IDs that NOT complements against: every ID
# that appears in at least one posting list of the index.
all_ids = set()
for docs in index.index.values():
    all_ids.update(docs)

for expr in exprs:
    print("输入表达式:", expr)
    tokens = tokenize(expr)
    postfix = infix_to_postfix(tokens)
    print("后缀表达式:", postfix) 
    result = eval_postfix(postfix, index, all_ids)
    print("匹配文档数:", len(result))
    # result is a set, so the IDs shown here come out in arbitrary order.
    print("文档ID样例:", list(result)[:10])


输入表达式: NOT 经济 AND (互联网 AND 消费) 
后缀表达式: ['经济', 'NOT', '互联网', '消费', 'AND', 'AND']
匹配文档数: 4
文档ID样例: [19658, 18827, 15702, 18847]
输入表达式: 经济 AND 互联网 AND 消费
后缀表达式: ['经济', '互联网', 'AND', '消费', 'AND']
匹配文档数: 21
文档ID样例: [18179, 1672, 18700, 6960, 14388, 5430, 15418, 16700, 3260, 1472]
输入表达式: 互联网 AND 消费
后缀表达式: ['互联网', '消费', 'AND']
匹配文档数: 25
文档ID样例: [18179, 1672, 18827, 18700, 18847, 6960, 14388, 5430, 15418, 16700]
