# 导入相关软件包

In [1]:
import os
import sys
import random

import numpy as np
from collections import defaultdict

import jieba

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import KFold, StratifiedKFold

# 参数配置

In [2]:
# 随机种子
def seed_pytorch(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)

In [3]:
config_para = {
    'data_path': "C:\\Users\\Jiazhen Huang\\Downloads\\THUCNews",
    'seed':42,
}

In [4]:
seed_pytorch(seed=config_para['seed'])

# 数据读入

In [5]:
# 获取类别列表（这里只有教育）
data_dir = config_para['data_path']

class_list = [category for category in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, category))]
print("类别列表:", class_list)

类别列表: ['体育', '娱乐', '家居', '彩票', '房产', '教育', '时尚', '时政', '星座', '游戏', '社会', '科技', '股票', '财经']


In [6]:
# 加载文本数据和对应的label
# 数据太多了，在这里选了每个类别2000个case

features, labels = [], []
max_text_cnt = 2000
for category in class_list:
    category_path = os.path.join(data_dir, category)
    text_files = [text_file for text_file in os.listdir(category_path) if os.path.isfile(os.path.join(category_path, text_file))]
    
    # 如果当前类别的样本数超过 max_text_cnt，则随机选择 max_text_cnt 个样本
    if max_text_cnt is not None and len(text_files) > max_text_cnt:
        text_files = random.sample(text_files, max_text_cnt)
    
    # 加载选中的文本数据和标签
    for text_file in text_files:
        text_path = os.path.join(category_path, text_file)
        with open(text_path, "r", encoding='utf-8') as file:
            content = file.read().strip()
            features.append(content)
            labels.append(category)


In [8]:
len(features)

28000

In [9]:
len(labels)

28000

# 数据加载和处理

In [10]:
# 分词处理
def process_text(X):
    return ["".join(jieba.cut(text)) for text in X]

In [14]:
# 停用词处理
def make_words_set(words_file):
    stopwords_list = []
    with open(words_file, 'r', encoding='utf-8') as fp:
        for line in fp:
            word = line.strip()
            if len(word) > 0 and word not in stopwords_list:
                stopwords_list.append(word)
    return stopwords_list

In [16]:
# 进行特征提取

# 加载停用词
stopwords_set = make_words_set('./stopwords_cn.txt')

# 特征提取，并转换为array
vectorizer = TfidfVectorizer(stop_words=stopwords_set, max_features=3000)
features = vectorizer.fit_transform(process_text(features)).toarray()
labels = np.array(labels)

In [17]:
features.shape

(28000, 3000)

In [18]:
labels.shape

(28000,)

# 算法实现

In [22]:
class MultinomialNaiveBayesCustom:
    def __init__(self):
        self.classes = None
        self.voc_size = None # 词表大小
        self.prior = {} # 先验概率
        self.cond = defaultdict(dict) # 条件概率
    
    def fit(self, X, y):
        self.classes = np.unique(y)
        n_classes = len(self.classes)
        self.voc_size = X.shape[1]
        
        # 计算先验概率 P(y)
        for c in self.classes:
            self.prior[c] = np.sum(y == c) / len(y)
        
        # 计算条件概率 P(x|y)
        for c in self.classes:
            X_c = X[y == c]
            class_word_count = X_c.sum(axis=0)
            total_word_count = class_word_count.sum()
            
            # 这里使用了拉普拉斯平滑：P(x|y) = (词x在类别y中的出现次数 + 1) / (类别y的总词数 + 词表大小)
            self.cond[c] = (class_word_count + 1) / (total_word_count + self.voc_size)
    
    def predict(self, X):
        predictions = []
        for x in X:
            x = x.flatten()
            post = {}
            for c in self.classes:
                log_prob = np.log(self.prior[c])
                for i in range(self.voc_size):
                    if x[i] > 0:
                        log_prob += x[i] * np.log(self.cond[c][i])
                post[c] = log_prob
            predictions.append(max(post, key=lambda k: post[k]))
        return predictions

# 实验

In [20]:
# 设置了一个统一的评估接口，保证简便性
def evaluate(model, X, y, n_splits=10):
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=config_para['seed'])
    metrics = {
        "accuracy": [],
        "precision_macro": [],
        "recall_macro": [],
        "f1_macro": [],
        "precision_micro": [],
        "recall_micro": [],
        "f1_micro": []
    }
    
    for train_idx, test_idx in skf.split(X, y):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        
        # 这里同时计算了宏平均和微平均，因为acc本来就是全局指标，所以没有macro/micro之分
        metrics["accuracy"].append(accuracy_score(y_test, y_pred))
        metrics["precision_macro"].append(precision_score(y_test, y_pred, average="macro"))
        metrics["recall_macro"].append(recall_score(y_test, y_pred, average="macro"))
        metrics["f1_macro"].append(f1_score(y_test, y_pred, average="macro"))
        metrics["precision_micro"].append(precision_score(y_test, y_pred, average="micro"))
        metrics["recall_micro"].append(recall_score(y_test, y_pred, average="micro"))
        metrics["f1_micro"].append(f1_score(y_test, y_pred, average="micro"))
    
    # 计算平均值
    for key in metrics:
        metrics[key] = np.mean(metrics[key])
    
    return metrics

In [23]:
# 初始化实验设置

# 训练与评估
model = MultinomialNaiveBayesCustom()

# 评估模型
metrics = evaluate(model, features, labels, n_splits=10)

In [24]:
# 输出结果
print("实验结果：")
print(f"Accuracy: {metrics['accuracy']:.6f}")
print(f"Precision (Macro): {metrics['precision_macro']:.6f}")
print(f"Recall (Macro): {metrics['recall_macro']:.6f}")
print(f"F1-Value (Macro): {metrics['f1_macro']:.6f}")
print(f"Precision (Micro): {metrics['precision_micro']:.6f}")
print(f"Recall (Micro): {metrics['recall_micro']:.6f}")
print(f"F1-Value (Micro): {metrics['f1_micro']:.6f}")

实验结果：
Accuracy: 0.694286
Precision (Macro): 0.725842
Recall (Macro): 0.694286
F1-Value (Macro): 0.697446
Precision (Micro): 0.694286
Recall (Micro): 0.694286
F1-Value (Micro): 0.694286
