<a href="https://colab.research.google.com/github/manuelnapolesromero/-Levantar-un-Nodo-Hadoop-con-Docker/blob/main/Optimizaci%C3%B3n_del_procesamiento_de_datos_neurofisiol%C3%B3gicos_para_el_momo_RR34.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
"""
Written by Sebastian Pujalte for the Rossi Pool Lab 01/2022.
Updated and improved for robustness and clarity.
Script for processing all .nev data and associated .mat psychophysical data for the RR34 monkey.

This script processes raw neurophysiological data (.nev files) and corresponding psychophysical
data (.mat files) into a standardized, clean format suitable for downstream analysis,
such as spike sorting with TopoSort.

TODO:
- Review and generalize the mat_processer for other monkeys if needed.
- Consider adding more detailed error handling for file parsing issues (e.g., corrupted files).
- Evaluate performance with very large datasets and potentially optimize I/O operations.
"""

from toposort.pipeline import spike_pipeline, trailing_number
import logging
import numpy as np
from brpylib import NevFile, brpylib_ver
from scipy.io import loadmat
import os
from os.path import join as pjoin
from pandas import DataFrame
from logging import info, error, warning

# --- Version Control ---
# Minimum brpylib release this script is known to work with.
BR_PYLIB_VER_REQ = "1.3.1"


def _version_tuple(version: str) -> tuple:
    """Convert a dotted version string into a tuple of ints.

    Only the leading digits of each dot-separated component are used, so a
    suffix such as ``"1rc2"`` contributes ``1``; a component with no leading
    digits contributes ``0``.

    Parameters
    ----------
    version : str
        Dotted version string, e.g. ``"1.3.1"``.

    Returns
    -------
    tuple
        Integer components in order, suitable for ``<`` / ``>=`` comparison.
    """
    numbers = []
    for piece in str(version).split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        numbers.append(int(digits) if digits else 0)
    return tuple(numbers)


# Compare numerically. Comparing the raw ``split(".")`` lists would order the
# components as strings, so e.g. "1.10.0" would sort *below* "1.3.1".
if _version_tuple(brpylib_ver) < _version_tuple(BR_PYLIB_VER_REQ):
    raise ImportError(
        f"Requires brpylib {BR_PYLIB_VER_REQ} or higher. Please update your brpylib package."
    )

def nev_processer(filepath: str, savepath: str = None, min_spikes: int = 100) -> dict:
    """Process every ``d*.nev`` file in a directory into per-electrode spike data.

    Files are ordered by the trailing number in their name (used as the trial
    number), spikes are grouped per electrode across all files, electrodes
    with too few spikes are dropped, and trial numbers are shifted so the
    smallest one becomes 1. Spike times and trial numbers can optionally be
    written to CSV; waveforms are only returned.

    Parameters
    ----------
    filepath : str
        Path to the directory where the .nev files are stored.
    savepath : str, optional
        Directory in which to write ``<electrode>/STime.csv`` and
        ``<electrode>/STrial.csv``. If None, nothing is written to disk.
    min_spikes : int, optional
        An electrode is kept only if it has strictly more than this many
        spikes in total. Defaults to 100.

    Returns
    -------
    data : dict
        Maps electrode keys (``'e<ChannelID>'``) to a dict with NumPy arrays
        ``'STime'`` (int64), ``'Waveforms'`` (int16) and ``'STrial'`` (int32).
        An empty dict is returned when the directory is invalid or contains
        no usable files.
    """
    # Local import: keeps this function self-contained without requiring a
    # change to the file-level import block.
    from contextlib import closing

    if not os.path.isdir(filepath):
        error(f"Provided filepath is not a valid directory: {filepath}. Returning empty data.")
        return {}

    all_nev_files = [
        f for f in os.listdir(filepath) if f.endswith(".nev") and f.startswith("d")
    ]
    if not all_nev_files:
        info(f"No .nev files found in {filepath} matching 'd*.nev' pattern. Returning empty data.")
        return {}

    # Pair each file with its trial number once, normalised to int so that
    # sorting and the min-trial comparison below are always numeric.
    trial_files = []
    for f in all_nev_files:
        num = trailing_number(f[:-4])  # strip the ".nev" extension first
        if num is None:
            warning(f"Could not extract trial number from {f}. Skipping file for processing.")
            continue
        trial_files.append((int(num), f))

    if not trial_files:
        info(f"All .nev files in {filepath} were skipped due to missing trial numbers. Returning empty data.")
        return {}

    trial_files.sort()
    min_trial = trial_files[0][0]

    data = {}

    for trial, f in trial_files:
        full_path = pjoin(filepath, f)

        try:
            # closing() only needs NevFile.close(), so it works whether or not
            # NevFile implements the context-manager protocol itself.
            # NOTE(review): confirm NevFile.close() exists in the installed brpylib.
            with closing(NevFile(full_path)) as nev_file:
                # Channel ids 0..128 are requested; ids without data are
                # presumably ignored by brpylib -- TODO confirm.
                nev_data = nev_file.getdata(elec_ids=list(range(129)))

            if "spike_events" not in nev_data or not nev_data["spike_events"]:
                info(f"No spike events found in {f} or 'spike_events' is empty. Skipping file.")
                continue

            electrodes = nev_data["spike_events"]["ChannelID"]
            waveforms = nev_data["spike_events"]["Waveforms"]
            spike_times = nev_data["spike_events"]["TimeStamps"]

        except Exception as e:
            # Deliberate best-effort boundary: one unreadable file must not
            # abort the whole session.
            error(f"Error processing .nev file {f}: {e}. Skipping to next file.")
            continue

        for elect_num, wave_block, time_block in zip(electrodes, waveforms, spike_times):
            if not isinstance(wave_block, (list, np.ndarray)) or not len(wave_block):
                warning(f"Electrode {elect_num} in {f} has no waveforms or invalid data. Skipping.")
                continue

            n_spikes = len(wave_block)

            # STime, Waveforms and STrial must stay index-aligned; a count
            # mismatch would silently shift every later spike of this electrode.
            if len(time_block) != n_spikes:
                warning(
                    f"Electrode {elect_num} in {f} has {len(time_block)} timestamps "
                    f"for {n_spikes} waveforms. Skipping."
                )
                continue

            entry = data.setdefault(
                f"e{elect_num}", {"STime": [], "Waveforms": [], "STrial": []}
            )
            entry["STime"].extend(time_block)
            entry["Waveforms"].extend(wave_block)
            entry["STrial"].extend([trial] * n_spikes)

    # --- Post-processing and Filtering ---
    # Keep electrodes with enough spikes and convert their lists to arrays.
    kept = {}
    for e_key, entry in data.items():
        total_n_spikes = len(entry["STrial"])
        if total_n_spikes > min_spikes:
            kept[e_key] = {
                "STime": np.array(entry["STime"], dtype=np.int64),
                "Waveforms": np.array(entry["Waveforms"], dtype=np.int16),
                "STrial": np.array(entry["STrial"], dtype=np.int32),
            }
        else:
            info(
                f"Electrode {e_key} contains only {total_n_spikes} spike(s). "
                "This electrode will be ignored (assumed error or insufficient data)."
            )
    data = kept

    # Shift trial numbers so the smallest file number maps to trial 1.
    if min_trial > 1:
        info(f"Normalizing trial numbers. Original minimum trial: {min_trial}. New minimum trial: 1.")
        for entry in data.values():
            entry["STrial"] -= (min_trial - 1)

    # --- Optional Saving to CSV ---
    if savepath is not None:
        try:
            os.makedirs(savepath, exist_ok=True)
            info(f"Ensured save directory exists: {savepath}")
        except OSError as e:
            error(f"Failed to create save directory {savepath}: {e}. Skipping CSV saving.")
            return data

        for e_key, entry in data.items():
            electrode_save_path = pjoin(savepath, e_key)
            try:
                os.makedirs(electrode_save_path, exist_ok=True)
            except OSError as e:
                error(f"Failed to create electrode directory {electrode_save_path}: {e}. Skipping CSV saving for this electrode.")
                continue

            # Only spike times and trial numbers are written; waveforms are
            # returned to the caller instead.
            try:
                np.savetxt(
                    fname=pjoin(electrode_save_path, "STime.csv"),
                    X=entry["STime"],
                    fmt="%i",
                    delimiter=",",
                )
                np.savetxt(
                    fname=pjoin(electrode_save_path, "STrial.csv"),
                    X=entry["STrial"],
                    fmt="%i",
                    delimiter=",",
                )
                info(f"Saved STime.csv and STrial.csv for {e_key} in {electrode_save_path}")
            except Exception as e_save:
                error(f"Error saving CSVs for {e_key} at {electrode_save_path}: {e_save}")

    return data


def mat_processer(filepath: str, savepath: str = None, monkey_prefix: str = "RR034") -> DataFrame:
    """Convert the psychophysics ``.mat`` file of a session into a DataFrame.

    The function loads the ``genera`` variable of the first (alphabetically)
    ``<monkey_prefix>*.mat`` file in ``filepath``, skips an empty leading
    header row if present, and builds one row per trial from column 0 (first
    20 values) and column 2 (all values) of ``genera``. Rows with a repeated
    ``Trial`` value are dropped, keeping the first occurrence.

    Parameters
    ----------
    filepath : str
        Path to the directory where the .mat file is stored.
    savepath : str, optional
        Directory in which to write ``psychometrics.csv``. If None, the
        DataFrame is returned but not saved.
    monkey_prefix : str, optional
        File-name prefix identifying the relevant .mat file. Defaults to
        "RR034".

    Returns
    -------
    psico_df : DataFrame
        One row per trial. An empty DataFrame with the expected columns is
        returned if the directory, file or data cannot be processed.
    """
    COLS = [
        "Trial", "Class", "SelectedButton", "CorrectButton", "Hits",
        "AmpStim1", "FreqStim1", "AudStim1", "AmpStim2", "FreqStim2", "AudStim2",
        "AmpStim3", "FreqStim3", "AudStim3", "RelKD", "RelKU", "RelMov", "Luz",
        "TRW", "Set", "pd", "kd", "o1", "f1", "o2", "f2", "o3", "f3", "pu", "ku", "pb",
    ]

    if not os.path.isdir(filepath):
        error(f"Provided filepath is not a valid directory for .mat processing: {filepath}. Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    # Sorted so that "the first one" is deterministic; os.listdir order is not.
    mat_files = sorted(
        f for f in os.listdir(filepath) if f.endswith(".mat") and f.startswith(monkey_prefix)
    )

    if not mat_files:
        error(f"No .mat file found in {filepath} with prefix '{monkey_prefix}'. Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    if len(mat_files) > 1:
        warning(f"Multiple .mat files found with prefix '{monkey_prefix}' in {filepath}. "
                f"Using the first one: {mat_files[0]}.")

    mat_file_to_load = pjoin(filepath, mat_files[0])

    try:
        data_mat = loadmat(mat_file_to_load)["genera"]
    except KeyError:
        error(f"Key 'genera' not found in .mat file: {mat_file_to_load}. Check file structure. Returning empty DataFrame.")
        return DataFrame(columns=COLS)
    except Exception as e:
        error(f"Error loading .mat file {mat_file_to_load}: {e}. Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    # Columns 0 and 2 are read below, so at least three columns are required.
    if data_mat.ndim != 2 or data_mat.shape[1] < 3:
        error(f"Unexpected .mat 'genera' structure in {mat_file_to_load} (shape {data_mat.shape}). Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    if data_mat.shape[0] == 0:
        info(f"No psychophysical data found in {mat_file_to_load} after header processing. Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    # An empty first cell marks a header row that carries no trial data.
    idx = 1 if np.size(data_mat[0, 0]) == 0 else 0

    first_block = data_mat[idx:, 0]
    second_block = data_mat[idx:, 2]
    if first_block.size == 0 or second_block.size == 0:
        info(f"No psychophysical data found in {mat_file_to_load} after header processing. Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    try:
        n_trials = first_block.shape[0]
        # Flatten each trial's cell to one row. Unlike np.squeeze, this keeps
        # the trial axis when the file holds a single trial.
        psico1 = np.stack(first_block).reshape(n_trials, -1)[:, :20]
        psico2 = np.stack(second_block).reshape(n_trials, -1)
        psico = np.concatenate((psico1, psico2), axis=1)

        n_cols = psico.shape[1]
        if n_cols > len(COLS):
            warning(f"Psychophysical data has more columns ({n_cols}) than defined in COLS ({len(COLS)}). Extra columns will be unnamed.")
            col_names = COLS + [f"Unnamed_{i}" for i in range(n_cols - len(COLS))]
        else:
            col_names = COLS[:n_cols]
        psico_df = DataFrame(psico, columns=col_names)

    except Exception as e:
        # Deliberate best-effort boundary: callers rely on getting an empty
        # DataFrame rather than an exception for malformed files.
        error(f"Error processing psychophysical data from {mat_file_to_load}: {e}. Returning empty DataFrame.")
        return DataFrame(columns=COLS)

    # Keep only the first row seen for each trial number.
    initial_rows = len(psico_df)
    if "Trial" in psico_df.columns:
        psico_df = psico_df.drop_duplicates("Trial")
        if len(psico_df) < initial_rows:
            info(f"Removed {initial_rows - len(psico_df)} duplicate trial entries from psychometrics.")
    else:
        warning("Column 'Trial' not found in psychometric data. Cannot drop duplicates based on trial number.")

    if savepath is not None:
        try:
            os.makedirs(savepath, exist_ok=True)
            info(f"Ensured save directory exists: {savepath}")
            psico_df.to_csv(pjoin(savepath, "psychometrics.csv"), index=False)
            info(f"Saved psychometrics.csv to {savepath}")
        except OSError as e:
            error(f"Failed to create save directory {savepath} or save psychometrics.csv: {e}. Skipping CSV saving.")
        except Exception as e_save:
            error(f"Error saving psychometrics.csv to {savepath}: {e_save}")

    return psico_df


if __name__ == "__main__":
    # --- Configuration Parameters ---
    # Where the raw .nev / .mat files live.
    file_path = "/media/ferxxo/Expansion"

    # Where processed output and the log file go.
    # NOTE(review): any sub-folder layout is decided by spike_pipeline, which
    # is not visible here.
    save_path = "/home/ferxxo/Desktop"

    # Monkey identifier; used in the log file name and passed to the pipeline.
    monkey_name = "RR34"

    # File-name prefix passed on to the pipeline as ``monkey_mat_prefix``.
    monkey_mat_prefix = "RR034"

    # --- Logging Setup ---
    # Messages go to a text file next to the processed data; if its directory
    # cannot be created, they go to the console instead.
    log_target = pjoin(save_path, f"py_logging_{monkey_name}.txt")
    log_dir = os.path.dirname(log_target)

    # Settings shared by the file and console configurations.
    log_settings = {
        "format": "%(asctime)s %(levelname)-8s %(message)s",
        "level": logging.INFO,
        "datefmt": "%Y-%m-%d %H:%M:%S",
    }

    use_log_file = True
    if log_dir and not os.path.exists(log_dir):
        try:
            os.makedirs(log_dir, exist_ok=True)
        except OSError as e:
            print(f"ERROR: Could not create log directory {log_dir}: {e}. Logging to console.")
            use_log_file = False

    if use_log_file:
        log_settings["filename"] = log_target
    logging.basicConfig(**log_settings)

    # logging.captureWarnings(True) # Uncomment if you want to capture Python warnings

    info(f"Starting data processing for monkey: {monkey_name}")
    info(f"Source data path: {file_path}")
    info(f"Processed data save path: {save_path}")

    # --- Execute the Data Processing Pipeline ---
    try:
        # Both processers are handed to spike_pipeline as callables.
        # NOTE(review): confirm that spike_pipeline accepts the
        # ``monkey_mat_prefix`` keyword; its signature is not visible here.
        spike_pipeline(
            file_path,
            save_path,
            monkey_name,
            mat_processer,
            nev_processer,
            monkey_mat_prefix=monkey_mat_prefix,
        )
        info(f"Data processing for {monkey_name} completed successfully.")
    except Exception as exc:
        # Top-level boundary: record the full traceback in the log.
        error(f"An unhandled error occurred during the spike_pipeline execution: {exc}", exc_info=True)
    finally:
        # Flush and close the log handlers before the program exits.
        logging.shutdown()

ModuleNotFoundError: No module named 'spike_pipeline'