访问本地 Ollama 模型

In [None]:
from langchain.llms import Ollama

# Pick the model served by the local Ollama instance (e.g. "mistral" or "llama2").
llm = Ollama(model="deepseek-r1:14b", base_url="http://localhost:11434")

# Generate a completion. `invoke` is the supported entry point; calling the LLM
# object directly (`llm(...)`) is deprecated in LangChain.
response = llm.invoke("你是谁")
print(response)


访问本地 Ollama 模型 + 资料库

In [2]:
#公共结构定义

from langchain.document_loaders import (
    TextLoader,
    PyPDFLoader,
    CSVLoader,
    JSONLoader,
    UnstructuredMarkdownLoader
)
from langchain.embeddings import OllamaEmbeddings
import numpy as np

# Keyword arguments for CharacterTextSplitter, keyed by document type.
# Entries without a "separator" rely on the splitter's own default.
splitter_configs = {
    "general": dict(chunk_size=1000, chunk_overlap=200),
    "technical": dict(chunk_size=1500, chunk_overlap=300, separator="\n\n"),
    "code": dict(chunk_size=800, chunk_overlap=100, separator="\n\n"),
    # Markdown: split on second-level headings.
    "markdown": dict(chunk_size=1000, chunk_overlap=150, separator="\n## "),
    # News: smaller chunks with a modest overlap, split on single newlines.
    "news": dict(chunk_size=500, chunk_overlap=50, separator="\n"),
}

# Map a lower-cased file extension to the loader class used to read that file.
LOADER_MAPPING = {
    '.txt': TextLoader,
    '.md': UnstructuredMarkdownLoader,
    # Python sources are plain text. Mapped so that the '.py' entry in the
    # default supported_extensions of load_documents_from_directory is
    # actually loadable instead of being silently dropped.
    '.py': TextLoader,
    '.pdf': PyPDFLoader,
    '.csv': CSVLoader,
    '.json': JSONLoader,
}

def _l2_normalize(vector) -> list:
    """Return ``vector`` scaled to unit L2 norm as a plain list of floats.

    A zero-norm vector is returned unscaled, because dividing by zero would
    fill it with NaN values.
    """
    arr = np.asarray(vector, dtype=float)
    norm = np.linalg.norm(arr)
    if norm == 0:
        return arr.tolist()
    return (arr / norm).tolist()


class NormalizedOllamaEmbeddings(OllamaEmbeddings):
    """OllamaEmbeddings variant whose vectors are L2-normalized.

    Both methods return plain Python lists of floats (not numpy arrays), so
    the results can be handed to code that expects list-of-float embeddings.
    """

    def embed_documents(self, texts):
        """Embed each text with the parent class and normalize every vector."""
        return [_l2_normalize(vector) for vector in super().embed_documents(texts)]

    def embed_query(self, text):
        """Embed a single query with the parent class and normalize the vector."""
        return _l2_normalize(super().embed_query(text))


In [None]:
import os
from langchain.vectorstores import FAISS
from langchain.embeddings import OllamaEmbeddings
from langchain.document_loaders import (
    TextLoader,
    PyPDFLoader,
    CSVLoader,
    JSONLoader,
    UnstructuredMarkdownLoader
)
from langchain.text_splitter import CharacterTextSplitter
from typing import List, Dict

def _build_loader(file_path: str):
    """Instantiate the loader registered for the extension of ``file_path``.

    Returns ``None`` when ``LOADER_MAPPING`` has no entry for the extension.
    """
    file_extension = os.path.splitext(file_path)[1].lower()
    loader_class = LOADER_MAPPING.get(file_extension)
    if loader_class is None:
        return None

    loader_kwargs = {}
    if loader_class in (TextLoader, CSVLoader):
        # Read text as UTF-8 rather than the platform default encoding.
        loader_kwargs["encoding"] = "utf-8"
    elif loader_class is JSONLoader:
        # JSONLoader cannot be constructed without a jq schema. "." selects
        # the whole document, and text_content=False lets non-string JSON
        # values be serialized instead of rejected.
        loader_kwargs["jq_schema"] = "."
        loader_kwargs["text_content"] = False
    return loader_class(file_path, **loader_kwargs)


def load_documents_from_directory(
    directory_path: str,
    db_path: str,
    doc_type: str = "news",
    supported_extensions: List[str] = ['.txt', '.md', '.py', '.pdf']
) -> None:
    """Index every supported file under ``directory_path`` into a FAISS store.

    Walks the directory tree, loads each matching file with the loader from
    ``LOADER_MAPPING``, splits it with a ``CharacterTextSplitter`` configured
    from ``splitter_configs``, builds a new FAISS index from all chunks, saves
    it to ``db_path`` and prints the results of a fixed test query.

    NOTE: a new index is always built from scratch; an index already stored
    at ``db_path`` is neither loaded nor merged.

    Args:
        directory_path: Root directory that is walked recursively.
        db_path: Directory passed to ``FAISS.save_local``.
        doc_type: Key into ``splitter_configs``; unknown keys fall back to
            the ``"general"`` configuration.
        supported_extensions: File name suffixes to ingest (matched
            case-insensitively).

    Errors are reported with ``print`` and never raised: a failing file is
    skipped, and a failure while building or saving the index aborts the run.
    """
    # Embedding model used for both indexing and the test query.
    embedding = NormalizedOllamaEmbeddings(
        model="bge-m3:latest",
        base_url="http://localhost:11434"
    )

    # Splitter settings for the requested document type.
    splitter_config = splitter_configs.get(doc_type, splitter_configs["general"])
    text_splitter = CharacterTextSplitter(**splitter_config)

    # Lower-case the suffixes once so "A.TXT" matches ".txt".
    allowed_suffixes = tuple(ext.lower() for ext in supported_extensions)

    all_documents = []
    file_count = 0

    try:
        for root, _, files in os.walk(directory_path):
            for file in files:
                if not file.lower().endswith(allowed_suffixes):
                    continue

                file_path = os.path.join(root, file)
                try:
                    print(f"\n开始处理文件: {file_path}")
                    print(f"文件大小: {os.path.getsize(file_path)} bytes")

                    loader = _build_loader(file_path)
                    if loader is None:
                        # Requested extension with no registered loader:
                        # say so instead of dropping the file silently.
                        print(f"跳过文件: 没有适用于 {file_path} 的加载器")
                        continue

                    documents = loader.load()
                    print(f"加载后的文档数量: {len(documents)}")
                    if not documents:
                        print("警告: 文档加载后为空")
                        continue

                    texts = text_splitter.split_documents(documents)
                    print(f"分割后的文本块数量: {len(texts)}")
                    if not texts:
                        print("警告: 文本分割后为空")
                        continue

                    all_documents.extend(texts)
                    file_count += 1
                    print(f"成功加载文件: {file_path}")

                except Exception as e:
                    # Best effort: report the failing file and keep going.
                    print(f"处理文件 {file_path} 时出错: {str(e)}")
                    print(f"错误类型: {type(e).__name__}")
                    import traceback
                    print(f"详细错误信息: {traceback.format_exc()}")

        print(f"\n总共处理了 {file_count} 个文件")
        print(f"总共获得 {len(all_documents)} 个文本块")

        if not all_documents:
            # An index cannot be built from zero chunks; stop with a clear
            # message instead of letting FAISS fail with an opaque error.
            print("警告: 没有可索引的文本块，跳过创建向量数据库")
            return

        print("创建新的向量数据库...")
        print(all_documents)
        vector_db = FAISS.from_documents(all_documents, embedding)

        # Persist the index.
        vector_db.save_local(db_path)
        print("成功保存向量数据库")

        # Smoke-test the new index with a fixed query.
        results = vector_db.similarity_search("黄金", k=7)
        print("\n测试搜索结果:")
        for i, doc in enumerate(results, 1):
            print(f"\n文档 {i}:")
            print(f"来源: {doc.metadata['source']}")
            print(f"内容: {doc.page_content[:100]}...")

    except Exception as e:
        print(f"处理过程中出错: {e}")


# Import settings.
directory_path = "test_file"  # directory that holds the source documents
doc_type = "technical"  # key into splitter_configs
supported_extensions = ['.txt', '.md', '.pdf', '.json', '.csv']  # file types to ingest

# Run the import; the index is written to "faiss_index".
load_documents_from_directory(
    directory_path=directory_path,
    db_path="faiss_index",
    doc_type=doc_type,
    supported_extensions=supported_extensions,
)

In [None]:
#测试txt文件向量化

from langchain.vectorstores import FAISS
from langchain.embeddings import OllamaEmbeddings
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter

# Embedding model. Use the normalized wrapper: the search cell in this file
# queries "faiss_index" with NormalizedOllamaEmbeddings, so the stored vectors
# must be normalized the same way for the distances to be meaningful.
embedding = NormalizedOllamaEmbeddings(
    model="bge-m3:latest",
    base_url="http://localhost:11434"
)

# Load the document (replace with your own path). The encoding is explicit so
# the Chinese text does not depend on the platform default.
loader = TextLoader('test_file/金价向上、金店向下：金饰品牌加速关店.txt', encoding='utf-8')
documents = loader.load()

# Pick the splitter settings for this document type.
splitter_config = splitter_configs.get("news", splitter_configs["general"])
print(splitter_config)

# Build the splitter and split the document into chunks.
text_splitter = CharacterTextSplitter(**splitter_config)

texts = text_splitter.split_documents(documents)
print(texts)

# Build the vector store.
vector_db = FAISS.from_documents(texts, embedding)

# Save locally. NOTE: the other cells in this file save to and load from the
# same "faiss_index" path.
vector_db.save_local("faiss_index")

In [None]:
#测试pdf文件向量化

from langchain.vectorstores import FAISS
from langchain.embeddings import OllamaEmbeddings
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter

# Embedding model. Use the normalized wrapper: the search cell in this file
# queries "faiss_index" with NormalizedOllamaEmbeddings, so the stored vectors
# must be normalized the same way for the distances to be meaningful.
embedding = NormalizedOllamaEmbeddings(
    model="bge-m3:latest",
    base_url="http://localhost:11434"
)

# Load the document (replace with your own path).
loader = PyPDFLoader('test_file/test_pdf.pdf')
documents = loader.load()

# Pick the splitter settings for this document type.
splitter_config = splitter_configs.get("technical", splitter_configs["general"])
print(splitter_config)

# Build the splitter and split the document into chunks.
text_splitter = CharacterTextSplitter(**splitter_config)

texts = text_splitter.split_documents(documents)
print(texts)

# Build the vector store.
vector_db = FAISS.from_documents(texts, embedding)

# Save locally. NOTE: the other cells in this file save to and load from the
# same "faiss_index" path.
vector_db.save_local("faiss_index")

In [None]:
#测试faiss搜索

from langchain.vectorstores import FAISS
from langchain.embeddings import OllamaEmbeddings

# Query-side embedding model. It must match the embedding that was used when
# the index was built, otherwise the distances below are not comparable.
embedding = NormalizedOllamaEmbeddings(
    model="bge-m3:latest",
    base_url="http://localhost:11434"
)

# Load the saved index.
# SECURITY: allow_dangerous_deserialization=True lets load_local unpickle the
# stored docstore. Only load index directories you created yourself.
vector_db = FAISS.load_local(
    "faiss_index",
    embeddings=embedding,
    allow_dangerous_deserialization=True
)

# Semantic search.
query = "rank表是什么"
# With the default FAISS distance strategy the score is an L2 distance
# (lower = closer), so score_threshold is an UPPER bound on distance, not a
# minimum similarity.
docs_and_scores = vector_db.similarity_search_with_score(
    query,
    k=7,
    score_threshold=0.9
)

print("\n=== 搜索结果 ===")
for i, (doc, score) in enumerate(docs_and_scores, 1):
    # Label the value as a distance; calling it a similarity score suggests
    # that higher is better, which is the opposite of how it is filtered.
    print(f"\n文档 {i} (L2 距离，越小越相似: {score}):")
    print(f"来源: {doc.metadata['source']}")
    print(f"内容: {doc.page_content[:200]}...")
    print("-" * 50)


In [None]:
#直接使用 LLM + 模板

from langchain.callbacks.base import BaseCallbackHandler
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate

# Turn search results into a streamed LLM answer.
def get_answer(query, search_results):
    """Stream the LLM's answer to ``query`` to stdout.

    The ``page_content`` of every search result is joined with blank lines
    and used as the context of the module-level ``prompt`` template. Each
    chunk produced by ``llm.stream`` is printed as soon as it arrives.
    Nothing is returned.
    """
    passages = (hit.page_content for hit in search_results)
    rendered_prompt = prompt.format(
        context="\n\n".join(passages),
        question=query,
    )

    for piece in llm.stream(rendered_prompt):
        print(piece, end="", flush=True)

# Prompt template: retrieved context first, then the question.
template = """基于以下信息回答问题：

背景信息：
{context}

问题：{question}

请给出详细的回答："""

prompt = PromptTemplate(
    input_variables=["context", "question"],
    template=template,
)

# LLM served by the local Ollama instance.
llm = Ollama(
    base_url="http://localhost:11434",
    model="deepseek-r1:14b",
)

query = "rank表是什么"

# Retrieve supporting chunks from the vector store, then stream the answer.
retrieved_docs = vector_db.similarity_search(query, k=7, score_threshold=0.9)
get_answer(query, retrieved_docs)