In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import scipy
import numpy as np
from fitters import *
from tools import *
from plots import *
from results import Results
set_seaborn()

In [None]:

def preprocess(signal, factor=1, method='mean'):
    # take mean of all trials
    mean = np.mean(signal, axis=0)
    # down samples mean of trials
    n = len(mean) // factor
    if method =='nth':
        down = mean[::factor]
    if method == 'fourier':
        down = scipy.signal.resample(mean, n)
    if method == 'mean':
        down = []
        for i in range(0,len(mean),factor):
            m = np.mean(mean[i:i+factor]) 
            down.append(m)
        down = np.array(down)
    return down

def build_train_test_hankels(X, Y, dim, test_start, train_test_ratio):

    start = int(len(X) * test_start)
    end = start + int(len(X) * train_test_ratio)

    X_test, Y_test = X[start:end],  Y[start:end]
    X_train, Y_train = X.copy(), Y.copy()

    X_test = normalize(X_test)
    X_train = normalize(X_train)
    Y_test = normalize(Y_test)
    Y_train = normalize(Y_train)

    # use to remove from hankel later
    X_train[start:end] = np.nan
    
    # remove the beginning for causal convoltuion
    Y_train = np.delete(Y_train, np.arange(0,dim))
    Y_test = np.delete(Y_test, np.arange(0,dim))

    test_hankel = build_hankel(X_test, dim)
    train_hankel = build_hankel(X_train, dim)

    # remove lag vector with nan
    nan_cols = np.bitwise_or.reduce(np.isnan(train_hankel),0)
    # print(np.count_nonzero(nan_cols))

    train_hankel = np.delete(train_hankel, nan_cols, axis=1)
    Y_train = np.delete(Y_train, nan_cols)

    return train_hankel, Y_train, test_hankel, Y_test

def train_test_method(X, Y, model, dim, train_test_ratio=0.2, noise=0.0, betas=np.arange(0,10,1)):

    test_start_range = np.linspace(0.0, 1.0-train_test_ratio, 100)
    T = len(test_start_range)
    B = len(betas)

    X += noise * np.random.normal(size=len(X), scale=noise)

    train_errors = np.zeros((B,T))
    test_errors = np.zeros((B,T))
    filters = np.zeros((B,T,dim))
    all_params = np.zeros((B,T), dtype='O')
    for t, test_start in enumerate(test_start_range):
        
        train_hankel, Y_train, test_hankel, Y_test = build_train_test_hankels(X, Y, dim, test_start, train_test_ratio)
        for b, beta in enumerate(betas):
            
            P_train, theta, params = model.train(train_hankel, Y_train, dim, beta=beta)
            P_test = model.test(test_hankel, theta, params)

            # if model.name == 'Eigen 50' and beta ==1.0:
            #     plt.plot(Y_test)
            #     plt.plot(P_test)
            #     plt.show()

            train_err = mean_square_error(Y_train, P_train)
            test_err = mean_square_error(Y_test, P_test)

            train_errors[b,t] = train_err
            test_errors[b,t] = test_err
            filters[b,t,:] = theta
            all_params[b,t] = params

    return train_errors, test_errors, filters, all_params, betas


In [None]:
resPH, stimPH = read_lmc('PHOTO')
res3, stim3 = read_lmc('LMC-BG3')
res4, stim4 = read_lmc('LMC-BG4')
res5, stim5 = read_lmc('LMC-BG5')
res6, stim6 = read_lmc('LMC-BG6')

TIME = 1.0
all_results = []

In [None]:
data = 'LMC-BG4'
d_method = 'mean'
factor = 10
dim = 50
betas = np.concatenate([np.linspace(0,0.5,40),np.arange(1,10,2)])
X = preprocess(resPH, factor=factor, method=d_method)
Y = preprocess(res4, factor=factor, method=d_method) *-1
time_window = round( dim*(TIME/len(X))*1000, 3)
dT = 1/len(X)*1000
time_str = f'dT={dT}ms, W={time_window}ms'
title = f'[{time_str}]'


model = ConstantModel(f'Constant d=2 {title}')
res = train_test_method(X, Y, model, 2, betas=betas)
C2_Results = Results(model.name, *res)

model = LinearModel(f'Linear {title}')
res = train_test_method(X, Y, model, dim, betas=betas)
Lin50_Results = Results(model.name, *res)

model = EigenModel(f'Eigen {title}')
res = train_test_method(X, Y, model, dim, betas=betas)
Eig50_Results = Results(model.name, *res)

cur_results = [C2_Results, Lin50_Results, Eig50_Results]
all_results.extend(cur_results)

# plotting
plot_beta_errs(cur_results, title=title)
plot_filter(*Lin50_Results.get_opt_filter(opt_set='train'), title=f'{title} Opt over TRAIN Linear')
plot_filter(*Eig50_Results.get_opt_filter(opt_set='train'), title=f'{title} Opt over TRAIN Eig')
plot_filter(*Lin50_Results.get_opt_filter(opt_set='test'), title=f'{title} Opt over TEST Linear')
plot_filter(*Eig50_Results.get_opt_filter(opt_set='test'), title=f'{title} Opt over TEST Eig')
plot_spectrum(Eig50_Results.get_avg_spectrum(), title=f'{title} beta={round(Eig50_Results.get_opt_beta(),3)}')
plot_self_corr(X, dim, title=title)

In [None]:
data = 'LMC-BG4'
d_method = 'mean'
factor = 20
dim = 25
betas = np.concatenate([np.linspace(0,0.5,40),np.arange(1,10,2)])
X = preprocess(resPH, factor=factor, method=d_method)
Y = preprocess(res4, factor=factor, method=d_method) *-1
time_window = round( dim*(TIME/len(X))*1000, 3)
dT = 1/len(X)*1000
time_str = f'dT={dT}ms, W={time_window}ms'
title = f'[{time_str}]'


model = ConstantModel(f'Constant d=2 {title}')
res = train_test_method(X, Y, model, 2, betas=betas)
C2_Results = Results(model.name, *res)

model = LinearModel(f'Linear {title}')
res = train_test_method(X, Y, model, dim, betas=betas)
Lin50_Results = Results(model.name, *res)

model = EigenModel(f'Eigen {title}')
res = train_test_method(X, Y, model, dim, betas=betas)
Eig50_Results = Results(model.name, *res)

cur_results = [C2_Results, Lin50_Results, Eig50_Results]
all_results.extend(cur_results)

# plotting
plot_beta_errs(cur_results, title=title)
plot_filter(*Lin50_Results.get_opt_filter(opt_set='train'), title=f'{title} Opt over TRAIN Linear')
plot_filter(*Eig50_Results.get_opt_filter(opt_set='train'), title=f'{title} Opt over TRAIN Eig')
plot_filter(*Lin50_Results.get_opt_filter(opt_set='test'), title=f'{title} Opt over TEST Linear')
plot_filter(*Eig50_Results.get_opt_filter(opt_set='test'), title=f'{title} Opt over TEST Eig')
plot_spectrum(Eig50_Results.get_avg_spectrum(), title=f'{title} beta={round(Eig50_Results.get_opt_beta(),3)}')
plot_self_corr(X, dim, title=title)

In [None]:
data = 'LMC-BG4'
d_method = 'mean'
factor = 40
dim = 12
betas = np.concatenate([np.linspace(0,0.5,40),np.arange(1,10,2)])
X = preprocess(resPH, factor=factor, method=d_method)
Y = preprocess(res4, factor=factor, method=d_method) *-1
time_window = round( dim*(TIME/len(X))*1000, 3)
dT = 1/len(X)*1000
time_str = f'dT={dT}ms, W={time_window}ms'
title = f'[{time_str}]'


model = ConstantModel(f'Constant d=2 {title}')
res = train_test_method(X, Y, model, 2, betas=betas)
C2_Results = Results(model.name, *res)

model = LinearModel(f'Linear {title}')
res = train_test_method(X, Y, model, dim, betas=betas)
Lin50_Results = Results(model.name, *res)

model = EigenModel(f'Eigen {title}')
res = train_test_method(X, Y, model, dim, betas=betas)
Eig50_Results = Results(model.name, *res)

cur_results = [C2_Results, Lin50_Results, Eig50_Results]
all_results.extend(cur_results)

# plotting
plot_beta_errs(cur_results, title=title)
plot_filter(*Lin50_Results.get_opt_filter(opt_set='train'), title=f'{title} Opt over TRAIN Linear')
plot_filter(*Eig50_Results.get_opt_filter(opt_set='train'), title=f'{title} Opt over TRAIN Eig')
plot_filter(*Lin50_Results.get_opt_filter(opt_set='test'), title=f'{title} Opt over TEST Linear')
plot_filter(*Eig50_Results.get_opt_filter(opt_set='test'), title=f'{title} Opt over TEST Eig')
plot_spectrum(Eig50_Results.get_avg_spectrum(), title=f'{title} beta={round(Eig50_Results.get_opt_beta(),3)}')
plot_self_corr(X, dim, title=title)

In [None]:
plot_beta_errs(all_results, plot_opt=False, plot_errs=False, title='Coarse Graining Results')