_Neural Data Science_

Lecturer: Dr. Jan Lause, Prof. Dr. Philipp Berens

Tutors: Jonas Beck, Fabio Seel, Julius Würzler

Summer term 2025

Student names: Nina Lutz, Mathis Nommensen

LLM Disclaimer: We used ChatGPT for troubleshooting and ...?

# Coding Lab 6

In this exercise we are going to fit a latent variable model (Poisson GPFA) to both toy data and real data from monkey primary visual cortex. For details, see [Ecker et al. 2014](https://www.cell.com/neuron/pdf/S0896-6273(14)00104-4.pdf).

## Preliminaries

### 1. Code 

The toolbox we are going to use contains an implementation of the EM algorithm for fitting the Poisson GPFA model.

Assuming you `git clone https://github.com/berenslab/poisson-gpfa` to the parent directory and have the following directory structure:


```
├── data/
│   └── nds_cl_6_data.mat
├── poisson-gpfa/
├── notebooks
│   └── CodingLab6.ipynb
├── matplotlib_style.txt
├── requirements.txt
```

then you can import the related functions via:

```
import sys
sys.path.append('../poisson-gpfa/')
sys.path.append('../poisson-gpfa/funs')

import funs.util as util
import funs.engine as engine
```

Change the paths if you have a different directory structure. For the details of the algorithm, please refer to the thesis `hooram_thesis.pdf` from ILIAS.

### 2. Data

Download the data file `nds_cl_6_data.mat` from ILIAS and save it in a `data/` folder.

In [None]:
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt

# style
import seaborn as sns

# poisson-gpfa
import sys

sys.path.append("../poisson-gpfa/")
sys.path.append("../poisson-gpfa/funs")


import funs.util as util
import funs.engine as engine

%matplotlib inline

%load_ext jupyter_black

%load_ext watermark
%watermark --time --date --timezone --updated --python --iversions --watermark -p sklearn

In [None]:
plt.style.use("../matplotlib_style.txt")

## Task 1. Generate some toy data to test the poisson-GPFA code

We start by verifying our code on toy data. The cell below generates data for 30 neurons and 100 trials (1000 ms each) with a bin size of 50 ms. Each neuron's firing rate $\lambda_k$ is a constant baseline set by $d_k$, modulated by a one-dimensional latent state $x$ that is drawn from a Gaussian process:

$\lambda_k = \exp(c_kx + d_k)$

Each neuron's weight $c_k$ is drawn randomly from a normal distribution and spike counts are sampled from a Poisson distribution with rate $\lambda_k$.
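To make the generative model concrete, here is a minimal NumPy sketch of the sampling procedure for a single trial. It is not the toolbox's `util.dataset` implementation: the squared-exponential kernel, the timescale `tau`, the offset value and all variable names are assumptions chosen for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

n_neurons = 30
bin_size = 0.05   # 50 ms bins, in seconds
n_bins = 20       # 1000 ms trial / 50 ms bins
tau = 0.1         # assumed GP timescale in seconds
d_offset = 1.0    # assumed constant offset d_k for all neurons

# Squared-exponential GP covariance over bin times (assumed kernel),
# with a small jitter on the diagonal for numerical stability
t = np.arange(n_bins) * bin_size
K = np.exp(-0.5 * (t[:, None] - t[None, :]) ** 2 / tau**2) + 1e-6 * np.eye(n_bins)

x = rng.multivariate_normal(np.zeros(n_bins), K)  # latent state, shape (n_bins,)
c = rng.normal(size=n_neurons)                    # weights c_k
d = np.full(n_neurons, d_offset)                  # offsets d_k

rate = np.exp(np.outer(c, x) + d[:, None])        # lambda_k in every bin
counts = rng.poisson(rate)                        # spike counts, shape (n_neurons, n_bins)
```

Here `rate` is expressed in expected spikes per bin, so `counts` has the same shape as the `Y` matrix of a single trial.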

Your task is to fit a Poisson GPFA model with one latent variable to this data (see `engine.PPGPFAfit`).

Hint: You can use `util.dataset?`, `engine.PPGPFAfit?` or `util.initializeParams?` to find out more about the provided package.

*Grading: 3 pts*

In [None]:
# ---------------------------------
# simulate a training set (0.5 pts)
# ---------------------------------

# Initialize random number generator
np.random.seed(123)

# Specify dataset & fitting parameters
xdim = 1
ydim = 30
numTrials = 100
trialDur = 1000  # in ms
binSize = 50  # in ms
maxEMiter = 100
dOffset = 1  # controls firing rate

# Sample from the model (make a toy dataset)
training_set = util.dataset(
    seed=np.random.randint(10000),
    xdim=xdim,
    ydim=ydim,
    numTrials=numTrials,
    trialDur=trialDur,
    binSize=binSize,
    dOffset=dOffset,
    fixTau=True,
    fixedTau=np.linspace(0.1, 0.5, xdim),
    drawSameX=True,
)

### Fit the model

In [None]:
# -----------------------
# fit the model (0.5 pts)
# -----------------------

# Initialize parameters using Poisson-PCA
initParams = util.initializeParams(xdim, ydim, training_set)

# choose sensible parameters and run fit
fitToy = engine.PPGPFAfit(
    experiment=training_set,
    initParams=initParams,
    inferenceMethod="laplace",
    EMmode="Batch",  # using vanilla (batch) EM. for online EM use "Online"
    maxEMiter=maxEMiter,
)

In [None]:
# some useful functions
def allTrialsState(fit: engine.PPGPFAfit, p) -> np.ndarray:
    """Reshape the latent signal and the spike counts"""
    x = np.zeros([p, 0])
    for i in range(len(fit.infRes["post_mean"])):
        x = np.concatenate((x, fit.infRes["post_mean"][i]), axis=1)
    return x


def allTrialsX(training_set: util.dataset) -> np.ndarray:
    """Reshape the ground truth
    latent signal and the spike counts"""
    x_gt = np.array([])
    for i in range(len(training_set.data)):
        x_gt = np.concatenate((x_gt, training_set.data[i]["X"][0]), axis=0)
    return x_gt

### Plot the ground truth vs. inferred model
Verify your fit by plotting both ground truth and inferred parameters for:
1. weights C
2. biases d
3. latent state x 

Note that the sign of fitted latent state and its weights are ambiguous (you can flip both without changing the model). Make sure you correct the sign for the plot if it does not match the ground truth.
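Rather than hard-coding the flip, the sign can be chosen from the correlation between the estimate and the ground truth. A minimal sketch on synthetic signals; the helper name `align_sign` is hypothetical and not part of the poisson-gpfa package:

```python
import numpy as np


def align_sign(x_est: np.ndarray, x_true: np.ndarray) -> float:
    """Return +1.0 or -1.0 so that sign * x_est correlates positively with x_true."""
    r = np.corrcoef(x_est, x_true)[0, 1]
    return 1.0 if r >= 0 else -1.0


# Synthetic example: the estimate is a scaled, sign-flipped copy of the truth
grid = np.linspace(0, 6, 50)
x_true_demo = np.sin(grid)
x_est_demo = -0.8 * x_true_demo + 0.01 * np.cos(np.linspace(0, 20, 50))

sign = align_sign(x_est_demo, x_true_demo)  # -1.0 for this example
x_aligned = sign * x_est_demo
```

The same sign has to be applied to the weights `C`, since flipping both leaves the model unchanged.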

In [None]:
# All trials latent state vector
x_est = allTrialsState(fitToy, 1)
x_true = allTrialsX(training_set)

In [None]:
# ------------------------------------------------------
# Plot ground truth vs. inferred model
# Plot the weights `C`, biases `d` and latent states (2 pts)
# ------------------------------------------------------


# add plot
fig, ax = plt.subplot_mosaic([["C", "d"], ["latent", "latent"]])
# For d & C consider also plotting the optimal weights as a dotted line for reference

# estimated params
C_est = fitToy.optimParams["C"]
d_est = fitToy.optimParams["d"]

# ground truth
C_true = training_set.params["C"]
d_true = training_set.params["d"]


ax["C"].plot(-C_est, label="Estimated")  # flipped sign for better comparison
ax["C"].plot(C_true, linestyle="--", label="True")
ax["C"].set_title("Estimated vs. True Weights")

ax["d"].plot(d_est)
ax["d"].plot(d_true, linestyle="--")
ax["d"].set_title("Estimated vs. True Biases")

# For the latent states consider separating each trial by a vertical line
# plot only for a subset of trials
T = training_set.data[0]["Y"].shape[1]  # time bins per trial
x_est_subset = x_est[0][0 : 5 * T]
x_true_subset = x_true[0 : 5 * T]

ax["latent"].plot(-x_est_subset)  # flipped sign for better comparison
ax["latent"].plot(x_true_subset, linestyle="--")
ax["latent"].set_title("Estimated vs. True Latent State")
ax["latent"].legend(
    handles=[
        plt.Line2D([], [], color="tab:orange", linestyle="--", label="True"),
        plt.Line2D([], [], color="tab:blue", label="Estimated"),
    ],
    loc="lower left",
    fontsize=9,
)

plt.tight_layout()

## Task 2: Fit GPFA model to real data. 

We now fit the model to real data and cross-validate over the dimensionality of the latent variable.

*Grading: 4 pts*



### Load data

The cell below implements loading the data and encapsulates it into a class that matches the interface of the Poisson GPFA engine. You don't need to do anything here.

In [None]:
class EckerDataset:
    """Minimal wrapper that exposes the data through the interface of the Poisson GPFA engine."""

    def __init__(
        self,
        path: str,
        subject_id: int = 0,
        ydim: int = 55,
        trialDur: int = 2000,
        binSize: int = 100,
        numTrials: int = 100,
        ydimData: bool = False,
        numTrData: bool = True,
    ):
        # T = binSize #int(trialDur/binSize)
        T = int(trialDur / binSize)
        matdat = sio.loadmat(path)
        self.matdat = matdat
        data = []
        trial_durs = []
        for trial_id in range(numTrials):
            trial_time = matdat["spikeTimes"][:, trial_id][0]
            trial_big_time = np.min(trial_time)
            trial_end_time = np.max(trial_time)
            trial_durs.append(trial_end_time - trial_big_time)
        for trial_id in range(numTrials):
            Y = []
            spike_time = []
            data.append(
                {
                    "Y": matdat["spikeCounts"][:, :, trial_id],
                    "spike_time": matdat["spikeTimes"][:, trial_id],
                }
            )
        self.T = T
        self.trial_durs = trial_durs
        self.data = data
        self.trialDur = trialDur
        self.binSize = binSize
        self.numTrials = numTrials
        self.ydim = ydim
        util.dataset.getMeanAndVariance(self)
        util.dataset.getAvgFiringRate(self)
        util.dataset.getAllRaster(self)

In [None]:
path = "../data/nds_cl_6_data.mat"
data = EckerDataset(path)

### Fit Poisson GPFA models and perform model comparison

Split the data into 80 trials used for training and 20 trials held out for performing model comparison. On the training set, fit models using one to five latent variables. Compute the performance of each model on the held-out test set.

Hint: You can use the `crossValidation` function in the Poisson GPFA package.

Optional: The `crossValidation` function computes the sum of the squared errors (SSE) on the test set, which is not ideal. The predictive log-likelihood under the Poisson model would be a better measure, which you are welcome to compute instead.

### Derivation for log-likelihood

_You can add your calculations in_ $\LaTeX$ _here_.

$\lambda_{kt} = \exp(c_k x_t + d_k)$, i.e. the $k$-th entry of $\exp(C x_t + d)$; $y_{kt} \sim \text{Poisson}(\lambda_{kt})$; $p(y \mid \lambda) = \frac{\lambda^y\exp(-\lambda)}{y!}$

$p(y_t \mid x_t) = \prod_{k=1}^K \frac{\exp\left( c_k x_t + d_k \right)^{y_{kt}} \cdot \exp\left( -\exp(c_k x_t + d_k) \right)}{y_{kt}!}$, where $K$ is the number of neurons and the neurons are conditionally independent given $x_t$.


$L(C, d; x_1, \dots, x_T) = \prod_{t=1}^T p(y_t \mid x_t)= \prod_{t=1}^T \prod_{k=1}^K \frac{\exp(c_k x_t + d_k)^{y_{kt}} \cdot \exp\left(-\exp(c_k x_t + d_k)\right)}{y_{kt}!}$, where $T$ is the number of time bins per trial.

$\log L = \ell(C, d; x_1, \dots, x_T) = \sum_{t=1}^T \sum_{k=1}^K \log\left(\frac{\exp(c_k x_t + d_k)^{y_{kt}} \cdot \exp\left(-\exp(c_k x_t + d_k)\right)}{y_{kt}!}\right) = \sum_{t=1}^T \sum_{k=1}^K\left[y_{kt}(c_k x_t + d_k)- \exp(c_k x_t + d_k)- \log(y_{kt}!)\right]$
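The final expression of the derivation maps directly onto array operations. A minimal sketch, assuming `Y` holds the spike counts of one trial with shape (neurons, bins), `X` the latent states with shape (latents, bins), and `C`, `d` the model parameters. The function name `poisson_loglik` is hypothetical and not part of the poisson-gpfa package; `gammaln(Y + 1)` evaluates $\log(y_{kt}!)$ without overflow.

```python
import numpy as np
from scipy.special import gammaln


def poisson_loglik(Y: np.ndarray, C: np.ndarray, d: np.ndarray, X: np.ndarray) -> float:
    """Poisson log-likelihood of counts Y given latent states X and parameters C, d."""
    log_rate = C @ X + d[:, None]  # c_k x_t + d_k for every neuron and bin
    return float(np.sum(Y * log_rate - np.exp(log_rate) - gammaln(Y + 1)))


# Sanity check: zero counts at unit rate contribute -1 per neuron and bin
ll = poisson_loglik(
    Y=np.zeros((2, 3)),
    C=np.zeros((2, 1)),
    d=np.zeros(2),
    X=np.zeros((1, 3)),
)
```

For held-out trials the latent state is not observed, so it would first have to be inferred from the test data (or integrated out) before this expression can serve as a predictive score.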

In [None]:
# ------------------------------
# Perform cross validation (1 pt)
# ------------------------------

### SSE version (optional version could be done later)

# do the actual cross validation
# split: 80 training / 20 validation
numTrainingTrials = 80
numTestTrials = 20
latent_vars = 5

maxEMiter = 50  # number of EM iterations for each fold

xval = util.crossValidation(
    experiment=data,
    numTrainingTrials=numTrainingTrials,
    numTestTrials=numTestTrials,
    maxXdim=latent_vars,
    maxEMiter=maxEMiter,
)

### Plot the test error

Make a plot of the test error for the five different models. As a baseline, please also include the test error of a model without a latent variable. This is essentially the SSE of a constant rate model (or Poisson likelihood if you did the optional part above). Note: We assume a constant firing rate across trials, but not necessarily across time.

In [None]:
# --------------------------------------------------------------------------------
# Compute and plot the test errors for the different latent variable models (1 pt)
# --------------------------------------------------------------------------------

train_set, test_set = util.splitTrainingTestDataset(
    data, numTrainingTrials=80, numTestTrials=20
)
# Estimate mean firing rate per neuron per time bin from training set
mean_rate = np.zeros_like(test_set.data[0]["Y"], dtype=np.float64)
for trial in train_set.data:
    mean_rate += trial["Y"]
mean_rate /= len(train_set.data)

# Compute SSE over test set
baseline_error = 0
for trial in test_set.data:
    baseline_error += np.sum((trial["Y"] - mean_rate) ** 2)

# model errors
model_errors = xval.errs


# Plotting
plt.figure(figsize=(6, 4))

# Plot model errors (dims 1 to 5)
plt.plot(
    range(1, len(model_errors) + 1),
    model_errors,
    marker="o",
    label="Latent Variable Models",
)

# Plot baseline (horizontal line)
plt.hlines(
    baseline_error,
    1,
    len(model_errors),
    colors="red",
    linestyles="dashed",
    label="Baseline (No Latent Var)",
)

plt.xlabel("Latent Dimensionality")
plt.ylabel("Test Error (SSE)")
plt.title("Model Performance on Held-Out Test Set")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# -------------------------------------------------------------------------
# Compute and plot the test errors for the different latent variable models
# and answer the questions below (1+1 pts)
# -------------------------------------------------------------------------

# Your plot here
fig, ax = plt.subplots(figsize=(4, 3))

# plot model error
ax.plot(
    range(1, len(model_errors) + 1),
    model_errors,
    marker="o",
    label="Latent Variable Models",
)
# plot baseline
ax.axhline(
    baseline_error, linestyle="--", color="red", label="Baseline (No Latent Var)"
)

ax.set_xlabel("Latent Dimensionality")
ax.set_ylabel("Test Error (SSE)")
ax.set_title("Test Error vs. Latent Dimensionality")
ax.legend()
ax.grid(True)

plt.tight_layout()
plt.show()

### Questions

Is the baseline error (0 latents) about what you expect in relation to the other models? Why or why not?

* Yes, the baseline error is what we expect. It is higher than the errors of the models with 1 to 5 latent variables, which indicates that the latent variable models capture meaningful shared structure in the data that the baseline cannot. The baseline assumes that each neuron's firing rate in a given time bin is the same on every trial, whereas the latent variable models also explain coordinated trial-to-trial fluctuations across neurons, which improves predictive performance on held-out data.

Can you identify a "best model"? If so, which is it and what does this say about the structure of the latent state?

* The model with 4 latent variables looks like the "best model". It achieves a low test error, and adding a fifth latent variable improves the error only slightly. This suggests that the underlying structure of the neural data is low-dimensional and can be captured effectively with just four latent factors. These factors likely reflect shared variability among neurons due to common inputs, behavioral states, or network dynamics. Most of the meaningful structure is therefore already explained by the first four latent variables.

## Task 3. Visualization: population rasters and latent state.
Use the model with a single latent state. 

Create a raster plot where you show for each trial the spikes of all neurons as well as the trajectory of the latent state `x` (take care of the correct time axis). Sort the neurons by their weights `c_k`. Plot only the first 20 trials.

*Grading: 2 pts*

In [None]:
from numpy import matlib

# get one fitted model from xdim 1 to 5
fit = xval.fits[0]  # for xdim=1
C = fit.optimParams["C"][:, 0]  # weights c_k for xdim=1, shape: (neurons,)

# sort neurons by their weights (increasing order)
neurons_sorted = np.argsort(C)

# Your plot here
fig, axs = plt.subplots(10, 2, figsize=(14, 14))

ts = np.linspace(50, 2000, 100)
xa = 0.15
xs = 0.7 * xa * np.sin(ts / 1000 * 3.4 * 2 * np.pi) + xa

fig.suptitle(
    "Raster Plot of Spikes Sorted by Neuron Weight ($c_k$) with Latent State Overlay",
    fontsize=16,
    y=1.02,  # adjusts vertical position if needed
)

with sns.axes_style("ticks"):
    for ntrial, ax in enumerate(axs.flat):
        x = range(0, 2000, 100)  # assume binsize of 100ms

        # --------------------------------------------------------
        # plot the raster for each neuron and latent state (2 pts)
        # --------------------------------------------------------

        # hint: can be plotted on top of the corresponding raster

        # sort neurons by weight (from lowest to highest)
        # sorted spike matrix per trial
        Y = data.data[ntrial]["Y"]
        Y_sorted = Y[neurons_sorted, :]

        # raster
        for n, neuron in enumerate(Y_sorted):
            spike_bins = np.where(neuron > 0)[0]
            for t_bin in spike_bins:
                ax.vlines(
                    x=t_bin * data.binSize,
                    ymin=n,
                    ymax=n + 0.8,
                    color="gray",
                    linewidth=0.5,
                )

        # plot the latent state trajectory
        x_latent = fit.infRes["post_mean"][ntrial][0]  # shape: (time bins,)
        x_scaled = (x_latent - np.mean(x_latent)) / np.std(x_latent)
        x_scaled = (x_scaled * 0.7 * xa + xa) * data.ydim
        ax.plot(x, x_scaled, color="tab:blue", linewidth=1.5, label="Latent State")

        if ntrial == 0:
            ax.legend()
            # Label for spike marks
            ax.plot([200, 200], [0, 0.8], color="gray", linewidth=1)
            ax.text(210, 0.2, "Neuron spike", color="gray", fontsize=6)

        if ntrial == 1:
            ax.plot([1000, 2000], [-30, -30], color="green")
            ax.text(1300, -50, "1sec")
        if ntrial < 2:
            ax.plot(ts, (xs * 40) + data.ydim, color="black")

        # Set y-axis label only on leftmost column (even-numbered trials)
        if ntrial % 2 == 0:
            ax.set_ylabel("Neurons")

        # Set x-axis label only on bottom row (last two subplots)
        if ntrial >= 18:
            ax.set_xlabel("Time (ms)")

        # ax.set_yticks([])
        # ax.set_xticks([])

## Task 4. Visualization of covariance matrix.

Plot (a) the covariance matrix of the observed data as well as its approximation using (b) one and (c) five latent variable(s). Use the analytical solution for the covariance matrix of the approximation*. Note that the solution is essentially the mean and covariance of the [log-normal distribution](https://en.wikipedia.org/wiki/Log-normal_distribution).

$ \mu = \exp(\frac{1}{2} \text{ diag}(CC^T)+d)$

$ \text{Cov}= \mu\otimes\mu^T \odot \exp(CC^T)+\mu\cdot \mathbb{I} - \mu\otimes\mu^T$ 

*[Krumin, M., and Shoham, S. (2009). Generation of Spike Trains with Controlled Auto- and Cross-Correlation Functions. Neural Computation 21, 1642–1664](http://www.mitpressjournals.org/doi/10.1162/neco.2009.08-08-847).
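The two equations can be sanity-checked numerically before applying them to a fitted model. The sketch below uses small hypothetical parameters `C` and `d` for three neurons and assumes a standard normal latent state, $x \sim \mathcal{N}(0, I)$, so that the log-rate $Cx + d$ is Gaussian with covariance $CC^T$. It compares the analytical moments with a Monte Carlo estimate.

```python
import numpy as np

rng = np.random.default_rng(1)

# Hypothetical parameters: three neurons, one latent variable
C = np.array([[0.3], [-0.2], [0.5]])
d = np.array([0.0, 0.5, -0.3])

# Analytical mean and covariance of the spike counts
CCt = C @ C.T
mu = np.exp(0.5 * np.diag(CCt) + d)
outer_mu = np.outer(mu, mu)
cov_analytic = outer_mu * np.exp(CCt) + np.diag(mu) - outer_mu

# Monte Carlo estimate: x ~ N(0, I), y ~ Poisson(exp(Cx + d))
n_samples = 200_000
x = rng.standard_normal((C.shape[1], n_samples))
y = rng.poisson(np.exp(C @ x + d[:, None]))
mu_mc = y.mean(axis=1)
cov_mc = np.cov(y)
```

Note that $\text{diag}(CC^T)$ is the vector of diagonal entries of $CC^T$ (`np.diag(CCt)`), not its row sums.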

*Grading: 3 pts*

In [None]:
# --------------------------------------------------------------
# Complete the analytical solution for the covariance matrix of
# the approximation using the provided equations (2 pts)
# --------------------------------------------------------------


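def cov(fit: engine.PPGPFAfit) -> tuple[np.ndarray, np.ndarray]:
    """Return the analytical covariance matrix and mean of the fitted model."""
    C = fit.optimParams["C"]
    d = fit.optimParams["d"]

    CCt = C @ C.T
    # diag(CC^T): take the diagonal entries; row sums would also add the off-diagonals
    diag_CCt = np.diag(CCt)
    mu = np.exp(0.5 * diag_CCt + d)

    outer_mu = np.outer(mu, mu)
    c = outer_mu * np.exp(CCt) + np.diag(mu) - outer_mu

    return c, mu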


# --------------------------------------------------------------
# Plot the covariance matrix of
# (1) the observed data
# (2) its approximation using 1 latent variable
# (3) its approximation using 5 latent variables
# and explain how they compare (1+1 pts).
# --------------------------------------------------------------

obs_corr = np.cov(data.all_raster)
opt_r1, mu1 = cov(xval.fits[0])
opt_r5, mu5 = cov(xval.fits[4])

# HINT: Think about which type of colormap and ranges are appropriate here.

fig, axs = plt.subplots(1, 3, figsize=(10, 3.5))
# add plot to visualize the differences in the covariance matrices

from matplotlib.colors import LogNorm

# option1 : take log to scale down plots
# => clip values beforehand to avoid zero or negative values for LogNorm
epsilon = 1e-6
obs_corr_safe = np.clip(obs_corr, epsilon, None)
opt_r1_safe = np.clip(opt_r1, epsilon, None)
opt_r5_safe = np.clip(opt_r5, epsilon, None)

# observed covariance
im0 = axs[0].imshow(obs_corr_safe, cmap="viridis", norm=LogNorm())
axs[0].set_title("Observed Covariance")
axs[0].set_xlabel("Neuron")
axs[0].set_ylabel("Neuron")

# 1 latent variable
im1 = axs[1].imshow(opt_r1_safe, cmap="viridis", norm=LogNorm())
axs[1].set_title("Approximation (xdim=1)")
axs[1].set_xlabel("Neuron")

# 5 latent variables
im2 = axs[2].imshow(opt_r5_safe, cmap="viridis", norm=LogNorm())
axs[2].set_title("Approximation (xdim=5)")
axs[2].set_xlabel("Neuron")

fig.colorbar(im2, ax=axs, orientation="vertical", fraction=0.025, pad=0.04)

### Questions

What do you see / expect to see?

As expected, the diagonal holds the largest values, since it contains each neuron's variance. With a linear color scale only the diagonal was visible, so we used a logarithmic color scale (`LogNorm`), which brings out the off-diagonal structure. Also as expected, the model with 5 latent variables approximates the observed covariance much better than the model with 1 latent variable, especially off the diagonal. With a single latent variable, all shared variability has to be explained by one factor, so the shared part of the model covariance is built from a rank-one $CC^T$: off-diagonal structure is partly visible, but details are missing. The approximation with 5 latent variables still differs visibly from the observed covariance, but it is much closer and shows more detail in the off-diagonal structure.
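Covariances can be negative, and clipping them for a logarithmic scale discards their sign. An alternative is a diverging colormap with symmetric limits around zero. The sketch below illustrates this on a synthetic matrix rather than on the recorded data; the 99th-percentile limit and the `RdBu_r` colormap are assumptions chosen for illustration, not a recommendation from the course material.

```python
import numpy as np
import matplotlib

matplotlib.use("Agg")  # non-interactive backend; can be dropped inside a notebook
import matplotlib.pyplot as plt

rng = np.random.default_rng(2)

# Synthetic stand-in for a covariance matrix (10 "neurons", 200 samples)
samples = rng.standard_normal((10, 200))
cov_matrix = np.cov(samples)

# Symmetric color limits taken from the off-diagonal entries, so that the
# large diagonal (variances) does not wash out the weaker covariances
off_diag = cov_matrix[~np.eye(cov_matrix.shape[0], dtype=bool)]
vlim = np.percentile(np.abs(off_diag), 99)

fig, ax = plt.subplots(figsize=(4, 3.5))
im = ax.imshow(cov_matrix, cmap="RdBu_r", vmin=-vlim, vmax=vlim)
ax.set_xlabel("Neuron")
ax.set_ylabel("Neuron")
fig.colorbar(im, ax=ax, label="Covariance")
```

With these limits the diagonal saturates, which is acceptable when the off-diagonal structure is the quantity of interest. Using the same `vlim` for all three panels keeps them directly comparable.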