# System Identification Models

This notebook describes how to build system identification models for any target distribution

In [None]:
!pip install git+https://github.com/sinzlab/nnsysident.git@ICLR2023

In [1]:
import torch
import numpy as np
import pandas as pd
from torch import nn

import matplotlib.pyplot as plt

from nnsysident.datasets.mouse_loaders import static_loaders
from nnsysident.utility.data_helpers import extract_data_key
from nnsysident.utility.measures import get_model_performance
from nnsysident.utility.data_helpers import get_dims_for_loader_dict

random_seed = 27121992
device = 'cuda'

dataport not available, will only be able to load data locally


___

## Data

In [3]:
paths = ['./data/static20457-5-9-preproc0']

data_key = extract_data_key(paths[0])

dataset_config = {'paths': paths,
                  'batch_size': 64,
                  'seed': random_seed,
                  'loader_outputs': ["images", "responses"],
                  'normalize': True,
                  'exclude': ["images"],
                  "cuda": True if device=="cuda" else False
                  }

dataloaders = static_loaders(**dataset_config)

session_shape_dict = get_dims_for_loader_dict(dataloaders["train"])
n_neurons_dict = {k: v["responses"][1] for k, v in session_shape_dict.items()}
in_shapes_dict = {k: v["images"] for k, v in session_shape_dict.items()}

## Model

### Core

In [6]:
from neuralpredictors.layers.cores import Stacked2dCore
from neuralpredictors.utils import get_module_output

core = Stacked2dCore(input_channels=1,
                     hidden_channels=64,
                     input_kern=9,
                     hidden_kern=7)

in_shape_dict = {k: get_module_output(core, in_shape)[1:] for k, in_shape in in_shapes_dict.items()}

### Readout

In [7]:
from neuralpredictors.layers.readouts import (
    GeneralizedFullGaussianReadout2d,
    GeneralizedPointPooled2d,
    MultiReadoutBase,
    PointPooled2d,
)

In [9]:
readout = MultiReadoutBase(in_shape_dict=in_shape_dict,
                           n_neurons_dict=n_neurons_dict,
                           base_readout=GeneralizedFullGaussianReadout2d,
                           bias=True,
                           inferred_params_n=2)

### Encoder

The encoder needs to be implemented by the user, inheriting from the GeneralizedEncoderBase. All the is necessary is to: 
- provide a list of nonlinearities which makes sure that the parameters predicted by model fulfill the constraints of the respective parameters (for example: the variance of a Gaussian has to be positive -> nonlinearity = Elu1)
- a config list of the respective nonlinearities
- the method "predict_mean"

In [10]:
from neuralpredictors.layers.encoders.base import GeneralizedEncoderBase
from neuralpredictors.layers.activations import Elu1

In [12]:
class GaussianEncoder(GeneralizedEncoderBase):
    def __init__(
        self,
        core,
        readout,
        eps=1.e-10):
        
        nonlinearity_type_list = [nn.Identity(), Elu1()]
        nonlinearity_config_list = [{}, {"inplace": False, "eps": eps}]
    
        super().__init__(core, readout, nonlinearity_type_list, nonlinearity_config_list=nonlinearity_config_list)
    
    def predict_mean(self, x, *args, data_key=None, **kwargs):
        mean, variance = self.forward(x, *args, data_key=data_key, **kwargs)
        return mean

    

In [14]:
model = GaussianEncoder(core, readout)
model.to(device);

### Loss

In [22]:
from neuralpredictors.measures.modules import GaussianLoss
loss_fn = GaussianLoss()



In [23]:
for images, responses in dataloaders["train"][data_key]:
    break

output = model(images)

In [24]:
loss_fn(target=responses, output=output)

tensor(1.4528, device='cuda:0', grad_fn=<MeanBackward0>)

___

# Evaluation

This part is about how to evaluate your model performance using NInGa. For more info, see [this repo.](https://github.com/sinzlab/lurz_bashiri_iclr2023)

In [None]:
!pip install git+https://github.com/sinzlab/lurz_bashiri_iclr2023.git

In [25]:
import torch
import numpy as np

from neuralmetrics.datasets import simulate_neuron_data, simulate_neuron_data_advanced
from neuralmetrics.models.utils import get_zig_params_from_moments, get_zil_params_from_moments
from neuralmetrics.utils import bits_per_image
from neuralmetrics.models.gs_models import Gaussian_GS, Gamma_GS
from neuralmetrics.models.gs_zero_inflation import Zero_Inflation_Base
from neuralmetrics.models.priors import get_prior_for_gaussian, get_prior_for_q, get_prior_for_gamma, train_prior_for_gaussian
from neuralmetrics.models.flows.transforms import Log, Identity
from neuralmetrics.models.score_functions import compute_gs_loss_over_target_repeats, compute_null_loss
from neuralpredictors.measures.zero_inflated_losses import ZIGLoss, ZILLoss

from scipy.stats import beta as beta_distribution

from neuralpredictors.measures import corr
from neuralpredictors.measures.zero_inflated_losses import ZILLoss


random_seed = 27121992
device = 'cuda'

## Data

In [26]:
np.random.seed(random_seed)

exp_data = True
n_images = 360
n_repeats = 10
n_neurons = 100

mean = .5
variance = .01
A = (mean * (1 - mean) / variance - 1)
alpha = A * mean
beta = A * (1 - mean)
zero_inflation_level = beta_distribution(21, 117).rvs(n_neurons)
loc = np.exp(-10)

resps, gt_means, gt_variances, zil_params = simulate_neuron_data_advanced(n_images=n_images,
                                                      n_repeats=n_repeats,
                                                      n_neurons=n_neurons,
                                                      zero_inflation_level=zero_inflation_level,
                                                      loc=loc,
                                                      random_state=random_seed)

# If single trials are missing due to experimental errors, replace them by np.nan, for example:
resps[0, 0, :] = np.nan
n_trials = (n_repeats*n_images*n_neurons - np.isnan(resps).sum())

100% 100/100 [00:00<00:00, 1309.90it/s]
100% 360/360 [00:00<00:00, 10830.38it/s]


In [27]:
resps.shape

(10, 360, 100)

## Initialize GS model

In [29]:
loc = np.exp(-10)
slab_mask = np.ones_like(resps)
slab_mask[resps <= loc] = np.nan

#### Prior init for zero-inflation

In [30]:
print("Getting good init values for q prior parameters...")
q_prior_params = get_prior_for_q(torch.from_numpy(resps), loc)

Getting good init values for q prior parameters...


#### Prior init for Slab distribution

In [31]:
transform = Identity()
resps_transformed, _ = transform(torch.from_numpy(resps) - loc)
print("Getting good init values for slab prior parameters...")
slab_prior_params = get_prior_for_gamma(resps_transformed.numpy(),
                                               per_neuron=False,
                                               mask=slab_mask)
dist_slab = Gamma_GS(*slab_prior_params, train_prior_hyperparams=True)


possible_number_of_loo_repeats = np.unique([dist_slab.get_number_of_repeats(torch.from_numpy(resps[:, i, :])) - 1 for i in range(resps.shape[1])])
gs_model = Zero_Inflation_Base(
    loc,
    dist_slab,
    *q_prior_params,
    possible_number_of_loo_repeats=possible_number_of_loo_repeats,
    transform=transform,
).to(device)
gs_model.integrals_over_q_dict = gs_model.get_integrals_over_q()

Getting good init values for slab prior parameters...


  warn(


## Optimize prior params

Optimization takes very long. Interrupted here...

In [32]:
print("Optimizing prior parameters...")
gs_model, loss = train_prior_for_gaussian(resps, gs_model, max_iter=200, logger=False, use_map=False)

# Optionally save optimized prior params
prior_params = {k: v for k, v in gs_model.named_parameters()}
# torch.save(prior_params, "optimized_prior_params" + ".tar")

Optimizing prior parameters...
Loss: -1028400.5000, Epochs: 1

KeyboardInterrupt: 

## Obtain upper and lower bounds

In [33]:
params_from_moments_function = get_zig_params_from_moments
loss_function = ZIGLoss(per_neuron=True)

# Get upper bound log-likelihood per repeat, image and neuron
loss_gs = compute_gs_loss_over_target_repeats(resps, gs_model, False).item()
upper_bound = -loss_gs / n_trials

# Get lower bound log-likelihood per repeat, image and neuron
loss_null = compute_null_loss(resps, params_from_moments_function, loss_function, torch.Tensor([loc]).to(device), device).sum()
lower_bound = -loss_null / n_trials

In [34]:
print(f"upper_bound: {upper_bound}")
print(f"lower_bound: {lower_bound}")

upper_bound: 2.8836994651292027
lower_bound: 2.640894334274279
