In [1]:
%%writefile soma_suite2p_utility.py

import pandas as pd
import numpy as np
import os
from soma_detector_utility import *
    #this is where all the detector functions will be used; at least initially


"""
SUITE2P_STRUCTURE describes the sequence of directories to traverse to arrive at the files named in the key.
For example: "F": ["suite2p", "plane0", "F.npy"] --> File describing F is found at ./suite2p/plane0/F.npy.
All locations are relative to the suite2p output folder, which is output into the folder / file location where the original 
.tiff image for analysis was located. suite2p output can also be identified by checking if it follows this structure.
Both spks and iscell are not used because: spks are not calculated; no need for deconvolution
iscell is not used because all ROIs are considered and filtered later based on individual stats
"""
SUITE2P_STRUCTURE = {
    "F": ["suite2p", "plane0", "F.npy"],
    "Fneu": ["suite2p", "plane0", "Fneu.npy"],
    #'spks': ["suite2p", "plane0", "spks.npy"],
    "stat": ["suite2p", "plane0", "stat.npy"],
    "iscell": ["suite2p", "plane0", "iscell.npy"],
}
""" spks is not really necessary with our current set up since the spont. events are all pretty uniform, and are below 
    AP threshold (and therefore will not need to be deconvolved into action potentials themselves)"""

def load_npy_array(npy_path):
    return np.load(npy_path, allow_pickle=True) #functionally equivalent to np.load(npy_array) but iterable; w/ Pickle


def load_npy_df(npy_path):
    return pd.DataFrame(np.load(npy_path, allow_pickle=True)) #load suite2p outputs as pandas dataframe


def load_npy_dict(npy_path):
    return np.load(npy_path, allow_pickle=True)[()] #load .npy as dictionary

"""
The following 3 func. are used to translate_suite2p_outputs_to_csv;
check_for_suite2p_output is defined below: primarily for if iscell is not included (it always is)

Then, we append the folder location of suite2p outputs into the current path (found_output_paths = files in os.walk(path))
found_output_paths.append(current_path)
"""

def check_for_suite2p_output(path, check_for_iscell=False):
    for file, path_to_file in SUITE2P_STRUCTURE.items():
        if file == "iscell" and not check_for_iscell:
            continue
        if not os.path.isfile(os.path.join(path, *path_to_file)):
            return False
    return True
    #Strictly to check for iscell.npy; I originally forgot to include this when Marti Ritter wrote the code
    # I am also too scared to change this before my Master's less this break the pipeline
    # For the synapses it also is not relatively important since we assume all detected ROIs are real* synapses


def get_all_suite2p_outputs_in_path(path, check_for_iscell=False):
    found_output_paths = []
    
    for current_path, directories, files in os.walk(path):
        if check_for_suite2p_output(current_path, check_for_iscell=check_for_iscell):
            found_output_paths.append(current_path)
    return found_output_paths




def load_suite2p_output(path, use_iscell=False):
    """here we define our suite2p dictionary from the SUITE2P_STRUCTURE...see above"""
    suite2p_dict = {
        "F": load_npy_array(os.path.join(path, *SUITE2P_STRUCTURE["F"])),
        "Fneu": load_npy_array(os.path.join(path, *SUITE2P_STRUCTURE["Fneu"])),
        "stat": load_npy_df(os.path.join(path, *SUITE2P_STRUCTURE["stat"]))[0].apply(pd.Series),
    }

    if use_iscell == False:
        suite2p_dict["IsUsed"] = [(suite2p_dict["stat"]["skew"] >= 0) & 
                                                (suite2p_dict['stat']['footprint'] < 3.0) &
                                                (suite2p_dict['stat']['footprint'] > 0.0)]

        suite2p_dict["IsUsed"] = pd.DataFrame(suite2p_dict["IsUsed"]).iloc[:,0:].values.T
        suite2p_dict["IsUsed"] = np.squeeze(suite2p_dict["IsUsed"])
    else:
        suite2p_dict["IsUsed"] = load_npy_df(os.path.join(path, *SUITE2P_STRUCTURE["iscell"]))[0].astype(bool)

    return suite2p_dict
"""
Possible to append this function further for synapse exclusion
 for example, append the document based on 
suite2p_dict["stat"] using values for ["skew"]/["npix"]/["compactness"]
"""


def translate_suite2p_dict_to_df(suite2p_dict):
    """this is the principle function in which we will create our .csv file structure; and where we will actually use
        our detector functions for spike detection and amplitude extraction"""
    spike_amplitudes = [single_synapse_baseline_correction_and_peak_return(f_trace, fneu_trace, return_peaks = False) 
                       for (f_trace, fneu_trace) in zip(suite2p_dict["F"], suite2p_dict["Fneu"])]
    
    spikes_per_neuron = [single_synapse_baseline_correction_and_peak_return(f_trace, fneu_trace) 
                             for (f_trace, fneu_trace) in zip(suite2p_dict["F"], suite2p_dict["Fneu"])]
#spikes_per_neuron from single_cell_peak_return OUTPUT = list of np.arrays        
    df = pd.DataFrame({"IsUsed": suite2p_dict["IsUsed"],
                       "Skew": suite2p_dict["stat"]["skew"],
                       "PeakTimes": spikes_per_neuron,
                       "Amplitudes": spike_amplitudes,
                       "Total Frames": len(suite2p_dict["F"].T)})
                       
    df.index.set_names("SynapseID", inplace=True)
    return df

def translate_suite2p_outputs_to_csv(input_path, output_path, overwrite=False, check_for_iscell=False):
    """This will create .csv files for each video loaded from out data fram function below.
        The structure will consist of columns that list: "Amplitudes": spike_amplitudes})
        
        col1: ROI #, col2: IsUsed (from iscell.npy); boolean, col3: Skew (from stats.npy); could be replaced with any 
        stat >> compactness, col3: spike frames (relative to input frames), col4: amplitude of each spike detected measured 
        from the baseline (the median of each trace)"""
    
    suite2p_outputs = get_all_suite2p_outputs_in_path(input_path)
    
    for suite2p_output in suite2p_outputs:
        output_directory = os.path.basename(suite2p_output)
        translated_path = os.path.join(output_path, f"{output_directory}.csv")
        if os.path.exists(translated_path) and not overwrite:
            print(f"CSV file {translated_path} already exists!")
            continue

        suite2p_dict = load_suite2p_output(suite2p_output)
        suite2p_df = translate_suite2p_dict_to_df(suite2p_dict)

        suite2p_df.to_csv(translated_path)

        
#For establishing ImageJ/CellProfiler baseline
"""
def load_imageJ_xl_sheet(imageJ_xl_output_path, xl_sheetname, return_array=True):
    imageJ_xl = pd.read_excel(imageJ_xl_output_path, engine='openpyxl',
                       sheet_name = xl_sheetname, header = None)
    processed_imageJ_xl = imageJ_xl.T
        #transposed to match the output of suite2p / make more legible for humans
    processed_imageJ_xl_header = processed_imageJ_xl.iloc[0]
    processed_imageJ_xl = processed_imageJ_xl[1:]
    processed_imageJ_xl.columns = processed_imageJ_xl_header
    processed_imageJ_xl_numpy_array = pd.DataFrame.to_numpy(processed_imageJ_xl)
    if return_array == True:
        return processed_imageJ_xl_numpy_array
    else:
        return processed_imageJ_xl
"""

Writing soma_suite2p_utility.py
