## procedure embedding

In [None]:
# import os
# from openai import OpenAI
# import json

# def get_vectors(data):
#     client = OpenAI(
#     api_key='',  
#     base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" 
#     )
#     completion = client.embeddings.create(
#     model="text-embedding-v3",
#     input=data,
#     dimensions=1024,
#     encoding_format="float"
#     )
#     resp = completion.model_dump_json() 
#     dic = json.loads(resp)
#     embeddings = [x['embedding'] for x in dic['data']]
#     return embeddings


# with open("./procedure/procedure.json", "r", encoding='utf-8') as f:
#     data = json.load(f)

# titles = [x['title'] for x in data]

# embeddings = get_vectors(titles[0:10]) 
# embeddings = embeddings + get_vectors(titles[10:])
# assert len(embeddings) == len(data)
# for (i, x) in enumerate(data):
#     x['embedding'] = embeddings[i]

# with open("./procedure/procedure.json", "w", encoding='utf-8') as f:
#     json.dump(data, f, ensure_ascii=False, indent=4)

In [3]:
import json
import os

with open('./procedure/procedure.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

for i, item in enumerate(data):
    if item.get('img'):
        # 将 ./procedure/item.get('img').png 重命名为 i.png
         os.rename(f'./procedure/{item.get("img")}', f'./procedure/{i}.png')


In [None]:
# -*- coding = utf-8 -*-
# @Time : 2025/1/4 14:52
# @Author : lx
# @File : demo2.py
# @Software : PyCharm
import os
from langchain_chroma import Chroma
from langchain_community.embeddings import ZhipuAIEmbeddings
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import ChatOpenAI
import json

# 初始化模型
model = ChatOpenAI(
    model="glm-4-plus",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
    streaming=True 
)

os.environ['ZHIPUAI_API_KEY'] = ''

# 加载数据
with open("./procedure/procedure.json", "r", encoding='utf-8') as f:
    procedure_data = json.load(f)

document = [
    Document(
        page_content=x['title'],
        metadata={"idx": idx}
    ) for idx, x in enumerate(procedure_data)
]

# 初始化嵌入模型
embedding = ZhipuAIEmbeddings(model='embedding-3')

# 检查是否已有存储的向量数据库
persist_dir = "./chroma_db"
if os.path.exists(persist_dir) and os.listdir(persist_dir):
    # 从磁盘加载
    print("Loading existing vector store...")
    vector_store = Chroma(
        persist_directory=persist_dir,
        embedding_function=embedding
    )
else:
    # 创建新的并存储到磁盘
    print("Creating new vector store...")
    vector_store = Chroma.from_documents(
        documents=document,
        embedding=embedding,
        persist_directory=persist_dir
    )

# 定义检索器
retriever = RunnableLambda(vector_store.similarity_search).bind(k=1)

# 定义 prompt
message = """
使用提供的上下文进行回答。若无法从上下文中找到答案，请回答“我不知道”。
{question}
上下文：{content}
"""
prompt_template = ChatPromptTemplate.from_messages([('human', message)])

# 定义 chain
def format_response(data):
    """处理检索结果和用户问题"""
    retrieved_docs = data["retrieved_docs"]
    question = data["question"]
    
    if not retrieved_docs:
        return {"question": question, "content": "未找到相关信息"}
    
    doc = retrieved_docs[0]  # 获取第一个匹配的文档
    print(doc)
    idx = doc.metadata['idx']  # 获取 metadata 中的 idx
    text = procedure_data[idx]['text']  # 从 procedure_data 获取对应的 text
    #print(f'question: {question} content: {text}')
    return {"question": question, "content": text}

# 构建 chain
chain = (
    {"retrieved_docs": retriever, "question": RunnablePassthrough()} 
    | RunnableLambda(format_response) 
    | prompt_template 
    | model
)

# 测试查询
for chunk in chain.stream("如何打印成绩单"):
    print(chunk.content, end="", flush=True)  # 逐块打印，不换行

Loading existing vector store...
page_content='成绩单、学籍证明打印办理流程' metadata={'idx': 12}
根据提供的上下文，打印成绩单的方法如下：

1. **在校生**：
   - 可以在自助打印机上打印成绩单。
   - 也可以到本科生院学籍科（明远楼203室）凭有效身份证件办理。

2. **毕业生**：
   - 可以联系云南大学档案馆（0871-65031310）查询成绩单。
   - 如需在成绩单上加盖本科生院成绩专用章，可以将成绩单邮寄至云南大学本科生院学籍科（呈贡校区明远楼203室），并留下回寄地址，学籍科盖章后以顺丰到付件寄回。
   - 2004届（含）及以后的毕业生，还可以将本人身份证（正反面）、毕业证（或结业证）、学位证（如未获得学位可不提供）的扫描件发邮件至1761753176@qq.com，邮件正文注明需要成绩单的份数，并写明邮寄地址及联系方式，学籍科核查无误后打印学生成绩单，加盖本科生院成绩专用章以顺丰到付件寄给学生。

3. **注意事项**：
   - 本科生院不提供任何形式的电子版成绩单。
   - 毕业生（或结业生）已无学籍，不能开具学籍证明。

4. **咨询方式**：
   - 如有疑问，可拨打本科生院学籍科电话咨询：0871-65032602。

请根据你的具体身份（在校生或毕业生）选择相应的方法进行操作。

## intention dataset generate

In [9]:
import pandas as pd
import random
from itertools import product
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 同义词词典
SYNONYMS = {
    "流程": ["步骤", "手续", "程序", "过程"],
    "办理": ["申请", "处理", "操作", "进行"],
    "如何": ["怎样", "怎么", "请问如何", "如何操作"],
    "信息": ["资料", "详情", "详细介绍", "基本情况"],
    "查询": ["查找", "搜索", "了解", "获取"]
}

# 数据增强函数
def augment_text(text):
    words = text.split()
    # 随机替换同义词
    for i in range(len(words)):
        if words[i] in SYNONYMS:
            words[i] = random.choice(SYNONYMS[words[i]])
    # 随机删除词（20%概率）
    if len(words) > 3 and random.random() < 0.2:
        del words[random.randint(0, len(words)-1)]
    # 随机插入词（20%概率）
    if random.random() < 0.2:
        words.insert(random.randint(0, len(words)), "的")
    return "".join(words)

# 扩展后的模板
PROCESS_TEMPLATES = [
    "如何办理{}？", "{}的流程是什么？", "怎样申请{}？", "请问{}需要什么步骤？",
    "{}的办理流程是怎样的？", "办理{}的具体步骤有哪些？", "我需要了解{}的流程", 
    "申请{}的步骤是怎样的？", "关于{}的详细流程", "怎么操作{}？",
    "{}应该怎么办理？", "有没有{}的操作指南？", "求{}的完整流程说明"
]

ENTITY_TEMPLATES = [
    "{}的信息是什么？", "查找关于{}的资料", "{}的相关信息", "查询{}的详细信息",
    "{}属于哪个学院？", "{}的具体位置在哪里？", "{}的联系方式是什么？",
    "如何联系{}？", "{}的课程安排是怎样的？", "{}的教师有哪些？",
    "{}的开放时间是？", "{}的负责部门是哪个？", "关于{}的最新通知"
]

# 数据集生成
def generate_enhanced_dataset():
    # 流程类问题生成（10倍扩展）
    procedures = [p.split("流程")[0].strip() for p in open("./procedure/procedure_list.txt").read().splitlines()]
    
    process_questions = []
    for p in procedures:
        for t in PROCESS_TEMPLATES:
            base_question = t.format(p)
            # 生成原始问题
            process_questions.append(base_question)
            # 生成增强问题（3个变体）
            for _ in range(3):
                process_questions.append(augment_text(base_question))
    
    # 实体类问题生成（10倍扩展）
    entities = [
        ("文学院历史与档案学院", "学院"),
        ("格物楼1栋1103", "教室"),
        ("C语言程序设计", "课程"),
        ("张学杰", "教师"),
        ("云南大学", "学校"),
        ("文渊楼211", "教室"),
        ("云南大学呈贡校区图书馆", "图书馆"),
        ("外国语学院", "学院"),
        ("法学院", "学院"),
        ("王津", "教师"),
        ("高等数学", "课程"),
        ("信息学院", "学院"),
        ("软件学院", "学院"),
        ("格物楼", "教学楼")
    ]
    
    entity_questions = []
    for e, _ in entities:
        for t in ENTITY_TEMPLATES:
            base_question = t.format(e)
            # 生成原始问题
            entity_questions.append(base_question)
            # 生成增强问题（3个变体）
            for _ in range(3):
                entity_questions.append(augment_text(base_question))
    
    # 创建DataFrame（添加负样本）
    data = {
        "text": process_questions + entity_questions,
        "label": [0]*len(process_questions) + [1]*len(entity_questions)
    }
    
    # 添加5%的负样本
    negative_samples = [
        "今天的天气怎么样？",
        "云南大学的历史有多久？",
        "校长办公室的电话是多少？",
        "图书馆今天的开馆时间",
        "最近的食堂在哪里？"
    ]
    data["text"].extend(negative_samples)
    data["label"].extend([2]*len(negative_samples))
    
    return pd.DataFrame(data).sample(frac=1).reset_index(drop=True)

enhanced_df = generate_enhanced_dataset()
enhanced_df.to_csv("./Intention_dataset.csv", index=False)


## intention model

In [15]:
import pandas as pd

df = pd.read_csv("./intention_dataset.csv")

with open("./intention_fasttext.txt", "w", encoding="utf-8") as f:
    for _, row in df.iterrows():
        label = f"__label__{row['label']}"
        line = f"{label} {row['text'].strip()}\n"
        f.write(line)


In [6]:
import pandas as pd
from sklearn.model_selection import train_test_split
import fasttext
from sklearn.metrics import classification_report

# 1. 数据预处理
def prepare_fasttext_data():
    df = pd.read_csv("intention_dataset.csv")
    
    # 统计类别分布
    print("原始类别分布:\n", df.label.value_counts())
    
    # 转换格式
    df['text'] = df['text'].str.replace(r'\s+', ' ')  # 清理空格
    df['fasttext_label'] = '__label__' + df['label'].astype(str)
    
    # 划分训练测试集
    train, test = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
    
    # 保存为FastText格式
    train[['fasttext_label', 'text']].to_csv(
        'intention_fasttext.train', 
        index=False, 
        sep=' ', 
        header=None,
        quoting=3,  # 避免引号问题
        escapechar=' '
    )
    
    test[['fasttext_label', 'text']].to_csv(
        'intention_fasttext.valid',
        index=False,
        sep=' ',
        header=None,
        quoting=3,
        escapechar=' '
    )

prepare_fasttext_data()

# 2. 模型训练（优化参数）
model = fasttext.train_supervised(
    input="intention_fasttext.train",
    epoch=200,         # 增加训练轮次
    lr=0.1,            # 降低学习率
    wordNgrams=3,      # 增加n-gram窗口
    dim=200,           # 增加词向量维度
    loss='ova',        # 更适合二分类
    thread=4,          # 多线程加速
    verbose=2        # 显示训练细节
)

# 3. 模型评估
def evaluate_model():
    # 在验证集上测试
    print("\n验证集评估:")
    print(model.test("intention_fasttext.valid"))
    
    # 详细分类报告
    y_true, y_pred = [], []
    with open("intention_fasttext.valid") as f:
        for line in f:
            true_label = int(line.split()[0].split('__')[-1])
            text = ' '.join(line.strip().split()[1:])
            pred_label = int(model.predict(text)[0][0].split('__')[-1])
            
            y_true.append(true_label)
            y_pred.append(pred_label)
    
    print("\n详细分类报告:")
    print(classification_report(y_true, y_pred))

evaluate_model()

# 保存优化后的模型
model.save_model("optimized_intent_model.ftz")

原始类别分布:
 label
0    936
1    728
Name: count, dtype: int64

验证集评估:
(333, 0.9429429429429429, 0.9429429429429429)

详细分类报告:
              precision    recall  f1-score   support

           0       0.91      1.00      0.95       187
           1       1.00      0.87      0.93       146

    accuracy                           0.94       333
   macro avg       0.95      0.93      0.94       333
weighted avg       0.95      0.94      0.94       333



In [7]:
import fasttext

model = fasttext.load_model("intent_model.ftz")

def predict_intent(text):
    label, prob = model.predict(text)
    # 输出: label = ['__label__0'] or ['__label__1']
    return int(label[0].replace("__label__", "")), prob[0]

# 示例
test_questions = [
    "如何补办学生证？",
    "文渊楼211的教室信息",
    "申请缓考的流程是什么？",
    "C语言程序设计的上课地点在哪？"
]
for text in test_questions:
    predicted_label, confidence = predict_intent(text)
    print(f"问题：{text} 预测标签: {predicted_label}, 置信度: {confidence}")


问题：如何补办学生证？ 预测标签: 0, 置信度: 0.8596604466438293
问题：文渊楼211的教室信息 预测标签: 0, 置信度: 0.8520693182945251
问题：申请缓考的流程是什么？ 预测标签: 0, 置信度: 0.8602732419967651
问题：C语言程序设计的上课地点在哪？ 预测标签: 0, 置信度: 0.8630637526512146


In [23]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
import joblib

# 加载数据
df = pd.read_csv("intention_dataset.csv")

# 划分训练集测试集
X_train, X_test, y_train, y_test = train_test_split(df['text'], df['label'], test_size=0.2, random_state=42)

# 构建模型管道
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer(ngram_range=(1,2))),
    ('clf', LogisticRegression(max_iter=1000))
])

# 训练模型
pipeline.fit(X_train, y_train)

# 测试模型
y_pred = pipeline.predict(X_test)
print(classification_report(y_test, y_pred))

# 保存模型
joblib.dump(pipeline, "intent_model_sklearn.pkl")


              precision    recall  f1-score   support

           0       0.94      1.00      0.97       186
           1       1.00      0.93      0.96       147

    accuracy                           0.97       333
   macro avg       0.97      0.96      0.97       333
weighted avg       0.97      0.97      0.97       333



['intent_model_sklearn.pkl']

In [25]:
import joblib

model = joblib.load("intent_model_sklearn.pkl")

def predict_intent(text):
    pred = model.predict([text])[0]
    prob = model.predict_proba([text]).max()
    return int(pred), prob

# 示例
test_questions = [
    "如何补办学生证？",
    "文渊楼211的教室信息",
    "申请缓考的流程是什么？",
    "C语言程序设计的上课地点在哪？"
]
for text in test_questions:
    predicted_label, confidence = predict_intent(text)
    print(f"问题：{text} 预测类别: {label}, 置信度: {confidence}")


问题：如何补办学生证？ 预测类别: 0, 置信度: 0.5570527163908637
问题：文渊楼211的教室信息 预测类别: 0, 置信度: 0.5570527163908637
问题：申请缓考的流程是什么？ 预测类别: 0, 置信度: 0.5570527163908637
问题：C语言程序设计的上课地点在哪？ 预测类别: 0, 置信度: 0.5570527163908637


## transformer

In [None]:
import re
import jieba
from typing import List, Tuple, Dict
from Levenshtein import distance as levenshtein_distance
from sentence_transformers import SentenceTransformer
from neo4j import GraphDatabase
import numpy as np

# -------------------- 配置部分 --------------------
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "neo4j123456"

VECTOR_DB_PROCESSES = [
    "补办学生证流程", "缓考申请流程", "成绩单打印流程",  # 完整列表参考你的流程数据
    # ... 其他流程名称
]

# -------------------- 工具类 --------------------
class Neo4jConnector:
    def __init__(self):
        self.driver = GraphDatabase.driver(
            NEO4J_URI,
            auth=(NEO4J_USER, NEO4J_PASSWORD)
        )
        
    def get_entities(self) -> List[str]:
        """获取所有实体名称"""
        query = """
        MATCH (n) 
        WHERE n:Course OR n:Person OR n:Building OR n:Department
        RETURN n.name as name
        """
        with self.driver.session() as session:
            result = session.run(query)
            return [record["name"] for record in result]

class EntityRecognizer:
    def __init__(self, entity_list: List[str]):
        self.exact_entities = entity_list
        self.fuzzy_index = self._build_fuzzy_index(entity_list)
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        
    def _build_fuzzy_index(self, entities: List[str]) -> Dict[str, List[str]]:
        """构建语音哈希索引"""
        index = {}
        for entity in entities:
            key = self._soundex(entity)
            if key not in index:
                index[key] = []
            index[key].append(entity)
        return index
    
    def _soundex(self, word: str) -> str:
        """中文语音编码简化版"""
        first_char = word[0]
        code = first_char
        return code
    
    def exact_match(self, text: str) -> List[str]:
        """精确匹配实体"""
        matched = []
        for entity in self.exact_entities:
            if entity in text:
                matched.append(entity)
        return matched
    
    def fuzzy_match(self, text: str, threshold: int = 2) -> List[str]:
        """模糊匹配实体"""
        candidates = []
        tokens = jieba.lcut(text)
        for token in tokens:
            if len(token) < 2:
                continue
            sound_key = self._soundex(token)
            for entity in self.fuzzy_index.get(sound_key, []):
                if levenshtein_distance(token, entity) <= threshold:
                    candidates.append(entity)
        return list(set(candidates))
    
    def semantic_match(self, text: str, top_k: int = 3) -> List[str]:
        """语义匹配实体"""
        text_embed = self.model.encode(text)
        entity_embeds = self.model.encode(self.exact_entities)
        
        similarities = np.dot(text_embed, entity_embeds.T)
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        
        return [self.exact_entities[i] for i in top_indices]

class ProcessDetector:
    def __init__(self):
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        self.process_embeds = self.model.encode(VECTOR_DB_PROCESSES)
        
    def detect(self, query: str, threshold: float = 0.7) -> str:
        """检测流程类问题"""
        query_embed = self.model.encode(query)
        similarities = np.dot(query_embed, self.process_embeds.T)
        max_idx = np.argmax(similarities)
        
        if similarities[max_idx] > threshold:
            return VECTOR_DB_PROCESSES[max_idx]
        return None

# -------------------- 路由系统 --------------------
class CampusAssistant:
    def __init__(self):
        # 初始化组件
        self.neo4j = Neo4jConnector()
        self.entity_recognizer = EntityRecognizer(self.neo4j.get_entities())
        self.process_detector = ProcessDetector()
        
        # 缓存常用查询
        self.cache = {}
        
    def _get_neo4j_answer(self, entities: List[str]) -> str:
        """执行Neo4j查询"""
        # 示例查询逻辑，可根据实际需求扩展
        query_template = """
        MATCH (n)-[r]->(m)
        WHERE n.name IN $entities
        RETURN n.name, type(r) as rel_type, m.name
        LIMIT 5
        """
        with self.neo4j.driver.session() as session:
            result = session.run(query_template, entities=entities)
            return "\n".join([f"{record['n.name']} -> {record['rel_type']} -> {record['m.name']}" 
                            for record in result])
    
    def _get_process_answer(self, process_name: str) -> str:
        """获取流程回答（模拟向量数据库查询）"""
        # 此处应替换为实际的向量数据库查询
        process_answers = {
            "补办学生证流程": "1. 登录教务系统...\n2. 准备身份证复印件...",
            "缓考申请流程": "1. 考前3天提交申请...\n2. 院系审核...",
            # ... 其他流程的预定义回答
        }
        return process_answers.get(process_name, "该流程的详细信息暂未收录")
    
    def route_query(self, question: str) -> Tuple[str, str]:
        """路由查询并返回回答"""
        # 第一步：检测流程类问题
        process = self.process_detector.detect(question)
        if process:
            return ("process", self._get_process_answer(process))
        
        # 第二步：实体识别
        entities = []
        entities += self.entity_recognizer.exact_match(question)
        if not entities:
            entities += self.entity_recognizer.fuzzy_match(question)
        if not entities:
            entities += self.entity_recognizer.semantic_match(question)
            
        if entities:
            return ("neo4j", self._get_neo4j_answer(entities))
        
        # 默认回答
        return ("general", "暂时无法回答这个问题，已记录您的需求")

# -------------------- 使用示例 --------------------
if __name__ == "__main__":
    assistant = CampusAssistant()
    
    test_questions = [
        "如何补办学生证？",
        "张老师的办公室在哪里？",
        "计算机学院的课程安排是怎样的？",
        "今天天气怎么样？"
    ]
    
    for q in test_questions:
        q_type, answer = assistant.route_query(q)
        print(f"问题：{q}")
        print(f"类型：{q_type}")
        print(f"回答：{answer}\n{'-'*40}")