In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import os
from tqdm.auto import tqdm

import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('qt5agg')


def load_data(input_path, dtype='uint16', nb_channels=256, channel_id=125, probe_size=None, voltage_resolution=0.1042, disable=False):
    """
    Function to load raw binary file for a given channel signal

    Input :
        - input_path (str) : path to binary file
        - dtype (str) : type of data size
        - nb_channels (int) : total number of channels on the mea
        - channel_id (int) : channel number to be read
        - probe_size (int) : if not None, read only part of the recording (used to check a channel did record some signal)
        - voltage_resolution (float) : voltage step per binary value recorded
        - disable (bool) : True to disable tqdm loading bar
        
    Output :
        - data (1D numpy array) : raw signal of the read channel
        - nb_samples (int) : number of time points in the recording
        
    Possible mistakes :
        - File doesn't exists, check input_path and folders
        - Type error due to very long recording exceeding dtype capcities
    """
    
    # Load data.
    m = np.memmap(os.path.normpath(input_path),dtype=dtype)
    
    #Input file sanity check
    if m.size % nb_channels != 0:
        message = "number of channels is inconsistent with the data size."
        raise Exception(message)
    
    
    nb_samples = m.size // nb_channels
    
    if probe_size:
        nb_samples = min(probe_size,nb_samples)
    
    data = np.empty((nb_samples,), dtype=dtype)
    for k in tqdm(range(nb_samples),disable = disable):
        data[k] = m[nb_channels * k + channel_id]
    data = data.astype(float)
    data = data + np.iinfo('int16').min 
    data = data / voltage_resolution
    
    return data, nb_samples


def detect_onsets(data, threshold):
    """
    Function to compute time point in the data coresponding to the display of a new frame of the stimuli based on trigger recording

    Input :
        - data (1D numpy array) : raw triggers data
        - threshold (int) : voltage value that detects onsets in data
        
    Output :
        - indices (1D numpy array) : list of time indices corresponding to the detected onsets time point
        
    Possible mistakes :
        - Threshold is no longer optimum and has to be changed
        - Wrong mea given as parameters
        - Data coming from the wrong channel
    """
    test_1 = data[:-1] < threshold
    test_2 = data[1:] >= threshold
    test = np.logical_and(test_1, test_2)
    
    indices = np.where(test)[0]
    
    test = data[indices - 1] < data[indices]
    while np.any(test):
        indices[test] = indices[test] - 1
        test = data[indices - 1] < data[indices]
    
    return indices

def run_minimal_sanity_check(triggers, sampling_rate=20000, maximal_jitter=0.25e-3, stim_type = 'visual'):
    """
        Compare the duration of each frame (ie distance between triggers) to see if a max error is reached

    Input :
        - triggers (list) : list of time point of triggers
        - sampling_rate (int) : number of time points per sec
        - maximal_jitter (int) : maximal error admissible in sec
        - stim_type (str) : stimulus type (ie 'visual' or 'holo') used to avoid doing sanity checks on holo triggers
    Output :
        - (1D numpy array) : Array of triggers time points that violate the maximum error
        
    Possible mistakes :
        - Stim_type given is wrong and holo stim type is given, check the calling of the function
        - indices threshold is wrong and some triggers are missed
        - triggers are corrupted
        - maximal_jitter is too restrictive
    """
    if stim_type == 'holo' :
        print('No sanity checks done on holographic stimulus')
        return np.array([]).astype('int64')
    elif len(triggers)<2:
        print('No sanity check performed, only 1 trigger detected. Is threshold correct ?')
        return np.array([]).astype('int64')
        
    # Check trigger statistics.
    inter_triggers = np.diff(triggers)
    inter_trigger_values, inter_trigger_counts = np.unique(inter_triggers, return_counts=True)

    index = np.argmax(inter_trigger_counts)
    inter_trigger_value = inter_trigger_values[index]
    errors = np.where(np.abs(inter_triggers - inter_trigger_value) >= maximal_jitter * sampling_rate)[0]
    
    if errors.size>0:
        print("Minimal sanity checks :\t/! Triggers are not evenly spaced /! \nNumber of errors : {}\nMaximum error : {} sampling points compared to {} sampling points per trigger".format(len(errors), max(np.abs(inter_trigger_values)), inter_trigger_value))
    else :
        print("Minimal sanity checks : Ok on all {} triggers".format(len(triggers)))

    return triggers[errors].astype('int64')

# Triggers Sanity Check

This notebook helps you check your recording triggers on the fly. After the end of the recording, extract the raw file from the mcd keeping only the trigger channel on MC_DataTool (visual = 127 & holo = 128). Channel number is +1 on MC_DataTool compared to the pipeline code due to python idexing starting at 0). You should have a ".raw" file containing one channel and give the bellow cell its path in the "input_file" variable. 

However, if you want to check recording trigger on your ".raw" containing all triggers, you can use this code by changing "Nchannels" and channel_id in the "Variable" section below.

#### If you want to read data from a filtered raw file (Artefacts removed files), use dtype ='int16' !!

In [15]:
"""
    Inputs
"""
#"path/to/file/recording_name.raw"
input_file    =  "/home/guiglaz/Bureau/Pipeline Git Repo/RAW_Files/02_DG_30ND50%_2sT_50Hz.raw" 


"""
    Variable
"""
#Determines the optimal threshold to detect onsets (modify bellow in "Setup part if needed")
MEA = 3

#256 for standard MEA, 1 if you extracted only trigger channel into the raw file
Nchannels  = 1                

# 126 for visual stim & 127 for holo stim (when the 256 electrodes have been extracted), PUT TO 0 FOR MONO CHANNEL RAW FILE
channel_id = 127  

"""
    Setup
"""

#the optimal threshhold for detecting stimuli onsets varies with the rig
if MEA==1: threshold  = 2.7e+5         
if MEA==2: threshold  = 1.5e+5          
if MEA==3: threshold  = 1.7e+5          
if MEA==4: threshold  = -3.14470e+5

# number of triggers samples acquired per second
fs         = 20000
    
"""
    Processing
"""
#Processing of data calling utils functions
print("Loading Data...")
data, t_tot    = load_data(input_file, channel_id = channel_id, dtype='uint16' )  
indices        = detect_onsets(data,threshold)
err = run_minimal_sanity_check(indices)


Loading Data...


  0%|          | 0/8308000 [00:00<?, ?it/s]

No sanity check performed, only 1 trigger detected. Is threshold correct ?


In [16]:
"""
    Ploting
"""

recording_name = input_file.replace('.raw','').split(r'/')[-1]
print(f"Ploting {recording_name} on channel {channel_id}")

plt.close('all')
plt.figure('{}'.format(recording_name), figsize=(10,30))
# Top plot with raw trigger signal, threshold of detection, detected triggers and wrong triggers
plt.subplot(3,2,(1,4))
plt.title('Triggers')

plt.plot(np.linspace(0, len(data)/20000, len(data)),data)

plt.plot(indices/20000,data[indices],'.',markersize=2,zorder=10)

plt.axhline(threshold, color='green')
plt.scatter(err/20000,data[err], color='red', marker='x',zorder = 15)

# Bottom left plot of triggers indices. Should be a perfect diagonal
plt.subplot(3,2,5)
plt.plot(indices)
plt.title('Detected indices')

# Bottom right plot of relative error gap between detect time of frame and mean frame time
plt.subplot(3,2,6)
plt.plot(np.diff(np.diff(indices)))
plt.title('Duration {} +- error'.format(np.round(np.mean(np.diff(indices)))))
plt.show(block = False)

print('\n\t\t\t------ End Of Cell ------')


Ploting 02_DG_30ND50%_2sT_50Hz on channel 127

			------ End Of Cell ------


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [12]:
np.linspace?

In [85]:
data_vis = data

In [122]:
data_holo = data

In [131]:
data=data_holo[:20000*10]
plt.plot(data)
plt.show(block = False)


In [127]:
data_holo[:20000*10].max()>0

True

In [82]:
probe = 10000

In [86]:
a_vis = data_vis[:probe].sum()

In [87]:
a_vis

1290547.0249520154