In [None]:
import heapq

import networkx as nx
import numpy as np
import pandas as pd
import requests
from matplotlib import rcParams, pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, mutual_info_score
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import naiveBayes as nb

In [None]:
def compute_mutual_information(X):
    """Build the symmetric pairwise mutual-information matrix of X's columns.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Feature matrix; every column is passed to ``mutual_info_score`` as
        one discrete variable.

    Returns
    -------
    numpy.ndarray of shape (n_features, n_features)
        ``result[i, j] == result[j, i] == mutual_info_score(X[:, i], X[:, j])``.
        The diagonal holds each feature's mutual information with itself.
    """
    X = np.asarray(X)
    n_features = X.shape[1]
    mi_matrix = np.zeros((n_features, n_features))

    # Only the strict upper triangle is computed; MI is symmetric.
    with tqdm(total=n_features * (n_features - 1) // 2, desc="计算互信息") as pbar:
        for i in range(n_features):
            for j in range(i + 1, n_features):
                mi_matrix[i, j] = mutual_info_score(X[:, i], X[:, j])
                pbar.update(1)

    # Mirror the upper triangle first and write the diagonal afterwards.
    # Filling the diagonal before adding the transpose would count every
    # diagonal entry twice.
    mi_matrix = mi_matrix + mi_matrix.T
    for i in range(n_features):
        mi_matrix[i, i] = mutual_info_score(X[:, i], X[:, i])

    return mi_matrix

In [None]:
def _push_frontier_edges(heap, mi_matrix, node, selected_nodes, serial):
    """Push every positive-weight edge from `node` to an unselected node.

    Heap entries are ``(-weight, serial, source, target)``: negating the
    weight turns heapq's min-heap into a max-heap, and the serial number makes
    equal weights pop in insertion order. Returns the next unused serial.
    """
    for j in range(mi_matrix.shape[0]):
        if j not in selected_nodes and mi_matrix[node, j] > 0:
            heapq.heappush(heap, (-float(mi_matrix[node, j]), serial, node, j))
            serial += 1
    return serial


def prim_algorithm(mi_matrix):
    """Grow a maximum-weight spanning tree (or forest) with Prim's algorithm.

    Parameters
    ----------
    mi_matrix : numpy.ndarray of shape (n_features, n_features)
        Symmetric edge-weight matrix. Entries that are not strictly positive
        are treated as "no edge".

    Returns
    -------
    list of (int, int)
        Directed ``(parent, child)`` edges, in the order they were added.
        Growth starts at node 0. Each node appears as a child at most once.
        If some nodes cannot be reached through positive-weight edges, the
        lowest-numbered unreached node becomes the root of a new component,
        so the result is a spanning forest and those roots have no incoming
        edge.
    """
    n_features = mi_matrix.shape[0]
    if n_features == 0:
        return []

    selected_nodes = {0}  # nodes already attached to the tree/forest
    edges = []
    heap = []  # frontier of candidate edges, best weight first

    with tqdm(total=n_features - 1, desc="构建树") as pbar:
        serial = _push_frontier_edges(heap, mi_matrix, 0, selected_nodes, 0)

        while len(selected_nodes) < n_features:
            if not heap:
                # Disconnected graph: no positive edge leaves the current
                # component, so start a new one instead of failing.
                new_node = next(j for j in range(n_features) if j not in selected_nodes)
            else:
                _, _, source, new_node = heapq.heappop(heap)
                if new_node in selected_nodes:
                    # Stale candidate: its target joined the tree through a
                    # better edge after this one was queued. Taking it would
                    # create a cycle.
                    continue
                edges.append((source, new_node))

            selected_nodes.add(new_node)
            serial = _push_frontier_edges(heap, mi_matrix, new_node, selected_nodes, serial)
            pbar.update(1)

    return edges

In [None]:
class TAN:
    """Tree-Augmented Naive Bayes classifier for discrete features.

    Each feature depends on the class and on at most one parent feature. The
    parent structure comes from a maximum-weight spanning tree over the
    pairwise mutual-information matrix of the training features.
    """

    # Probability used at prediction time for a value (or parent/value pair)
    # that never occurred in the training rows of a class.
    _UNSEEN_PROB = 1e-6

    def __init__(self, vocabList):
        self.class_prior = {}  # class label -> prior probability P(C)
        self.feature_probs = {}  # class label -> list of per-feature probability tables
        self.edges = []  # directed (parent, child) feature edges of the tree
        self.vocabList = vocabList  # stored as given; not read by fit/predict

    def _parent_map(self):
        """Return ``{child: parent}`` from ``self.edges``.

        If a child appears in several edges, the first edge wins. Building
        the map once avoids rescanning the edge list for every feature.
        """
        parents = {}
        for edge in self.edges:
            parents.setdefault(edge[1], edge[0])
        return parents

    def fit(self, X, y):
        """Estimate class priors, the feature tree and the probability tables.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Discrete feature matrix.
        y : array-like of shape (n_samples,)
            Class labels; a plain Python list is accepted.
        """
        X = np.asarray(X)
        # Coerce labels so that `y == c` below is always an element-wise mask,
        # independent of whether the caller passed a list or an array.
        y = np.asarray(y)
        n_samples, n_features = X.shape

        self.classes, counts = np.unique(y, return_counts=True)
        self.class_prior = dict(zip(self.classes, counts / n_samples))

        mi_matrix = compute_mutual_information(X)
        self.edges = prim_algorithm(mi_matrix)
        parents = self._parent_map()

        self.feature_probs = {c: [{} for _ in range(n_features)] for c in self.classes}
        for c in tqdm(self.classes, desc="计算条件概率"):
            X_c_df = pd.DataFrame(X[y == c])  # training rows of class c
            for i in range(n_features):
                parent = parents.get(i)
                if parent is None:
                    # Root feature: P(X_i | C), keyed by value.
                    probs = X_c_df[i].value_counts(normalize=True).to_dict()
                else:
                    # P(X_i | X_parent, C), keyed by (parent_value, value).
                    probs = X_c_df.groupby(parent)[i].value_counts(normalize=True).to_dict()
                self.feature_probs[c][i] = probs

    def predict(self, X):
        """Return the most probable class label for every row of X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        numpy.ndarray of shape (n_samples,)
            Entries of ``self.classes`` maximising the summed log-probability.
        """
        X = np.asarray(X)
        n_samples, n_features = X.shape
        parents = self._parent_map()
        unseen = self._UNSEEN_PROB
        log_prob = np.zeros((n_samples, len(self.classes)))

        for i, c in enumerate(self.classes):
            log_prob[:, i] += np.log(self.class_prior[c])
            tables = self.feature_probs[c]
            for j in range(n_features):
                table = tables[j]
                parent = parents.get(j)
                if parent is None:
                    probs = np.array([table.get(x, unseen) for x in X[:, j]])
                else:
                    probs = np.array([
                        table.get((parent_value, value), unseen)
                        for parent_value, value in zip(X[:, parent], X[:, j])
                    ])
                log_prob[:, i] += np.log(probs)

        return self.classes[np.argmax(log_prob, axis=1)]

In [None]:
# Load the dataset (two parallel sequences, unpacked as documents and labels)
docs, label = nb.loadDataSet()

# Draw the working subset with a seeded generator so that Restart & Run All
# selects the same documents every time. The train/test split further down is
# already seeded, but an unseeded draw here would still change the results on
# each run.
SUBSET_SIZE = 500  # upper bound on the number of documents kept
SUBSET_SEED = 1
sample_size = min(SUBSET_SIZE, len(label))
subset_indices = np.random.default_rng(SUBSET_SEED).choice(len(label), sample_size, replace=False)
docs = [docs[i] for i in subset_indices]
label = [label[i] for i in subset_indices]

In [None]:
# Build the vocabulary from the sampled documents
vocabList = nb.createVocabList(docs)

# Encode each document against that vocabulary via nb.setOfWords2Vec
trainMat = [
    nb.setOfWords2Vec(vocabList, inputSet)
    for inputSet in tqdm(docs, desc='构建词向量矩阵')
]

In [None]:
# Hold out 20% of the samples for evaluation; the fixed random_state keeps
# the partition repeatable across runs.
X_train, X_test, y_train, y_test = train_test_split(
    trainMat,
    label,
    test_size=0.2,
    random_state=1,
)


In [None]:
# Train the model: TAN.fit reads X.shape, so the split lists are converted to
# NumPy arrays first.
X_train, X_test = np.array(X_train), np.array(X_test)
model = TAN(vocabList)
model.fit(X_train, y_train)

In [None]:
# Predict on the held-out set and score the predictions
y_pred = model.predict(X_test)
accuracy, precision, recall, f1 = (
    metric(y_test, y_pred)
    for metric in (accuracy_score, precision_score, recall_score, f1_score)
)
conf_matrix = confusion_matrix(y_test, y_pred)