In [2]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import silhouette_score, fowlkes_mallows_score,adjusted_rand_score,adjusted_mutual_info_score

预处理方式与kmeans一致

In [3]:
headers = [
    'sepal length',
    'sepal width',
    'petal length',
    'petal width',
    'class'
]
data = pd.read_csv('data/iris.data',header=None)
data.columns = headers
class_map = {'Iris-setosa':0,'Iris-versicolor':1,'Iris-virginica':2}
data['class'] = data['class'].map(class_map)
X = data[['sepal length','petal length']].to_numpy().astype(np.float64)
X_mean = np.mean(X,axis=0)
X_std = np.std(X,axis=0)
X = (X-X_mean)/X_std
y = data['class'].to_numpy().astype(np.float64)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1234)

In [None]:
# 计算多元高斯分布的概率
def multivariate_normal_pdf(x, mean, cov):
    n = len(mean)
    
    # 计算协方差矩阵的行列式和逆矩阵
    det_cov = np.linalg.det(cov)
    inv_cov = np.linalg.inv(cov)
    
    # 计算常数部分 (2π)^(n/2) * |Σ|^(1/2)
    de = (2 * np.pi) ** (n / 2) * np.sqrt(det_cov)
    
    # 计算每个样本与均值的差
    diff = x - mean  # diff 的形状是 (n_samples, n_features)
    
    # 对每个样本计算 Mahalanobis 距离：diff * inv_cov * diff.T
    mahalanobis_dist = np.sum(np.dot(diff, inv_cov) * diff, axis=1)  # 结果是 (n_samples,)
    
    # 计算每个样本的概率密度
    me = np.exp(-0.5 * mahalanobis_dist)
    
    # 返回PDF值
    return me / de

def gmm(X, k, max_iters=100, tol=1e-4):
    n_samples, n_features = X.shape

    # 初始化参数
    np.random.seed(42)
    means = X[np.random.choice(n_samples, k, replace=False)]  # 初始均值
    covariances = np.array([np.eye(n_features)] * k)  # 初始协方差矩阵为单位矩阵
    weights = np.ones(k) / k  # 初始化权重均匀分配

    log_likelihoods = []

    for i in range(max_iters):
        # E步：计算责任度（每个样本属于各个高斯分布的概率）
        responsibilities = np.zeros((n_samples, k))

        for j in range(k):
            responsibilities[:, j] = weights[j] * multivariate_normal_pdf(X, mean=means[j], cov=covariances[j])

        responsibilities /= responsibilities.sum(axis=1, keepdims=True)

        # M步：更新参数（均值、协方差矩阵和权重）
        N_k = responsibilities.sum(axis=0)  # 每个高斯分布的总责任度（也就是样本的数量）

        means = (responsibilities.T @ X) / N_k[:, np.newaxis]  # 更新均值
        for j in range(k):
            diff = X - means[j]
            covariances[j] = (responsibilities[:, j][:, np.newaxis] * diff).T @ diff / N_k[j]  # 更新协方差矩阵

        weights = N_k / n_samples  # 更新权重

        # 计算对数似然
        log_likelihood = np.sum(np.log(np.sum(responsibilities, axis=1)))
        log_likelihoods.append(log_likelihood)

        # 检查对数似然是否收敛
        if i > 0 and abs(log_likelihood - log_likelihoods[i-1]) < tol:
            print(f"Converged after {i+1} iterations.")
            break

    # 根据责任度分配每个样本的标签
    labels = np.argmax(responsibilities, axis=1)

    return means, covariances, weights, labels

In [41]:
means,covs,w,l = gmm(X_train,2)

Converged after 2 iterations.


In [42]:
def predict(X, means, covariances, weights):
    n_samples = X.shape[0]
    k = len(weights)
    responsibilities = np.zeros((n_samples, k))
    
    for i in range(k):
        norm = multivariate_normal_pdf(X,means[i], covariances[i])
        responsibilities[:, i] = weights[i] * norm
    
    responsibilities /= responsibilities.sum(axis=1, keepdims=True)
    labels = np.argmax(responsibilities, axis=1)
    
    return labels

In [39]:
y_pred = predict(X_test,means,covs,w)

In [44]:
print('GMM:')
silhouette_avg = silhouette_score(X_test, y_pred)
print(f'Silhouette Coefficient: {silhouette_avg}')

fmi = fowlkes_mallows_score(y_test, y_pred)
print(f'Fowlkes-Mallows Score: {fmi}')

ari = adjusted_rand_score(y_test, y_pred)
print(f'Adjusted Rand Index: {ari}')

ami = adjusted_mutual_info_score(y_test, y_pred)
print(f'Adjusted Mutual Information: {ami}')

GMM:
Silhouette Coefficient: 0.3946867184083424
Fowlkes-Mallows Score: 0.573273174155959
Adjusted Rand Index: 0.1603924824298985
Adjusted Mutual Information: 0.2353402560159457
