In [1]:
import numpy as np
import os

from utils import set_params
from utils import load_pickle, extract_used_data
from utils import mergeAB, align_track, z_score_on
from utils import plot3d_lines_to_html
from utils import pca_fit
from utils import plot_neuron_id

from utils.config import Params
from typing import Dict, List

In [2]:

def pca_on(data: dict, params: Params, ana_tt: list[str], ana_bt: list[str], key:str = "z_scored_firing"):
    
    blocks = []
    for index in params.ana_index_grid(ana_tt, ana_bt):
        if data[key][index] is not None:
            blocks.append(data[key][index])

    if len(blocks) == 0:
        raise ValueError('No data to pca')

    # fr_sum: [num_neurons, (len_track * num_types)]
    fr_sum = np.hstack(blocks)
    
    # fr_sum = fr_sum[selection_array, :]
    
    pcs, _ = pca_fit(fr_sum.T, params.pca_n_components)
    
    x_vec = pcs[:, 0]
    y_vec = pcs[:, 1]
    z_vec = pcs[:, 2]
    
    colors = [
        [0.20, 0.20, 0.20],
        [0.99, 0.22, 0.24],
        [0.99, 0.55, 0.16],
        [0.80, 0.12, 0.46],
        [0.20, 0.45, 0.85],
        [0.00, 0.62, 0.45],
        [0.53, 0.36, 0.78],
    ]

    xlines, ylines, zlines = [], [], []
    lineLabels, lineColors = [], []
    
    for index in params.ana_index_grid(ana_tt, ana_bt):
        
        fr = data[key][index]
        if fr is None:
            continue
        
        # fr = fr[selection_array, :]
    
        xline = x_vec @ fr
        yline = y_vec @ fr
        zline = z_vec @ fr
        
        xlines.append(xline)
        ylines.append(yline)
        zlines.append(zline)
        
        row, col = index
        
        lineLabels.append(f"{params.tt[row]} {params.bt[col]}")
        lineColors.append(colors[row])
        
        vec = get_reward_vec(data, params, trial_type=params.tt[row], behavior_type=params.bt[col])
        
        # vec = vec[selection_array, :]
        
        xlines.append(x_vec @ vec)
        ylines.append(y_vec @ vec)
        zlines.append(z_vec @ vec)
        
        lineLabels.append(f"{params.tt[row]} {params.bt[col]} Reward")
        lineColors.append(colors[row])

    return xlines, ylines, zlines, lineLabels, lineColors, pcs
        

def get_reward_vec(data: Dict, params: Params, trial_type: str, behavior_type: str) -> np.ndarray:
    index = params.ana_index_grid([trial_type], [behavior_type])
    if len(index) != 1:
        raise ValueError("Invalid trial_type and behavior_type")
    index = index[0]
    firing = data["simple_firing"][index]
    reward = data["pos_reward_type"][index]
    if firing is None:
        raise ValueError("Invalid trial_type and behavior_type")
    trial_num = firing.shape[1]
    trial_vec_list = []
    for t in range(trial_num):
        reward_list = reward[t]
        if len(reward_list) == 0:
            continue
        reward_pos = next((x for x in reward_list if x > 0.1), None)
        reward_idx = int(reward_pos / params.space_unit)
        trial_vec_list.append(firing[:, t, reward_idx - 5 : reward_idx + 5])
    trial_vec_list = np.array(trial_vec_list)
    trial_vec_list = np.mean(trial_vec_list, axis=0)
    mu = data["z_score_value"][index][0]
    sg = data["z_score_value"][index][1]
    trial_vec_list = (trial_vec_list - mu) /sg
    return trial_vec_list

In [3]:
def cosine_sim(a, b, eps=1e-12):
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    denom = (np.linalg.norm(a) * np.linalg.norm(b))
    return float(np.dot(a, b) / max(denom, eps))

def pearson_corr(a, b, eps=1e-12):
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    a = a - a.mean()
    b = b - b.mean()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(np.dot(a, b) / max(denom, eps))

def neuron_selection(data: Dict, params: Params, ana_tt: List[str], ana_bt: List[str]):
    
    num_neurom = 0
    for index in params.ana_index_grid(ana_tt, ana_bt):
        if data["aligned_firing"][index] is not None:
            num_neurom = data["aligned_firing"][index].shape[0]
    if num_neurom == 0:
        raise ValueError("No data to select")
    selection_array = np.array([False] * num_neurom)

    zone_idx = data["aligned_zones_id"]
    for id in range(num_neurom):
        is_selected = False
        for index in params.ana_index_grid(ana_tt, ana_bt):
            if data["aligned_firing"][index] is None:
                continue
            
            trial_type = params.tt[index[0]]
            if "position" in trial_type:
                reward_pos = 3
            elif "pattern" in trial_type:
                if "1" in trial_type or "CAB" in trial_type or "CBA" in trial_type:
                    reward_pos = 1
                elif "2" in trial_type or "BCA" in trial_type or "ACB" in trial_type:
                    reward_pos = 3
                elif "3" in trial_type or "BAC" in trial_type or "ABC" in trial_type:
                    reward_pos = 5
                else:
                    raise ValueError("Invalid trial_type")
            else:
                raise ValueError("Invalid trial_type")

            reward_pattern = [1 if i == (reward_pos - 1) else 0 for i in range(5)]
            post_reward_pattern = [1 if i >= (reward_pos - 1) else 0 for i in range(5)]
            
            firing = data["aligned_firing"][index][id,:]

            mu = np.mean(firing)
            sg = np.maximum(np.std(firing), np.finfo(float).eps)
            firing = (firing - mu) / sg
            
            firing_pattern = [0 for i in range(5)]
            for j in range(5):
                start = zone_idx[j][0]
                end = zone_idx[j][1]
                firing_pattern[j] = np.max(firing[start:end])
            
            sim_reward = pearson_corr(firing_pattern, reward_pattern)
            sim_post_reward = pearson_corr(firing_pattern, post_reward_pattern)
            score = np.maximum(np.abs(sim_reward), np.abs(sim_post_reward))
            
            if score < 0.4:
                is_selected = True
                
        selection_array[id] = is_selected
    
    return selection_array

In [4]:
# Params set

data_path = "../../data/"
results_path = "../../results/"
sub_directory = 'flexible_shift'

params = set_params(tt_preset='merge',
                         bt_preset='basic',
                         data_path=data_path,
                         results_path=results_path,
                         sub_directory=sub_directory,
                         len_pos_average=10,)


In [5]:

# data_dict = load_pickle(os.path.join(params.data_path, params.sub_directory))

# for key, value in data_dict.items():
    
#     data = data_dict[key]
#     data = extract_used_data(data)
#     mergeAB(data)
    
#     align_track(data, params)
#     z_score_on(data, params, ana_tt=['*'], ana_bt=['*'])
#     # z_score_on(data, params, ana_tt=['origin'], ana_bt=['*'])
#     # z_score_on(data, params, ana_tt=['pattern_*'], ana_bt=['*'])
#     # z_score_on(data, params, ana_tt=['position_*'], ana_bt=['*'])
#     x_lines, y_lines, z_lines, line_labels, line_colors, _ = pca_on(data, params, ana_tt=['*'], ana_bt=['correct'])

#     # plot
#     # out_path = os.path.join(params.results_path, params.sub_directory, "pca_align_track", f"{key}_pca.html")
#     plot3d_lines_to_html(x_lines, y_lines, z_lines, line_labels = line_labels, line_colors = line_colors)
    

In [6]:
data = load_pickle("../../data/flexible_shift/RDH01-PFCsep2.pkl")

data = extract_used_data(data)
mergeAB(data)

align_track(data, params)
# selection_array = neuron_selection(data, params, ana_tt=['*'], ana_bt=['correct'])
z_score_on(data, params, ana_tt=['origin'], ana_bt=['*'])
z_score_on(data, params, ana_tt=['pattern_*'], ana_bt=['*'])
z_score_on(data, params, ana_tt=['position_*'], ana_bt=['*'])
x_lines, y_lines, z_lines, line_labels, line_colors, pcs = pca_on(data, params, ana_tt=['*'], ana_bt=['correct'])

plot3d_lines_to_html(x_lines, y_lines, z_lines, line_labels = line_labels, line_colors = line_colors)