In [1]:
%%file test_optimize_vp.py

import numpy as np
from scipy.special import digamma, polygamma

# Toy corpus: five documents, each a 1-D array of word ids stored as floats.
text_ = [np.array([0., 3., 2., 1., 4., 2., 3., 0., 5.]),
         np.array([ 5., 11.,  9., 12.,  8.,  1.,  6.,  7., 10.]),
         np.array([16., 15., 20.,  8., 18., 14., 17., 21., 13., 19.]),
         np.array([25., 23., 19., 26., 29., 27.,  5., 24., 28.,  8.,  1., 22.]),
         np.array([16., 30., 31.,  0.,  3., 16.])]
# Seed the global RNG so the alpha and beta draws below are reproducible.
np.random.seed(64528)
M = 5   # number of documents in text_
k = 4   # number of components: columns of phi/gamma, rows of beta
N = np.array(list(map(len, text_)))   # number of words in each document
V = 32  # vocabulary size; the word ids in text_ run from 0 to 31
V_words = range(V)
alpha = np.random.dirichlet(10*np.ones(k),1)[0]
beta = np.random.dirichlet(np.ones(V),k)

# phi holds one (N[m], k) matrix per document, every entry starting at 1/k.
# The documents have different lengths, so the container is ragged. It is built
# as an explicit object array because np.array([...]) on ragged input raises
# ValueError on NumPy >= 1.24 instead of silently producing an object array.
phi = np.empty(M, dtype=object)
for m in range(M):
    phi[m] = np.full((N[m], k), 1/k)
# gamma[m, i] = alpha[i] + N[m] / k
gamma = np.tile(alpha,(M,1)) + np.tile(N/k,(k,1)).T


def test_optimize_vp():
    K = k
    words = text_
    
    for t in range(10):
        phi_old = phi
        gamma_old = gamma
        #update phi
        for m in range(M):
            for n in range(N[m]):
                for i in range(K):
                    phi[m][n,i] = beta[i,np.int(words[m][n])] * np.exp(digamma(gamma[m,i]))
                #nomalize to 1)
                phi[m][n,:] = phi[m][n,:]/np.sum(phi[m][n,:])
        phi_new = phi
        #update gamma
        for i in range(M):
            gamma[i,:]  = alpha + np.sum(phi[i], axis = 0)
        gamma_new = gamma

        assert np.min(list(map(np.min, phi)))>0
        assert all(np.sum(phi[1], axis = 1))==1
        assert np.min(gamma)>0



Overwriting test_optimize_vp.py


In [2]:
%%file test_alpha.py

import numpy as np
from scipy.special import digamma, polygamma

# Toy corpus: five documents, each a 1-D array of word ids stored as floats.
text_ = [np.array([0., 3., 2., 1., 4., 2., 3., 0., 5.]),
         np.array([ 5., 11.,  9., 12.,  8.,  1.,  6.,  7., 10.]),
         np.array([16., 15., 20.,  8., 18., 14., 17., 21., 13., 19.]),
         np.array([25., 23., 19., 26., 29., 27.,  5., 24., 28.,  8.,  1., 22.]),
         np.array([16., 30., 31.,  0.,  3., 16.])]
# Seed the global RNG so the alpha draw below is reproducible.
np.random.seed(64528)
M = 5   # number of documents in text_
k = 4   # number of components: columns of phi/gamma, length of alpha
N = np.array(list(map(len, text_)))   # number of words in each document
alpha = np.random.dirichlet(10*np.ones(k),1)[0]
# phi holds one (N[m], k) matrix per document, every entry starting at 1/k.
# It is built as an explicit object array because the documents have different
# lengths and np.array([...]) on ragged input raises ValueError on NumPy >= 1.24.
phi = np.empty(M, dtype=object)
for m in range(M):
    phi[m] = np.full((N[m], k), 1/k)
# gamma[m, i] = alpha[i] + N[m] / k
gamma = np.tile(alpha,(M,1)) + np.tile(N/k,(k,1)).T

def test_alpha():
    K = k
    alpha = np.random.dirichlet(10*np.ones(k),1)[0]
    for t in range(10):
        alpha_old = alpha
        
        g = np.zeros(K)
        h = np.zeros(K)
        for i in range(K):
            g1 = M*(digamma(np.sum(alpha))-digamma(alpha[i]))
            g2 = 0
            for d in range(M):
                g2 += digamma(gamma[d,i])-digamma(np.sum(gamma[d,:]))
            g[i] = g1 + g2
            
            h[i] = -M*polygamma(1, alpha[i])
        
        z = M*polygamma(1, np.sum(alpha))
        c = (np.sum(g/h))/(z**(-1) + np.sum(h**(-1)))
                   
        alpha -= (g-c)/h
            
        assert np.min(alpha)>0



Overwriting test_alpha.py


In [3]:
%%file test_beta.py

import numpy as np
from scipy.special import digamma, polygamma

# Toy corpus: five documents, each a 1-D array of word ids stored as floats.
text_ = [np.array([0., 3., 2., 1., 4., 2., 3., 0., 5.]),
         np.array([ 5., 11.,  9., 12.,  8.,  1.,  6.,  7., 10.]),
         np.array([16., 15., 20.,  8., 18., 14., 17., 21., 13., 19.]),
         np.array([25., 23., 19., 26., 29., 27.,  5., 24., 28.,  8.,  1., 22.]),
         np.array([16., 30., 31.,  0.,  3., 16.])]
# Seed the global RNG so the alpha and beta draws below are reproducible.
np.random.seed(64528)
M = 5   # number of documents in text_
k = 4   # number of components: columns of phi/gamma, rows of beta
N = np.array(list(map(len, text_)))   # number of words in each document
V = 32  # vocabulary size; the word ids in text_ run from 0 to 31
V_words = range(V)
alpha = np.random.dirichlet(10*np.ones(k),1)[0]
beta = np.random.dirichlet(np.ones(V),k)

# phi holds one (N[m], k) matrix per document, every entry starting at 1/k.
# It is built as an explicit object array because the documents have different
# lengths and np.array([...]) on ragged input raises ValueError on NumPy >= 1.24.
phi = np.empty(M, dtype=object)
for m in range(M):
    phi[m] = np.full((N[m], k), 1/k)
# gamma[m, i] = alpha[i] + N[m] / k
gamma = np.tile(alpha,(M,1)) + np.tile(N/k,(k,1)).T


def test_beta():
    K = k
    D = text_
    V = len(V_words)
    beta = np.ones((K,V))
    # first obtain the propotion values
    for j in range(V):
        word = V_words[j]
        # give a TRUE or FALSE "matrix", remember w_mnj should have the same shape with phi
        w_mnj = [np.repeat(w==word, K).reshape((len(w),K)) for w in D]
        # compute the inner sum over number of words
        sum1 = list(map(lambda x: np.sum(x,axis=0),phi*w_mnj))
        # compute the outer sum over documents
        beta[:,j] = np.sum(np.array(sum1), axis = 0)
    
    # then normalize each row s.t. the row sum is one
    for i in range(K):
        beta[i,:] = beta[i,:]/sum(beta[i,:])
        
    assert np.min(beta)>0 
    assert all(np.sum(beta, axis = 1)==1)

Overwriting test_beta.py


In [4]:
%%file test_converge1.py

import numpy as np
from scipy.special import digamma, polygamma

N = np.array([ 9,  9, 10, 12,  6])   # number of rows of each per-document matrix
M = 5   # number of documents (length of N)
k = 4   # number of columns of each per-document matrix

# phi holds one (N[m], k) matrix per document, every entry equal to 1/k.
# It is built as an explicit object array because the matrices have different
# row counts and np.array([...]) on ragged input raises ValueError on NumPy >= 1.24.
phi = np.empty(M, dtype=object)
for m in range(M):
    phi[m] = np.full((N[m], k), 1/k)

def test_converge1():
    tol = 10**(-2)
    
    loss = np.sqrt(list(map(np.sum,np.square(phi - phi))))
    assert np.max(loss) <= tol


Overwriting test_converge1.py


In [5]:
%%file test_converge2.py

import numpy as np
from scipy.special import digamma, polygamma

N = np.array([ 9,  9, 10, 12,  6])   # number of words in each document
M = 5   # number of documents (length of N)
k = 4   # number of components: length of alpha, rows of beta, columns of gamma
V = 32  # vocabulary size: columns of beta
# Seed the global RNG (same seed as the other test modules) so the alpha and
# beta draws are reproducible from run to run.
np.random.seed(64528)
alpha = np.random.dirichlet(10*np.ones(k),1)[0]
beta = np.random.dirichlet(np.ones(V),k)
# gamma[m, i] = alpha[i] + N[m] / k
gamma = np.tile(alpha,(M,1)) + np.tile(N/k,(k,1)).T

def test_converge2():
    tol = 10**(-2)
    
    loss1 = np.sqrt(list(map(np.sum,np.square(beta - beta))))
    loss2 = np.sqrt(list(map(np.sum,np.square(gamma - gamma))))
    assert np.max(loss1) <= tol and np.max(loss2) <= tol

Overwriting test_converge2.py


In [6]:
! pytest

platform linux -- Python 3.6.5, pytest-3.5.1, py-1.5.3, pluggy-0.6.0
rootdir: /home/jovyan/work/latent-dirichlet-allocation, inifile:
collected 5 items                                                              [0m[1m[1m

test_alpha.py .[36m                                                          [ 20%][0m
test_beta.py .[36m                                                           [ 40%][0m
test_converge1.py .[36m                                                      [ 60%][0m
test_converge2.py .[36m                                                      [ 80%][0m
test_optimize_vp.py .[36m                                                    [100%][0m

