# Setup

In [1]:
import pyphi
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt
import pandas as pd
import itertools
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import binarize

In [2]:
%matplotlib inline

In [3]:
import pixiedust

Pixiedust database opened successfully


## Immutables

In [110]:
# Directory prefix for the raw data files. It is concatenated directly with the
# file name when the data is loaded, so the trailing slash is required.
RAW_DATA_DIR = "../data/raw/"
# File name of the sample .mat file that is read into FLY_DATA.
SAMPLE_DATA_FILE = "split2250_bipolarRerefType1_lineNoiseRemoved_postPuffpreStim.mat"

In [114]:
# Load the array stored under the "fly_data" variable of the sample .mat file.
# The loaded dict is indexed directly (rather than via .get) so that a missing
# variable raises KeyError here instead of leaving FLY_DATA as None and failing
# later with an unrelated AttributeError.
FLY_DATA = sio.loadmat(RAW_DATA_DIR + SAMPLE_DATA_FILE)["fly_data"]

# Processing Functions

In [4]:
def gen_log_reg(data):
    """ Fit one logistic regression per channel on past -> present transitions.

    Every pair of consecutive timepoints within every trial is pooled as one
    training sample: the features are the values of all channels at time t
    and the target for model i is the value of channel i at time t + 1.

    Args:
        data (array): (timepoints, channels, trials) array of binarised data.

    Returns:
        List of fitted LogisticRegression models, one per channel, in channel
        order.
    """

    n_timepoints, n_channels, n_trials = data.shape
    n_pairs = n_trials * (n_timepoints - 1)

    # Float buffers, filled by assignment so the input is cast the same way
    # regardless of its dtype.
    past = np.zeros((n_pairs, n_channels))
    present = np.zeros((n_channels, n_pairs))

    # Sample index is (timepoint * n_trials + trial) in both buffers, so row k
    # of `past` lines up with column k of `present`.
    past[...] = data[:-1].transpose(0, 2, 1).reshape(n_pairs, n_channels)
    present[...] = data[1:].transpose(1, 0, 2).reshape(n_channels, n_pairs)

    return [
        LogisticRegression(solver='lbfgs').fit(past, channel_target)
        for channel_target in present
    ]

In [5]:
def models_to_tpm(models, n_channels):
    """ Converts per-channel fitted models to a TPM.

    Each model is queried once with every possible past state, and column 1
    of its predict_proba output is stored as the entry for that state and
    channel.

    Args:
        models: List of fitted models, one per channel in channel order. Each
                must provide predict_proba(X) returning an (n_samples, 2)
                array.
        n_channels (int): Number of channels used to generate the models.

    Returns:
        A numpy array of shape [2] * n_channels + [n_channels], where
        tpm[state + (i,)] is the value predicted by models[i] for `state`.
    """

    state_axes = [2] * n_channels
    tpm = np.zeros(state_axes + [n_channels])

    # All past states, one per row. itertools.product varies the last channel
    # fastest, which is exactly C order over the leading axes of the TPM, so a
    # plain reshape puts every prediction at tpm[state].
    states = np.array(list(itertools.product((0, 1), repeat=n_channels)))

    # One batched predict_proba call per model instead of one per state.
    for i_m, model in enumerate(models):
        on_probs = np.asarray(model.predict_proba(states))[:, 1]
        tpm[..., i_m] = on_probs.reshape(state_axes)

    return tpm

In [6]:
def tpm_log_reg(data):
    """ Build a TPM from binarised data via per-channel logistic regressions.

    Fits the models with gen_log_reg and converts them with models_to_tpm.

    Args:
        data (array): (timepoints, channels, trials) array of binarised data.
                      Each consecutive pair of timepoints is pooled as a
                      separate sample.

    Returns:
        TPM for the input data.
    """

    (_n_timepoints, channel_count, _n_trials) = data.shape
    fitted_models = gen_log_reg(data)

    return models_to_tpm(fitted_models, channel_count)

# Testing

## Generated Data

In [8]:
def det_generate(tpm, n_timepoints, n_channels, n_repeats=10):
    """ Generate state sequences by repeatedly applying a deterministic TPM.

    Every possible initial state is used as the first timepoint of a trial,
    and the whole set of initial states is repeated n_repeats times. Each
    following timepoint is the TPM entry of the previous state, written
    as-is.

    Args:
        tpm (array): Array of shape [2] * n_channels + [n_channels], indexed
                     by the current state. Entries are expected to be 0 or 1:
                     each generated state is cast to int before it is used as
                     an index, so any other value is truncated.
        n_timepoints (int): Number of transitions to generate per trial.
        n_channels (int): Number of channels in the system.
        n_repeats (int): Number of times the full set of initial states is
                         repeated. Defaults to 10.

    Returns:
        Array of shape (1 + n_timepoints, n_channels,
        (2 ** n_channels) * n_repeats) holding the generated states.
    """

    # All initial states, one per row, tiled so that trial k starts from
    # states[k % 2**n_channels].
    states = np.array(list(itertools.product((0, 1), repeat=n_channels)))
    initial = np.tile(states, (n_repeats, 1))

    samples = np.zeros((1 + n_timepoints, n_channels, initial.shape[0]))
    samples[0] = initial.T

    for i_t in range(n_timepoints):
        # One index array per channel; indexing the TPM with them looks up
        # the next state of every trial at once as a (trials, channels) array.
        curr_states = samples[i_t].astype(int)
        samples[i_t + 1] = tpm[tuple(curr_states)].T

    return samples

In [22]:
# Deterministic 3-channel TPM used as the known input for the checks below.
# Rows are listed in C order of the past state (a, b, c), i.e. the last channel
# varies fastest, so the reshape places each row at test_tpm[a, b, c].
test_shape = [2] * 3 + [3]
test_tpm = np.array([
    [0, 1, 0],  # (0, 0, 0)
    [1, 1, 0],  # (0, 0, 1)
    [1, 1, 1],  # (0, 1, 0)
    [0, 1, 0],  # (0, 1, 1)
    [1, 0, 0],  # (1, 0, 0)
    [1, 1, 1],  # (1, 0, 1)
    [1, 0, 1],  # (1, 1, 0)
    [1, 0, 0],  # (1, 1, 1)
], dtype=float).reshape(test_shape)

# Alternative table, currently disabled:
# test_tpm[(0, 0, 0)] = np.array([0, 0, 0])
# test_tpm[(1, 0, 0)] = np.array([0, 0, 1])
# test_tpm[(0, 1, 0)] = np.array([1, 0, 1])
# test_tpm[(1, 1, 0)] = np.array([1, 0, 0])
# test_tpm[(0, 0, 1)] = np.array([1, 1, 0])
# test_tpm[(1, 0, 1)] = np.array([1, 1, 1])
# test_tpm[(0, 1, 1)] = np.array([1, 1, 1])
# test_tpm[(1, 1, 1)] = np.array([1, 1, 0])

In [31]:
# Generate 100 transitions of the 3-channel test TPM, starting from every
# possible initial state.
test_samples = det_generate(test_tpm, 100, 3)

In [32]:
# Fit a TPM to the generated samples and round its entries to whole numbers so
# they can be compared directly with the 0/1 entries of test_tpm.
out_tpm = tpm_log_reg(test_samples).round(decimals=0)

In [33]:
# Print the input and fitted TPM rows side by side for every past state.
row_template = "STATE = {}, IN_TPM = {}, OUT_TPM = {}"
for state in itertools.product((0, 1), repeat=3):
    expected_row = test_tpm[state]
    fitted_row = out_tpm[state]
    print(row_template.format(state, expected_row, fitted_row))

STATE = (0, 0, 0), IN_TPM = [0. 1. 0.], OUT_TPM = [1. 1. 0.]
STATE = (0, 0, 1), IN_TPM = [1. 1. 0.], OUT_TPM = [1. 1. 0.]
STATE = (0, 1, 0), IN_TPM = [1. 1. 1.], OUT_TPM = [1. 1. 1.]
STATE = (0, 1, 1), IN_TPM = [0. 1. 0.], OUT_TPM = [1. 1. 1.]
STATE = (1, 0, 0), IN_TPM = [1. 0. 0.], OUT_TPM = [1. 0. 0.]
STATE = (1, 0, 1), IN_TPM = [1. 1. 1.], OUT_TPM = [1. 0. 0.]
STATE = (1, 1, 0), IN_TPM = [1. 0. 1.], OUT_TPM = [1. 0. 0.]
STATE = (1, 1, 1), IN_TPM = [1. 0. 0.], OUT_TPM = [1. 0. 0.]


## Fly Data

In [116]:
# Inspect the dimensions of the loaded array.
# NOTE(review): the meaning of each axis is not documented in this notebook --
# TODO confirm against the data source before slicing.
FLY_DATA.shape

(2250, 15, 8, 13, 2)