# Inverted encoding models, revisited

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pymc3 as pm
from scipy.stats import pearsonr
from sklearn.base import RegressorMixin, BaseEstimator
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import OneHotEncoder
import seaborn as sns
from scipy.ndimage import gaussian_filter
from scipy.linalg import toeplitz
from sklearn.discriminant_analysis import _cov, LinearDiscriminantAnalysis
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from scipy import stats
import pymc3 as pm
%matplotlib inline

In [2]:
class Iem(BaseEstimator, RegressorMixin):
    """Base class for Inverted Encoding Models (IEMs).

    Child classes estimate encoding weights ``W`` in ``fit`` and invert
    them in ``predict`` to reconstruct stimulus features ``S`` from
    responses ``R``. This base class only provides the shared stimulus
    preprocessing, the conversion of reconstructions to class labels and
    the scoring logic.

    Parameters
    ----------
    classify : bool
        If True, a 1-D ``S`` is treated as class labels (one-hot encoded
        for fitting, arg-max decoded for prediction) and ``score`` returns
        accuracy. If False, a 1-D ``S`` is treated as a continuous feature
        and an intercept column is prepended.
    score_func : callable or None
        Only used when ``classify`` is False; ``score`` then returns
        ``score_func(S, S_pred)`` where ``S_pred`` is the raw output of
        ``predict``.
    """

    def __init__(self, classify=True, score_func=None):
        """ Initializes base class. """
        self.W = None  # estimated parameters
        self.classify = classify
        # Every __init__ argument has to be stored under its own name:
        # BaseEstimator.get_params() (and therefore clone/cross_val_score)
        # reads each constructor argument back as an attribute.
        self.score_func = score_func

    def _preproc(self, S):
        """ Preprocesses stimulus features (S).

        A 1-D ``S`` becomes a one-hot matrix (``classify=True``) or gets an
        intercept column prepended (``classify=False``). Anything that is
        not 1-D is returned unchanged. When labels are one-hot encoded the
        sorted unique labels are remembered in ``self.classes_`` so that
        ``_classify`` can translate column indices back into labels.
        """
        self.classes_ = None
        if S.ndim != 1:
            return S

        if self.classify:
            # Pure-numpy one-hot encoding (columns ordered by sorted label),
            # independent of the scikit-learn version in use.
            self.classes_, codes = np.unique(S, return_inverse=True)
            return np.eye(self.classes_.size)[codes.reshape(-1)]

        return np.c_[np.ones(S.shape[0]), S]

    def _classify(self, S_pred):
        """ Makes predictions categorical.

        Takes the arg-max over the first axis of ``S_pred`` and, when the
        labels seen by ``_preproc`` are known, maps the winning index back
        to the corresponding original label.
        """
        winners = np.argmax(S_pred, axis=0)
        classes = getattr(self, 'classes_', None)
        if classes is None:
            return winners
        return classes[winners]

    def fit(self, R, S):
        """ Fits model (should be defined in child class). """
        raise NotImplementedError

    def predict(self, R):
        """ Predicts new stimuli based on responses
        (should be defined in child class). """
        raise NotImplementedError

    def score(self, R, S):
        """ Scores prediction.

        Returns the fraction of correctly predicted labels when
        ``classify`` is True, otherwise ``score_func(S, S_pred)``.

        Raises
        ------
        NotImplementedError
            If ``classify`` is False and no ``score_func`` was supplied
            (previously this case silently returned None).
        """
        S_pred = self.predict(R)

        if self.classify:
            return np.mean(S_pred == S)

        if self.score_func is not None:
            return self.score_func(S, S_pred)

        raise NotImplementedError(
            "Scoring with classify=False requires a score_func."
        )

class OlsIem(Iem):
    """IEM whose encoding weights and inversion both use ordinary least
    squares (no noise model)."""

    def fit(self, R, S):
        """Estimates ``W`` as the least-squares solution of ``S @ W = R``.

        ``S`` is first passed through ``_preproc``. Returns ``self``.
        """
        S = self._preproc(S)
        # pinv(S) @ R equals inv(S.T @ S) @ S.T @ R when S has full column
        # rank, but does not raise LinAlgError on a rank-deficient design
        # and avoids explicitly inverting the normal equations.
        self.W = np.linalg.pinv(S) @ R
        return self

    def predict(self, R):
        """Reconstructs stimulus features from responses ``R``.

        Computes ``pinv(W @ W.T) @ W @ R.T``; the result therefore has one
        row per stimulus feature and one column per row of ``R``. With
        ``classify=True`` it is reduced to one prediction per row of ``R``
        by ``_classify``.
        """
        W = self.W
        S_pred = np.linalg.pinv(W @ W.T) @ W @ R.T

        if self.classify:
            S_pred = self._classify(S_pred)

        return S_pred


class WlsIem(Iem):
    """IEM that inverts the encoding model with weighted least squares,
    weighting each response column by the inverse of its residual
    variance."""

    def fit(self, R, S):
        """Estimates ``W`` by least squares and the diagonal precision
        matrix ``omega`` from the residuals ``R - S @ W``.

        Returns ``self``.
        """
        S = self._preproc(S)
        # pinv(S) @ R equals inv(S.T @ S) @ S.T @ R when S has full column
        # rank, but does not raise LinAlgError on a rank-deficient design.
        self.W = np.linalg.pinv(S) @ R
        resids = R - S @ self.W
        # One residual variance per column of R; off-diagonal terms are
        # deliberately ignored.
        var_err = np.var(resids, axis=0)
        self.omega = np.linalg.inv(np.diag(var_err))
        return self

    def predict(self, R):
        """Reconstructs stimulus features from responses ``R``.

        Computes ``pinv(W @ omega @ W.T) @ W @ omega @ R.T`` and, with
        ``classify=True``, reduces it to one prediction per row of ``R``
        via ``_classify``.
        """
        W, omega = self.W, self.omega
        S_pred = np.linalg.pinv(W @ omega @ W.T) @ W @ omega @ R.T

        if self.classify:
            S_pred = self._classify(S_pred)

        return S_pred

    
class GlsIem(Iem):
    """IEM that inverts the encoding model with generalized least squares,
    using the inverse of the (optionally shrunk) residual covariance.

    Parameters
    ----------
    shrink_cov : passed as ``shrinkage`` to the covariance estimator
        ``_cov`` imported from ``sklearn.discriminant_analysis``.
    classify : bool
        See ``Iem``.
    """

    def __init__(self, shrink_cov='auto', classify=True):
        self.shrink_cov = shrink_cov
        super().__init__(classify=classify)

    def fit(self, R, S):
        """Estimates ``W`` by least squares and the precision matrix
        ``omega`` from the covariance of the residuals ``R - S @ W``.

        Returns ``self``.
        """
        S = self._preproc(S)
        # pinv(S) @ R equals inv(S.T @ S) @ S.T @ R when S has full column
        # rank, but does not raise LinAlgError on a rank-deficient design.
        self.W = np.linalg.pinv(S) @ R
        resids = R - S @ self.W
        cov_err = _cov(resids, shrinkage=self.shrink_cov)
        self.omega = np.linalg.inv(cov_err)

        return self

    def predict(self, R):
        """Reconstructs stimulus features from responses ``R``.

        Computes ``pinv(W @ omega @ W.T) @ W @ omega @ R.T`` and, with
        ``classify=True``, reduces it to one prediction per row of ``R``
        via ``_classify``.
        """
        W, omega = self.W, self.omega
        S_pred = np.linalg.pinv(W @ omega @ W.T) @ W @ omega @ R.T

        if self.classify:
            S_pred = self._classify(S_pred)

        return S_pred


class RidgeGlsIem(GlsIem):
    """GLS inverted encoding model with a ridge penalty on the inversion.

    Fitting is inherited from ``GlsIem``; only ``predict`` differs.

    Parameters
    ----------
    alpha : float
        Ridge penalty added to the diagonal of ``W @ omega @ W.T`` before
        it is inverted. ``alpha=0`` reproduces ``GlsIem.predict``.
    classify : bool
        See ``Iem``.
    shrink_cov : see ``GlsIem``.
    """

    def __init__(self, alpha=1, classify=True, shrink_cov='auto'):
        self.alpha = alpha
        super().__init__(classify=classify, shrink_cov=shrink_cov)

    def predict(self, R):
        """Reconstructs stimulus features from responses ``R``.

        Computes ``pinv(W @ omega @ W.T + alpha * I) @ W @ omega @ R.T``
        and, with ``classify=True``, reduces it to one prediction per row
        of ``R`` via ``_classify``.
        """
        W, omega = self.W, self.omega
        gram = W @ omega @ W.T
        # The penalty belongs inside the inverse. Adding alpha * I to the
        # pseudo-inverse itself (as the code previously did) inflates the
        # estimate instead of shrinking it.
        penalty = self.alpha * np.eye(W.shape[0])
        S_pred = np.linalg.pinv(gram + penalty) @ W @ omega @ R.T

        if self.classify:
            S_pred = self._classify(S_pred)

        return S_pred

In [3]:
class DataGenerator:
    """Simulates stimulus/response data from a linear encoding model.

    Responses are generated as ``R = S @ W + eps`` where ``S`` is an
    ``N x P`` design, ``W`` a ``P x K`` weight matrix and ``eps`` an
    ``N x K`` matrix of multivariate-normal noise.

    Parameters
    ----------
    categorical : bool
        If True, ``S`` is a one-hot design over ``P`` classes and is
        returned as a vector of class indices; otherwise ``S`` is drawn
        from a standard normal distribution.
    N, P, K : int
        Number of samples, stimulus features (or classes) and response
        channels.
    sig_sq : float
        Scalar multiplied into the noise covariance.
    rho : float
        AR(1) parameter; the noise correlation between channels ``i`` and
        ``j`` is ``rho ** |i - j|`` for the 'gls' and 'wgls' noise models.
    max_var : float
        Upper bound of the uniform distribution from which per-channel
        noise variances are drawn ('wls' and 'wgls' noise models).
    noise_model : {'ols', 'wls', 'gls', 'wgls'}
        Identity, diagonal, AR(1)-correlated, or heteroscedastic and
        AR(1)-correlated noise covariance.
    param_model : {'unif', 'ols', 'wls', 'gls', 'wgls'}
        Distribution the weights ``W`` are drawn from.
    """

    NOISE_MODELS = ('ols', 'wls', 'gls', 'wgls')
    PARAM_MODELS = ('unif', 'ols', 'wls', 'gls', 'wgls')

    def __init__(self, categorical=True, N=100, P=4, K=50, sig_sq=1,
                 rho=0.9, max_var=10, noise_model='ols', param_model='unif'):

        self.categorical = categorical
        self.N = N
        self.P = P
        self.K = K
        self.sig_sq = sig_sq
        self.rho = rho  # ar1 param
        self.max_var = max_var
        self.noise_model = noise_model
        self.param_model = param_model

    def generate(self):
        """Draws one dataset.

        Returns
        -------
        S : ndarray
            Class indices of shape ``(N,)`` if ``categorical`` is True,
            otherwise the ``(N, P)`` design matrix.
        R : ndarray of shape ``(N, K)``
            Simulated responses.

        Raises
        ------
        ValueError
            If ``noise_model`` or ``param_model`` is not a known name.
        """
        S = self._generate_design()
        eps = self._generate_noise()
        W = self._generate_params()
        R = S.dot(W) + eps

        if self.categorical:
            # Collapse the one-hot design back into class indices.
            S = np.argmax(S, axis=1)

        return S, R

    def _generate_design(self):
        """Returns the ``(N, P)`` design matrix."""
        N, P = self.N, self.P
        if not self.categorical:
            return np.random.normal(0, 1, size=(N, P))

        # Sorted, (near-)balanced class labels. When P divides N this is
        # exactly np.repeat(np.arange(P), N // P); otherwise the first
        # N % P classes receive one extra sample so that the design always
        # has N rows.
        labels = np.sort(np.arange(N) % P)
        # Row-indexing the identity gives the one-hot encoding and always
        # yields P columns, whichever classes are present.
        return np.eye(P)[labels]

    def _generate_noise(self):
        """Returns ``(N, K)`` multivariate-normal noise for ``noise_model``."""
        N, K = self.N, self.K
        model = self.noise_model
        if model not in self.NOISE_MODELS:
            raise ValueError(
                "Unknown noise_model %r; expected one of %r"
                % (model, self.NOISE_MODELS)
            )

        if model == 'ols':
            noise_cov = np.identity(K)
        elif model == 'gls':
            # assuming equal variance, but with non-zero covariance
            noise_cov = self.rho ** toeplitz(np.arange(K))
        else:
            varz = np.random.uniform(0, self.max_var, size=K)
            if model == 'wls':
                noise_cov = np.diag(varz)
            else:
                # 'wgls': scale the AR(1) correlation matrix by the
                # per-channel standard deviations, sqrt(var_i * var_j).
                corr_cov = self.rho ** toeplitz(np.arange(K))
                noise_cov = np.sqrt(np.outer(varz, varz)) * corr_cov

        return np.random.multivariate_normal(
            np.zeros(K), self.sig_sq * noise_cov, size=N
        )

    def _generate_params(self):
        """Returns the ``(P, K)`` weight matrix for ``param_model``."""
        P, K = self.P, self.K
        model = self.param_model
        if model not in self.PARAM_MODELS:
            raise ValueError(
                "Unknown param_model %r; expected one of %r"
                % (model, self.PARAM_MODELS)
            )

        if model == 'unif':
            return np.random.uniform(-.5, .5, size=(P, K))

        # All remaining models draw each channel's weights from a zero-mean
        # multivariate normal and differ only in its covariance.
        if model == 'ols':
            params_cov = np.identity(P) / 10
        elif model == 'gls':
            params_cov = 0.5 ** toeplitz(np.arange(P))
        elif model == 'wls':
            params_cov = np.diag(np.random.uniform(0, 1, size=P))
        else:  # 'wgls'
            varz = np.random.uniform(0, 1, size=P)
            params_cov = np.sqrt(np.outer(varz, varz))
            params_cov *= 0.5 ** toeplitz(np.arange(P))

        return np.random.multivariate_normal(np.zeros(P), params_cov, size=K).T
    
# Smoke test: draw one dataset for every combination of design type,
# noise model and parameter model to check that generation runs.
for categorical, noise_model, param_model in (
        (is_cat, noise, param)
        for is_cat in (True, False)
        for noise in ('ols', 'wls', 'gls', 'wgls')
        for param in ('unif', 'ols', 'wls', 'gls', 'wgls')):
    dgn = DataGenerator(
        categorical=categorical,
        N=100,
        P=2,
        K=50,
        sig_sq=1,
        noise_model=noise_model,
        param_model=param_model,
    )
    S, R = dgn.generate()

In [None]:
# Simulation: for each noise model, repeatedly draw a categorical dataset,
# cross-validate every decoder on it and plot the distribution of the
# mean cross-validated scores per decoder.
N = 200
P = 2
K = 100
sig_sq = 10

iters = 500
noise_models = ['ols', 'wls', 'gls', 'wgls']
# One panel per noise model, so the grid always matches the list above.
fig, axes = plt.subplots(ncols=len(noise_models), figsize=(20, 5),
                         sharex=True, sharey=True)
clfs = [OlsIem(), WlsIem(), GlsIem(), GaussianNB(), LinearDiscriminantAnalysis(shrinkage='auto', solver='lsqr')]
for i, noise_model in enumerate(noise_models):
    scores = np.zeros((iters, len(clfs)))

    for ii in range(iters):

        S, R = DataGenerator(categorical=True, N=N, P=P, K=K, sig_sq=sig_sq,
                             noise_model=noise_model).generate()
        for iii, clf in enumerate(clfs):
            scores[ii, iii] = cross_val_score(estimator=clf, X=R, y=S, cv=10).mean()

    # sns.distplot is deprecated; kdeplot draws the same density curve
    # that distplot(hist=False) produced.
    for clf, clf_scores in zip(clfs, scores.T):
        sns.kdeplot(clf_scores, ax=axes[i], label=clf.__class__.__name__,
                    linewidth=4)

    axes[i].set_title('Noise model: %s' % noise_model)
    # Current seaborn no longer adds a legend for labelled curves itself.
    axes[i].legend()

sns.despine()
fig.tight_layout()
# plt.show() instead of fig.show(): the latter only warns under the
# non-interactive inline backend selected by %matplotlib inline.
plt.show()