In [None]:
import os
import time
import logging
import pickle
import tqdm
import json
from bertopic import BERTopic
from sklearn.feature_extraction.text import CountVectorizer
from umap import UMAP
from sentence_transformers import SentenceTransformer

In [2]:
# Project root directory. The CUSANUS_BASE_DIR environment variable, when set
# to a non-empty value, overrides the default so the notebook is not tied to a
# single machine; otherwise the original location is used.
BASE_DIR = os.environ.get('CUSANUS_BASE_DIR') or '/Users/jessie/Documents/Projects/Cusanus_Topic_Modeling'

# Load the stop-word list.
# NOTE(security): pickle.load can execute arbitrary code while deserializing;
# only load this file when it comes from a trusted source.
with open(os.path.join(BASE_DIR, 'data/external/stopwords.pkl'), 'rb') as f:
    latin_stopwords = pickle.load(f)

In [None]:
# Generate a unique experiment ID from the current Unix timestamp (seconds).
experiment_id = f"bertopic_experiment_{int(time.time())}"
experiment_dir = os.path.join(BASE_DIR, 'experiments', experiment_id)

# Create the experiment directory; exist_ok avoids the check-then-create race
# and makes re-running this cell within the same second harmless.
os.makedirs(experiment_dir, exist_ok=True)

# Path of the log file belonging to this experiment.
log_file_path = os.path.join(experiment_dir, f"bertopic_experiment_{experiment_id}.log")

# Configure the root logger so every experiment logs to its own file.
logger = logging.getLogger()

# Detach AND close handlers left over from a previous run of this cell.
# Clearing the list alone would leave the previous log file handle open.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

# File handler. The log messages contain non-ASCII text, so the encoding is
# pinned instead of depending on the platform default.
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Console handler.
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Emit one message right after configuration to confirm the new log file exists.
logger.info(f"启动实验 {experiment_id}，日志记录到 {log_file_path}")
print(f"启动实验 {experiment_id}，日志记录到 {log_file_path}")

In [None]:
# Experiment configuration.
# Edit the values below for every new experiment.

umap_settings = {
    "n_neighbors": 10,
    "min_dist": 0.05,
    "n_components": 5,
    "random_state": 42,
}

experiment_config = {
    "parameters": {
        "n_gram_range": (1, 3),
        # A smaller min_topic_size is used to obtain more fine-grained topics.
        "min_topic_size": 3,
        "nr_topics": "auto",
        # Name of the sentence-embedding model.
        "embedding_model_name": "xlm-r-distilroberta-base-paraphrase-v1",
        "umap_params": umap_settings,
    }
}

# Persist the configuration next to the other experiment artifacts.
config_path = os.path.join(experiment_dir, 'config.json')
with open(config_path, 'w') as config_out:
    config_out.write(json.dumps(experiment_config, indent=4))

config_saved_msg = f"实验配置已保存到 {config_path}"
logger.info(config_saved_msg)
print(config_saved_msg)

In [None]:
# Load the pretrained SentenceTransformer model, preferring a local copy.
embedding_model_name = experiment_config["parameters"]["embedding_model_name"]
model_path = os.path.join(BASE_DIR, 'saved_models', embedding_model_name)

if not os.path.exists(model_path):
    # No local copy yet: load the model by name, then save it under model_path.
    remote_msg = f"加载预训练的嵌入模型 {embedding_model_name}..."
    print(remote_msg)
    logger.info(remote_msg)
    embedding_model = SentenceTransformer(embedding_model_name)
    embedding_model.save(model_path)
    saved_msg = f"嵌入模型加载成功并已保存到本地 {model_path}。"
    logger.info(saved_msg)
    print(saved_msg)
else:
    # A local copy exists: load it directly from disk.
    local_msg = f"从本地路径 {model_path} 加载嵌入模型..."
    print(local_msg)
    logger.info(local_msg)
    embedding_model = SentenceTransformer(model_path)

In [None]:
# Load the test-set documents: every *.txt file in data/testset becomes one document.
testset_dir = os.path.join(BASE_DIR, 'data/testset')
documents = []

print("加载测试集数据...")
logger.info("加载测试集数据...")

try:
    # os.listdir returns entries in arbitrary order; sorting keeps the document
    # order (and therefore the downstream model input) stable between runs.
    test_files = sorted(f for f in os.listdir(testset_dir) if f.endswith('.txt'))
    for test_file in tqdm.tqdm(test_files, desc="Loading testset files"):
        file_path = os.path.join(testset_dir, test_file)
        # Pin the encoding instead of relying on the platform default.
        # NOTE(review): assumes the corpus files are UTF-8 encoded — confirm.
        with open(file_path, 'r', encoding='utf-8') as file:
            documents.append(file.read())

    logger.info(f"加载了 {len(documents)} 个文档用于 BERTopic 实验。")
    print(f"加载了 {len(documents)} 个文档用于 BERTopic 实验。")
except FileNotFoundError:
    # Best effort: report the missing directory and leave `documents` as is.
    print(f"测试集目录未找到: {testset_dir}")
    logger.error(f"测试集目录未找到: {testset_dir}")


In [38]:
def initialize_vectorizer_and_model(experiment_config, stopwords, embedding_model):
    """Build an (unfitted) BERTopic model from the experiment configuration.

    Args:
        experiment_config: Dict with a "parameters" entry providing
            "n_gram_range", "min_topic_size" and "umap_params"
            (keys "n_neighbors", "min_dist", "n_components", "random_state").
        stopwords: Custom stop words handed to CountVectorizer. ``None``,
            a string or a list is passed through unchanged; any other
            collection (e.g. a set) is converted to a list.
        embedding_model: Pre-loaded embedding model handed to BERTopic.

    Returns:
        The configured BERTopic instance.

    Raises:
        KeyError: If a required configuration key is missing.
    """
    params = experiment_config["parameters"]

    # Recent scikit-learn releases validate constructor arguments strictly:
    # ngram_range must be a tuple (a config reloaded from JSON yields a list)
    # and stop_words must be a list rather than e.g. a set.
    ngram_range = tuple(params["n_gram_range"])
    if stopwords is not None and not isinstance(stopwords, (str, list)):
        stopwords = list(stopwords)

    vectorizer_model = CountVectorizer(
        ngram_range=ngram_range,
        token_pattern=r"(?u)\b\w+\b",  # also keeps single-character tokens
        stop_words=stopwords  # custom stop-word list
    )

    # Custom UMAP settings for dimensionality reduction.
    umap_params = params["umap_params"]
    custom_umap = UMAP(
        n_neighbors=umap_params["n_neighbors"],
        min_dist=umap_params["min_dist"],
        n_components=umap_params["n_components"],
        random_state=umap_params["random_state"]
    )

    topic_model = BERTopic(
        language="multilingual",  # the corpus is Latin, so the multilingual setting is used
        min_topic_size=params["min_topic_size"],
        vectorizer_model=vectorizer_model,
        embedding_model=embedding_model,  # pre-loaded embedding model
        umap_model=custom_umap  # custom UMAP settings
    )
    return topic_model



In [None]:
# Initialize the BERTopic model from the experiment configuration.
topic_model = initialize_vectorizer_and_model(experiment_config, latin_stopwords, embedding_model)

# Log through the configured `logger`, consistent with the other cells.
logger.info("BERTopic 模型初始化成功。")
print("BERTopic 模型初始化成功。")

In [None]:
# Train the model (skipped when no documents were loaded).
if documents:
    print("开始训练 BERTopic 模型...")
    logger.info("开始训练 BERTopic 模型...")

    try:
        # Pass the plain list of documents. A tqdm wrapper here would only
        # track iteration over the inputs, not the training itself, and is
        # not the list of strings that fit_transform is documented to take.
        topics, probabilities = topic_model.fit_transform(documents)
        logger.info("BERTopic 模型训练成功。")
        print("BERTopic 模型训练成功。")

        # Topics are only merged when an explicit integer target is configured;
        # "auto", None or any other non-integer value leaves them untouched.
        target_topics = experiment_config["parameters"].get("nr_topics")
        if isinstance(target_topics, int) and not isinstance(target_topics, bool):
            if len(topic_model.get_topic_info()) > target_topics:
                # Called for its in-place effect; the return value is not
                # relied upon, so `topic_model` can never be rebound to None.
                topic_model.reduce_topics(documents, nr_topics=target_topics)
                logger.info(f"BERTopic 模型主题数量已减少到 {target_topics}。")
                print(f"BERTopic 模型主题数量已减少到 {target_topics}。")

    except Exception as e:
        # Best effort: keep the notebook running, but record the full traceback.
        logger.exception(f"训练 BERTopic 模型时发生错误: {e}")
        print(f"训练 BERTopic 模型时发生错误: {e}")
    else:
        # Save only after successful training, so an unfitted or half-trained
        # model is never written to the experiment directory.
        model_path = os.path.join(experiment_dir, 'model')
        topic_model.save(model_path)
        logger.info(f"BERTopic 模型已保存到 {model_path}")
        print(f"BERTopic 模型已保存到 {model_path}")
else:
    # Make the skipped training visible instead of doing nothing silently.
    logger.warning("未加载任何文档，跳过 BERTopic 模型训练。")
    print("未加载任何文档，跳过 BERTopic 模型训练。")

In [None]:
# Print and log every generated topic together with its top keywords.
topics_header = "生成的主题："
print(topics_header)
logger.info(topics_header)
topics_overview = topic_model.get_topic_info()

for topic_num, topic_name in zip(topics_overview['Topic'], topics_overview['Name']):
    # Topic -1 is the noise/outlier topic and is not listed.
    if topic_num == -1:
        continue
    # Keep at most the first ten (word, score) pairs and drop the scores.
    top_words = [word for word, _ in topic_model.get_topic(topic_num)[:10]]
    topic_line = f"主题 {topic_num}: {topic_name} - 关键词: {', '.join(top_words)}"
    print(topic_line)
    logger.info(topic_line)
