In [None]:
cd /home/chp3/03-GMM-EM/

In [None]:
import numpy as np
from utils import *
import scipy.cluster.vq as vq
from scipy.stats import multivariate_normal

In [None]:
# GMM是一种软对齐方式
# 使用后验概率来表示某个数据点由某个类别产生的概率

class GMM(object):
    
    def __init__(self, D, K=5):
        assert(D > 0)
        self.dim = D
        self.K = K
        # KMeans Initial
        self.mu, self.cov, self.alpha = self.kmeans_initial()
        
    def kmeans_initial(self):
        # 初始化mu_k, sigma_k, pi_k
        mu = []
        sigma = []
        data = read_all_data('/home/chp3/03-GMM-EM/train/feats.scp')
        (centroids, labels) = vq.kmeans2(data, self.K, minit="points", iter=100)
        clusters = [[] for i in range(self.K)]
        for (l, d) in zip(labels, data):
            clusters[l].append(d)
        
        for cluster in clusters:
            mu.append(np.mean(cluster, axis=0))
            sigma.append(np.cov(cluster, rowvar=0))
        pi = np.array([len(c) * 1.0 / len(data) for c in clusters])
        return mu, sigma, pi
    
    def calc_log_likelihood(self, X):
        """
        Calculate log likelihood of GMM
        :param X: A matrix including data samples, num_samples * D
        :return: log likelihood of current model
        """
        log_llh = 0.0
        n_points, n_clusters = X.shape[0], len(self.alpha)
        pdfs = np.zeros((n_points, n_clusters))
        for i in range(n_clusters):
            pdfs[:, i] = self.alpha[i] * multivariate_normal.pdf(X, self.mu[i], self.cov[i])
        return np.sum(np.log(pdfs.sum(axis=1)))
    
    def E_step(self, X, mu, cov, alpha):
        
        N, D = X.shape
        gama_mat = np.zeros((N, self.K))
        
        # 计算各高斯模型中所有样本出现的概率，行对应样本，列对应模型
        prob = np.zeros((N, self.K))
        for k in range(self.K):
            prob[:, k] = multivariate_normal.pdf(X, mu[k], cov[k])
        
        # 计算每个模型对每个样本的响应度
        for k in range(self.K):
            gama_mat[:, k] = alpha[k] * prob[:, k]
            
        for i in range(N):
            gama_mat[i, :] /= np.sum(gama_mat[i, :])
        return gama_mat
    
    def M_step(self, X, gama_mat):
        
        N, D = X.shape
        
        mu = np.zeros((self.K, D))
        alpha = np.zeros(self.K)
        cov = np.zeros((self.K, D, D))
        
        for k in range(self.K):
            
            Nk = np.sum(gama_mat[:, k])
            # 更新mu
            # 对每个特征求均值
            for d in range(D):
                mu[k][d] = np.sum(np.multiply(gama_mat[:, k], X[:, d])) / Nk
            # 更新 sigma
            for i in range(N):
                left = np.reshape((X[i] - mu[k]), (D,1))
                right = np.reshape((X[i] - mu[k]), (1,D))
                cov[k] += gama_mat[i,k] * left * right
            cov[k] /= Nk
            alpha[k] = Nk / N
                        
        return mu, cov, alpha 
        
        
    def fit(self, X, iterations):
        """
        Update parameters of GMM
        :param X: A matrix including data samples, num_samples * D
        :return: log likelihood of updated model
        """
        
        log_llh = 0.0
        
        mu, cov, alpha = self.kmeans_initial()
        for _ in range(iterations):
            
            _gama_mat = self.E_step(X, mu, cov, alpha)
            mu, cov, alpha = self.M_step(X, _gama_mat)
        
        self.mu = mu
        self.cov = cov
        self.alpha = alpha
        log_llh = self.calc_log_likelihood(X)
        return log_llh 

In [None]:
def train(gmms, num_iterations):
    dict_utt2feat, dict_target2utt = read_feats_and_targets("/home/chp3/03-GMM-EM/train/feats.scp",
                                                           "/home/chp3/03-GMM-EM/train/text")
    for target in targets:
        print(target)
        feats = get_feats(target, dict_utt2feat, dict_target2utt)
#         for i in range(num_iterations):
        log_llh = gmms[target].fit(feats, num_iterations)
    return gmms

def test(gmms):
    correction_num = 0
    error_num = 0
    acc = 0.0
    dict_utt2feat, dict_target2utt = read_feats_and_targets('test/feats.scp',
                                                           'test/text')
    dict_utt2target = {}
    for target in targets:
        utts = dict_target2utt[target]
        for utt in utts:
            dict_utt2target[utt] = target
    
    for utt in dict_utt2feat.keys():
        feats = kaldi_io.read_mat(dict_utt2feat[utt])
        scores = []
        for target in targets:
            scores.append(gmms[target].calc_log_likelihood(feats))
        predict_target = targets[scores.index(max(scores))]
        if predict_target == dict_utt2target[utt]:
            correction_num += 1
        else:
            error_num += 1
    acc = correction_num * 1.0 / (correction_num + error_num)
    return acc

In [None]:
num_gaussian = 5
num_iteration = 5
targets = ['Z', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'O']

gmms = {}
for target in targets:
    gmms[target] = GMM(39, K=num_gaussian)

In [None]:
gmms = train(gmms, 200)

In [None]:
acc = test(gmms)

In [None]:
acc