In [None]:
# Run the project's get_data.sh script through `sh` (IPython shell escape).
# NOTE(review): presumably this fetches/prepares the files that
# load_data.get_data reads later on — confirm against get_data.sh.
!sh get_data.sh

In [20]:
import numpy as np
from scipy.special import factorial
from tqdm import tqdm_notebook
import matplotlib.pyplot as plt
from load_data import get_data

In [21]:
from buffet import IndianBuffet
# Seed NumPy's global RNG. run_experiment() calls np.random.seed(seed) again
# before each training run, so this value only affects random draws made
# before that reseed happens.
np.random.seed(100)

def init_ibm(data):
    """Construct an ``IndianBuffet`` model over ``data``.

    ``data`` is passed through unchanged as the first argument and
    ``(1, 1, 1)`` as the second. The third and fourth arguments are lists
    holding one ``(1.7, 1, 1)`` tuple and one ``(0.5, 1, 1)`` tuple,
    respectively, per entry of ``data``.

    NOTE(review): what these tuples parameterise is defined by
    ``buffet.IndianBuffet`` and is not visible from this notebook.
    """
    n_entries = len(data)
    third_arg = [(1.7, 1, 1) for _ in range(n_entries)]
    fourth_arg = [(0.5, 1, 1) for _ in range(n_entries)]
    return IndianBuffet(data, (1, 1, 1), third_arg, fourth_arg)

In [22]:
# Load the train and test splits with labels included.
# Each split is later indexed as X[0], X[1], X[2]; get_X_types() below uses
# them as the 'vision' input, the 'audio' input and the final (target) entry.
X_train, X_test = get_data(include_labels=True)

(64, 196)
(32, 196)


In [23]:
def predict(model, Xs, verbose=False):
    """Predict the target of one example and report whether it is correct.

    Every matrix ``Z`` in ``model.Zs`` is scanned row by row. Each row gets a
    log-score made of:

    * the sum over columns of ``log(m / N)`` where ``Z == 1`` and
      ``log((N - m) / N)`` elsewhere, with ``m`` the column total excluding
      the row itself and ``N = model.N``;
    * for every observed input ``X`` (paired with ``model.feats[:-1]``), minus
      ``||X - Z @ f.weights(Z)||^2 / (2 * f.var_x) + log(2*pi)/2
      + log(f.var_x) * f.D / 2``.

    The single best-scoring row over all ``Z`` is then multiplied by
    ``model.feats[-1].weights(best_Z)`` to obtain the predicted target vector.

    Parameters
    ----------
    model : object
        Must expose ``Zs`` (iterable of 2-D arrays), ``N`` and ``feats``
        (sequence of objects with ``weights(Z)``, ``var_x`` and ``D``).
    Xs : sequence
        One entry per element of ``model.feats``. The last entry is the true
        target; any earlier entry may be ``None`` to leave that input out of
        the score.
    verbose : bool, optional
        When true, print the predicted target vector next to ``Xs[-1]``.
        Defaults to ``False`` so that bulk evaluation does not flood the
        notebook output.

    Returns
    -------
    bool-like
        ``Xs[-1][argmax(prediction)] == 1``.

    Raises
    ------
    ValueError
        If no row of any ``Z`` has a log-score greater than ``-inf`` (this
        includes ``model.Zs`` being empty), so no prediction can be made.
    """
    best_lp, best_Z, best_i = -float('inf'), None, None
    for Z in model.Zs:
        # Column totals with the current row's own entry removed.
        m = Z.sum(axis=0) - Z
        lps = np.sum(np.log(np.where(Z == 1, m, model.N - m) / model.N), axis=1)
        for f, X in zip(model.feats[:-1], Xs[:-1]):
            if X is None:
                # Input withheld for this example: it contributes nothing.
                continue
            X_bar = Z @ f.weights(Z)
            diffs = np.sum((X - X_bar) ** 2, axis=1) / (2 * f.var_x)
            lps -= diffs + np.log(2 * np.pi) / 2 + np.log(f.var_x) * f.D / 2
        i = np.argmax(lps)
        # Strict '>' keeps the earliest (Z, row) pair on ties.
        if lps[i] > best_lp:
            best_lp, best_Z, best_i = lps[i], Z, i

    if best_Z is None:
        raise ValueError(
            'predict() found no row with a log-score above -inf; '
            'model.Zs may be empty or every candidate row is impossible'
        )

    W = model.feats[-1].weights(best_Z)
    X_w_pred = best_Z[best_i] @ W
    if verbose:
        print(X_w_pred, Xs[-1])
    return Xs[-1][np.argmax(X_w_pred)] == 1

In [40]:
def get_X_types(X):
    """Return the named input variants derived from ``X``.

    ``X`` is indexed as ``X[0]``, ``X[1]``, ``X[2]``. The result maps:

    * ``'multi'``      -> ``X`` itself (same object),
    * ``'vision_uni'`` -> ``[X[0], <Nones>, X[2]]``,
    * ``'audio_uni'``  -> ``[<Nones>, X[1], X[2]]``,
    * ``'vision'``     -> ``[X[0], X[2]]``,
    * ``'audio'``      -> ``[X[1], X[2]]``,

    where ``<Nones>`` is a fresh list of ``len(X[0])`` ``None`` values.
    """
    n_examples = len(X[0])
    first, second, last = X[0], X[1], X[2]

    variants = {'multi': X}
    # Each *_uni variant gets its own placeholder list (no aliasing).
    variants['vision_uni'] = [first, [None for _ in range(n_examples)], last]
    variants['audio_uni'] = [[None for _ in range(n_examples)], second, last]
    variants['vision'] = [first, last]
    variants['audio'] = [second, last]
    return variants

def run_experiment(X_train, X_test, seed=None, iters=5000):
    """Train one model per input variant and print train/test accuracy.

    For each training variant in the map below, NumPy's global RNG is
    reseeded with ``seed``, a model is built with ``init_ibm`` and its sampler
    is run for ``iters`` iterations. The fitted model is then scored with
    ``predict`` on every evaluation variant listed for it, on both the
    training and the test split, and the accuracies are printed.

    Parameters
    ----------
    X_train, X_test : sequence
        Splits in the layout expected by ``get_X_types`` (indexed as
        ``X[0]``, ``X[1]``, ``X[2]``).
    seed : int or None, optional
        Passed to ``np.random.seed`` before each training run.
    iters : int, optional
        Number of sampler iterations passed to ``run_sampler``. Defaults to
        5000, the value that was previously hard-coded.

    Returns
    -------
    None
        Results are reported through ``print`` only.
    """
    N_train = len(X_train[0])
    N_test = len(X_test[0])
    trains = get_X_types(X_train)
    tests = get_X_types(X_test)

    # Training variant -> evaluation variants scored with that trained model.
    train_test_map = {
        'vision': ['vision'],
        'audio': ['audio'],
        'multi': ['multi', 'vision_uni', 'audio_uni']
    }

    for train_type, test_types in train_test_map.items():
        # Reseed per model so every training run starts from the same RNG state.
        np.random.seed(seed)
        print(f'Training {train_type}...')
        ibm = init_ibm(trains[train_type])
        ibm.run_sampler(iters=iters)

        for test_type in test_types:
            # zip(*variant) yields one tuple of per-input entries per example.
            train_predictions = [predict(ibm, xs) for xs in zip(*trains[test_type])]
            test_predictions = [predict(ibm, xs) for xs in zip(*tests[test_type])]

            train_acc = sum(train_predictions) / N_train
            test_acc = sum(test_predictions) / N_test

            print(f'TRAIN: {train_type}, TEST: {test_type}', f'train accuracy: {train_acc}', f'test_accuracy: {test_acc}', sep='\n')

In [None]:
# Train the 'vision', 'audio' and 'multi' models (RNG reseeded with 4 before
# each one) and print train/test accuracy for every evaluation variant.
run_experiment(X_train, X_test, seed=4)

Training vision...


HBox(children=(IntProgress(value=0, max=5000), HTML(value='')))

  lpz = np.log(np.stack((self.N - mi, mi)))
  ll = self.D * np.log(sigma) + np.power(self.X[i] - mu, 2).sum() / sigma
