In [1]:
import pymc as pm
import aesara.tensor as at
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from numpy.random import default_rng

# Fixed seed and the NumPy Generator constructed from it.
RANDOM_SEED = 31415
rng = default_rng(seed=RANDOM_SEED)

In [2]:
# Feature tables are transposed after loading so that the CSV rows end up
# along axis 1 and the feature columns along axis 0.
eda = pd.read_csv('data/features_4_2/eda/2.csv').to_numpy().T
hr = pd.read_csv('data/features_4_2/hr/2.csv').to_numpy().T
pupil = pd.read_csv('data/features_4_2/pupil/2.csv').to_numpy().T

# Tab-separated ratings file; the binary label is 1 when rating > 2, else 0.
df_ = pd.read_csv('data/LookAtMe_002.csv', sep='\t')
label = np.array([int(rating > 2) for rating in df_['rating']])

# Labels as a single-row matrix, shape (1, len(label)).
E = label[np.newaxis, :]

In [4]:
# Axis 1 is the shared axis (labelled "rows" in the model coords); axis 0 is
# each modality's own observed dimension.
N = eda.shape[1]
d_eda = eda.shape[0]
d_hr = hr.shape[0]
d_e = E.shape[0]
d_pupil = pupil.shape[0]

# N is read from `eda` only, yet every observed array is later tied to the
# same "rows" dimension. Fail here with a readable message instead of an
# opaque shape error inside the model if the inputs are misaligned.
assert hr.shape[1] == N, f"hr has {hr.shape[1]} columns, expected {N}"
assert pupil.shape[1] == N, f"pupil has {pupil.shape[1]} columns, expected {N}"
assert E.shape[1] == N, f"E has {E.shape[1]} columns, expected {N}"

print(N, d_eda, d_hr, d_e, d_pupil)

160 24 24 1 5


In [5]:
# Number of latent columns.
k = 3

# Named dimensions mapped to integer index coordinates; passed to pm.Model
# and referenced through the `dims=` arguments of the model variables.
coords = {
    dim_name: np.arange(size)
    for dim_name, size in (
        ("latent_columns", k),
        ("rows", N),
        ("observed_eda", d_eda),
        ("observed_label", d_e),
        ("observed_hr", d_hr),
        ("observed_pupil", d_pupil),
    )
}

In [6]:
def expand_packed_block_triangular(d, k, packed, diag=None, mtype="aesara"):
    """Unpack a flat vector into a (d, k) lower block-triangular matrix.

    Rectangular counterpart of ``expand_packed_triangular`` for ``d >= k``.

    Parameters
    ----------
    d, k : int
        Number of rows and columns of the result.
    packed : vector
        Values written to the lower-triangular positions, in the order
        produced by ``np.tril_indices``. When ``diag`` is None these
        positions include the main diagonal; otherwise only the entries
        strictly below it are filled from ``packed``.
    diag : vector, optional
        Values for the ``k`` main-diagonal entries.
    mtype : {"aesara", "numpy"}
        Whether to build an aesara tensor or a numpy array.

    Returns
    -------
    The filled (d, k) matrix; every position not written stays zero.

    Raises
    ------
    AssertionError
        If ``mtype`` is not one of the two supported values or ``d < k``.
    """
    assert mtype in {"aesara", "numpy"}
    assert d >= k

    symbolic = mtype == "aesara"

    def assign(matrix, where, values):
        # aesara tensors are immutable: set_subtensor yields a new tensor,
        # whereas the numpy array is written in place.
        if symbolic:
            return at.set_subtensor(matrix[where], values)
        matrix[where] = values
        return matrix

    if symbolic:
        result = at.zeros((d, k), dtype=float)
    else:
        result = np.zeros((d, k), dtype=float)

    if diag is not None:
        # Strictly-lower entries come from `packed`, the diagonal from `diag`.
        result = assign(result, np.tril_indices(d, k=-1, m=k), packed)
        return assign(result, (np.arange(k), np.arange(k)), diag)

    # No separate diagonal: `packed` covers the diagonal as well.
    return assign(result, np.tril_indices(d, m=k), packed)

In [7]:
def makeW(d, k, dim_names, name):
    """Create a constrained (d, k) loading matrix inside the active model.

    The matrix is ``L @ diag(cumsum(z))`` where ``L`` is lower
    block-triangular with a unit diagonal and HalfNormal entries below it,
    and ``z`` is HalfNormal so that its cumulative sum is positive and
    increasing.

    Registers three model variables: ``"W_z_" + name``, ``"W_b_" + name``
    and the Deterministic ``name``.

    Parameters
    ----------
    d, k : int
        Number of rows (observed dimension) and columns (latent dimension).
    dim_names : tuple of str
        Dimension names attached to the resulting (d, k) Deterministic.
    name : str
        Name of the Deterministic; also used as suffix for the helper
        variables and for the packed dimension.

    Returns
    -------
    The Deterministic variable holding the (d, k) matrix.
    """
    # Number of entries strictly below the main diagonal of a (d, k) matrix.
    # k * (k - 1) is always even, so integer division is exact.
    n_od = k * d - k * (k - 1) // 2 - k

    # The packed length depends on d, so each matrix gets its own dimension
    # name. A single shared "packed_dim" is only consistent when every call
    # uses the same d.
    packed_dim = f"packed_dim_{name}"

    # trick: the cumulative sum of z will be positive increasing
    z = pm.HalfNormal("W_z_" + name, 1.0, dims="latent_columns")
    b = pm.HalfNormal("W_b_" + name, 1.0, shape=(n_od,), dims=packed_dim)
    L = expand_packed_block_triangular(d, k, b, at.ones(k))
    W = pm.Deterministic(name, at.dot(L, at.diag(at.extra_ops.cumsum(z))), dims=dim_names)
    return W


In [8]:
with pm.Model(coords=coords) as PPCA_identified:
    # Loading matrices: EDA and HR use the constrained construction from
    # makeW; the pupil and label loadings are Normal with default parameters.
    W_eda = makeW(d_eda, k, dim_names=("observed_eda", "latent_columns"), name="W_eda")
    W_hr = makeW(d_hr, k, dim_names=("observed_hr", "latent_columns"), name="W_hr")
    W_pupil = pm.Normal("W_pupil", dims=("observed_pupil", "latent_columns"))
    W_e = pm.Normal("W_e", dims=("observed_label", "latent_columns"))

    # Latent matrix shared by all four likelihoods below.
    C = pm.Normal("C", dims=("latent_columns", "rows"))

    # Gaussian likelihoods, each with its own scalar noise scale.
    psi_eda = pm.HalfNormal("psi_eda", 1.0)
    X_eda = pm.Normal(
        "X_eda",
        mu=at.dot(W_eda, C),
        sigma=psi_eda,
        observed=eda,
        dims=("observed_eda", "rows"),
    )

    psi_hr = pm.HalfNormal("psi_hr", 1.0)
    X_hr = pm.Normal(
        "X_hr",
        mu=at.dot(W_hr, C),
        sigma=psi_hr,
        observed=hr,
        dims=("observed_hr", "rows"),
    )

    psi_pupil = pm.HalfNormal("psi_pupil", 1.0)
    X_pupil = pm.Normal(
        "X_pupil",
        mu=at.dot(W_pupil, C),
        sigma=psi_pupil,
        observed=pupil,
        dims=("observed_pupil", "rows"),
    )

    # Bernoulli likelihood for the binary label, with a sigmoid link on the
    # same latent matrix.
    label_prob = pm.math.sigmoid(at.dot(W_e, C))
    X_e = pm.Bernoulli(
        "X_e",
        p=label_prob,
        dims=("observed_label", "rows"),
        observed=E,
    )

In [9]:
# Build the graphviz representation of the model, then render it to a file
# named 'PPCA example' and open it in the system viewer.
gv = pm.model_to_graphviz(model=PPCA_identified)
gv.view(filename='PPCA example')

'PPCA example.pdf'

In [None]:
with PPCA_identified:
    # Variational fit with the default method, stopped early by the
    # parameter-convergence callback. The optimisation is stochastic, so it
    # is seeded with the notebook-wide RANDOM_SEED for reproducible reruns.
    approx = pm.fit(
        30000,
        random_seed=RANDOM_SEED,
        callbacks=[pm.callbacks.CheckParametersConvergence(tolerance=1e-4)],
    )
    # NOTE(review): approx.sample is left unseeded because its `random_seed`
    # keyword may not exist in every PyMC 4.x release — confirm against the
    # installed version and pass the seed here as well if it is supported.
    trace = approx.sample(500)

In [None]:
with PPCA_identified:
    # Posterior-predictive draws restricted to the label variable "X_e".
    posterior_predictive = pm.sample_posterior_predictive(
        trace=trace,
        var_names=["X_e"],
        random_seed=123,
    )