In [2]:
import matplotlib.pyplot as plt
import numpy as np
import numpy.random as rnd
import scipy.stats as stats
import seaborn as sns
from typing import *


plt.style.use("ggplot")
sns.set_context("poster")

各分布のハイパーパラメーターを保持するクラスを作る。

In [4]:
# Parameters of * distribution
class Gamma:
    def __init__(self, *, a: List[float], b: float):
        self.a = a
        self.b = b
        
    def get():
        """ ガンマ分布をscipyのfrozen RV objectの形で返す。
        Notes:
            scipyのgamma distは、教科書の定義とちょっと異なる。教科書で言うハイパーパラメーターbは、scaleという名前で指定する。
            https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.gamma.html
        """
        return [stats.gamma(a=elem, scale=b) for elem in self.a]


class Poisson:
    def __init__(self, *, lambda_: List[float]):
        self.lambda_ = lambda_

    def get():
        """ ポアソン分布をscipyのfrozen RV objectの形で返す。"""
        return [stats.poisson(mu=elem) for elem in self.lambda_]


class BayesPoiMixModel:
    def __init__(self, *, num_dim: int, num_cluster: int, alpha: List[float], gamDists: List[Gamma]):
        """ ポアソン混合モデルの事前・事後分布を表現するクラスを構築する。
        Args:
            D: 観測データの次元
            K: クラスター数
            alpha : カテゴリ分布のパラメーター $\pi$ の共役事前分布であるディリクレ分布のハイパーパラメーター (p.119)
            gamLists: ポアソン分布のパラメーター $\lambda$ の共役事前分布であるガンマ分布のハイパーパラメーター (p.129)
        """
        self.num_dim = num_dim
        self.num_cluster = num_cluster
        self.alpha = alpha
        self.gamDists = gamDists


class PoiMixModel:
    def __init__(self, *, num_dim: int, num_cluster: int, alpha: List[float], poiDists: List[Poisson]):
        """ 真のポアソン混合モデルを構築する。
        Args:
            D: 観測データの次元
            K: クラスター数
            alpha : 
            poiDists: 
        """
        self.num_dim = num_dim
        self.num_cluster = num_cluster
        self.phi = alpha
        self.poiDists = poiDists

ユーティリティ関数を定義する。

In [None]:
def calc_ELBO(X, prior: BayesPoiMixModel, posterior: BayesPoiMixModel): # X: Mat{float}
    """ ELBOを計算する。付録A.4を参照のこと。
    Args:
        X: 
        prior: 
        posterior: 
    """
    ln_expt_S = update_S(pos, X)
    expt_S = np.exp(ln_expt_S)
    K, N = expt_S.shape
    D = X.shape[0]

    expt_ln_lambda = np.zeros(S.shape) #np.matrix(np.zeros(D, K))
    expt_lambda = np.zeros(S.shape) #np.matrix(np.zeros(D, K))
    expt_ln_lkh = 0
    for k in range(K):
        expt_ln_lambda[:,k] = np.digamma(pos.cmp[k].a) - np.log(pos.cmp[k].b)
        expt_lambda[:,k] = pos.cmp[k].a / pos.cmp[k].b
        for n in range(N)
            expt_ln_lkh += expt_S[k,n] * (
                X[:, n].T * expt_ln_lambda[:,k] 
                    - sum(expt_lambda[:,k]) - sum(np.lgamma(X[:,n]+1))
            )[1]

    expt_ln_pS = sum(expt_S.T * (np.digamma(pos.alpha) - np.digamma(sum(pos.alpha))))
    expt_ln_qS = sum(expt_S .* ln_expt_S)
    KL_lambda = 0
    for k in range(K):
        KL_lambda += (
            sum(pos.cmp[k].a)*np.log(pos.cmp[k].b) - sum(pri.cmp[k].a)*np.log(pri.cmp[k].b)
                - sum(np.lgamma(pos.cmp[k].a)) + sum(np.lgamma(pri.cmp[k].a))
                + (pos.cmp[k].a - pri.cmp[k].a).T * expt_ln_lambda[:,k]
                + (pri.cmp[k].b - pos.cmp[k].b) * sum(expt_lambda[:,k])
        )[1]
    
    KL_pi = (
        np.lgamma(sum(pos.alpha)) - np.lgamma(sum(pri.alpha))
             - sum(np.lgamma(pos.alpha)) + sum(np.lgamma(pri.alpha))
             + (pos.alpha - pri.alpha).T * (np.digamma(pos.alpha) - np.digamma(sum(pos.alpha)))
    )[1]
    return expt_ln_lkh + expt_ln_pS - expt_ln_qS - (KL_lambda + KL_pi)


def add_stats(bpmm: BayesPoiMixModel, X, S):  # X: Matrix{Float64}, S: Matrix{Float64}
    D = bpmm.D
    K = bpmm.K
    sum_S = sum(S, 2)
    alpha = [bpmm.alpha[k] + sum_S[k] for k in range(K)]
    gamDists = [] # Vector{Gam}()
    XS = X*(S.T);
    for k in range(K):
        a = [float(bpmm.cmp[k].a[d] + XS[d,k]) for d range(D)]
        b = bpmm.cmp[k].b + sum_S[k]
        gamDists.append(Gamma(a=a, b=b))
    return BayesPoiMixModel(D=D, K=K, alpha=alpha, gamDists)


def remove_stats(bpmm: BayesPoiMixModel, X, S): # X: Matrix{Float64}, S: Matrix{Float64}
    return add_stats(bpmm, X, -S)


# Pick single states having a max probability.
def winner_takes_all(S): # S: Matrix{Float64}
    S_ret = np.zeros(S.shape) #  np.matrix(np.zeros(size(S)))
    for n in range(S_ret.shape[1])
        idx = np.argmax(S[:,n])
        S_ret[idx,n] = 1
    return S_ret

In [None]:
# Sample a PMM given hyperparameters.
def sample_PMM(bpmm: BayesPoiMixModel) -> PoiMixModel:
    pois: List[Poisson] = [] 
    for c in bpmm.gamma_dists:
        lambda_: List[float] = []
        for d in range(bpmm.D):
            lambda_.append(rand(Gamma(c.a[d], 1.0/c.b)))
        pois.append(Poisson(lambda_ = lambda_)) 
    return PoiMixModel(
        D = bpmm.D 
        , K = bpmm.K 
        , phi = rand(Dirichlet(bpmm.alpha))
        , pois
    )

# Sample data from a specific PMM model.
def sample_data(pmm: PoiMixModel, N: int):
    X = np.zeros(pmm.D, N) #np.matrix(np.zeros(pmm.D, N))
    S = categorical_sample(pmm.phi, N)
    for n in range(N):
        k = np.argmax(S[:, n])
        for d in range(1, pmm.D)
            X[d,n] = rand(Poisson(pmm.cmp[k].lambda_[d]))
    return X, S

#categorical_sample(p::Vector{Float64}) = categorical_sample(p, 1)[:,1]

def categorical_sample(p: List[float], N: int = 1):
    K = length(p)
    S = np.zeros(K, N) # np.matrix(np.zeros(K, N))
    S_tmp = rand(Categorical(p), N)
    for k in range(K):
        S[k,find(S_tmp.==k)] = 1
    return S if N != 1 else S[:, 1]


def init_S(X, bpmm: BayesPoiMixModel): # X: Matrix{float}
    N = X.shape()[1]
    S = categorical_sample(np.ones(K)/bpmm.cluster, N)    
    return S

In [None]:
# used for Gibbs Sampling
def sample_S_GS(pmm: PoiMixModel, X):  # X: Matrix{Float64}
    D, N = X.shape
    K = pmm.K
    S = zeros(K, N)

    tmp = [-sum(pmm.cmp[k].lambda) + log.(pmm.phi[k]) for k in range(K)]
    ln_lambda_X = [X.T*np.log(pmm.cmp[k].lambda) for k in range(K)]
    for n in 1 : N
        tmp_ln_phi = [(tmp[k] + ln_lambda_X[k][n])::Float64 for k in range(K)]
        tmp_ln_phi = tmp_ln_phi - logsumexp(tmp_ln_phi)
        S[:,n] = categorical_sample(exp.(tmp_ln_phi))
    return S


# used for Collapsed Gibbs Sampling
def calc_ln_NB(Xn, gam: Gamma): # Xn: Vector{Float64}
    return sum([
        float(gam.a[d]*np.log(gam.b)
            - np.lgamma(gam.a[d])
            + np.lgamma(Xn[d] + gam.a[d])
            - (Xn[d] + gam.a[d])*np.log(gam.b + 1)
        ) for d in range(Xn.shape[0])
    ])


def sample_Sn(Xn, bpmm: BayesPoiMixModel): # Xn: Vector{Float64}
    ln_tmp = [(calc_ln_NB(Xn, bpmm.cmp[k]) + np.log(bpmm.alpha[k])) for k in range(bpmm.K)]
    ln_tmp = ln_tmp -  logsumexp(ln_tmp)
    Sn = categorical_sample(np.exp(ln_tmp))
    return Sn


def sample_S_CGS(S, M, bpmm: BayesPoiMixModel):  # S: Matrix{Float64}, X: Matrix{Float64}
    D, N = X.shape
    K = S.shape[0]
    for n in randperm(N)
        bpmm = remove_stats(bpmm, X[:,[n]], S[:,[n]])  # remove
        S[:,n] = sample_Sn(X[:,n], bpmm)  # sample
        bpmm = add_stats(bpmm, X[:,[n]], S[:,[n]])  # insert
    return S, bpmm

In [None]:
# Algorithm main

# Compute posterior distribution via variational inference.
def learn_VI(x, prior_bpmm: BayesPoiMixModel, max_iter: int):  # X: Matrix{Float64}
    # initialisation
    expt_S = init_S(X, prior_bpmm)
    bpmm = add_stats(prior_bpmm, X, expt_S)
    history = np.zeros(max_iter)

    # inference
    for i in range(max_iter)
        # E-step
        expt_S = np.exp(update_S(bpmm, X))
        # M-step
        bpmm = add_stats(prior_bpmm, X, expt_S)
        # calc VB
        history[i] = calc_ELBO(X, prior_bpmm, bpmm)

    return expt_S, bpmm, history


# Compute posterior distribution via Gibbs sampling.
def learn_GS(x, prior_bpmm: BayesPoiMixModel, max_iter: int=1000):  # X::Matrix{Float64}
    # initialisation
    S = init_S(X, prior_bpmm)
    bpmm = add_stats(prior_bpmm, X, S)
    history = np.zeros(max_iter)  # * NaN # なぜ？
    
    # inference
    for i in range(max_iter):
        # sample parameters
        pmm = sample_PMM(bpmm)
        # sample latent variables
        S = sample_S_GS(pmm, X)
        # update current model
        bpmm = add_stats(prior_bpmm, X, S)
        # calc VB
        history[i] = calc_ELBO(X, prior_bpmm, bpmm)

    return S, bpmm, history


# Compute posterior distribution via collapsed Gibbs sampling.
def learn_CGS(x, prior_bpmm: BayesPoiMixModel, max_iter: int=1000):  # X::Matrix{Float64}
    # initialisation
    S = init_S(X, prior_bpmm)
    bpmm = add_stats(prior_bpmm, X, S)
    VB = np.zeros(max_iter)  # * NaN # なぜ？
    # inference
    for i in range(max_iter):
        # directly sample S
        S, bpmm = sample_S_CGS(S, X, bpmm)
        # calc VB
        history[i] = calc_ELBO(X, prior_bpmm, bpmm)
    return S, bpmm, history


In [None]:
def draw_hist(ax, X, S, label):
    counts, bins, patches = ax.hist(X.T, 20)
    for i in range(len(patches)):
        if counts[i] > 0:
            S_tmp = S[:,bins[i] .<= X[1,:] .<= bins[i+1]]
            S_sum = sum(S_tmp, 2) / sum(S_tmp)
            patches[i].set_facecolor((S_sum[1], 0, S_sum[2]))
    ax.set_title(label)


# Visualize data & estimation using 1D histogram.
# X::Matrix{Float64}, S::Matrix{Float64}, S_est::Matrix{Float64}
def visualize(X, S, S_est):
    # separated figures
    fig = plt.figure(figsize=(12, 7))
    ax1 = fig.add_subplots(1,1,1)#="observation")
    ax2 = fig.add_subplots(1,1,2)#num="estimation")
    #f1[:clf]()
    f#2[:clf]()
    #_, ax1 = subplots(1,1,num="observation")
    #_, ax2 = subplots(1,1,num="estimation")
    ax1.hist(X.T, 20)
    ax1.set_title("observation")
    draw_hist(ax2, X, S_est, "estimation")    

# Run a test script for 1D data clustering.
def run():
    ## set model
    num_dim = 1  # data dimension, must be 1.
    num_cluster = 2  #  number of mixture components, must be 2.
    alpha = 100.0 * np.ones(K)
    bpmm = BayesPoiMixModel(
        num_dim=num_dim, 
        num_cluster=num_cluster, 
        alpha=alpha, 
        gamLists=[Gamma(a=1.0*np.ones(num_dim), b=0.01) for i in range(K)]
    )
    
    ## generate data
    N = 1000
    pmm = sample_PMM(bpmm)
    X, S = sample_data(pmm, N)
    
    ## inference
    max_iter = 100
    results = {
        "VI": learn_VI(X, bpmm),
        "GS": learn_GS(X, bpmm),
        "CGS": learn_CGS(X, bpmm),
        S_est, post_bpmm, VB = 
    }

    ## plot
    visualize_1D(X, S, S_est)

run()