In [10]:
import os
import numpy as np
from sklearn import svm
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler


def log_svm(Input_matrix, labels, k_folds=5, lda_flag=False, normalize=False):
    """
    Translates the given MATLAB SVM function to Python using scikit-learn.
    Performs random K-fold cross-validation without stratification.

    Parameters:
    - Input_matrix: Features (X) of the dataset
    - labels: True labels (y) of the dataset
    - k_folds: Number of folds for cross-validation (default: 5)
    - lda_flag: Flag to apply LDA or not (default: False)
    - normalize: Flag to normalize input data (default: False)

    Returns:
    - hit_rate: Overall accuracy (hit rate)
    - confusion_matrix_normalized: Normalized confusion matrix
    - errors_per_fold: List of misclassifications per fold
    """

    input_matrix = np.array(Input_matrix, dtype=float)
    labels = np.array(labels, dtype=float)

    # Normalize data if required
    if normalize:
        scaler = StandardScaler()
        input_matrix = scaler.fit_transform(input_matrix)

    total_trials = input_matrix.shape[0]
    predicted_labels = np.zeros(labels.shape)
    cumulative_error = 0
    errors_per_fold = []

    # Using KFold for random cross-validation (not stratified)
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    for train_index, test_index in kfold.split(input_matrix):
        train_data, test_data = input_matrix[train_index], input_matrix[test_index]
        train_labels, test_labels = labels[train_index], labels[test_index]

        # Apply LDA if lda_flag is set (scikit-learn's LDA)
        if lda_flag:
            from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

            lda = LinearDiscriminantAnalysis()
            train_data = lda.fit_transform(train_data, train_labels)
            test_data = lda.transform(test_data)

        # Train SVM model (linear kernel: kernel='linear', RBF kernel: kernel='rbf')
        classifier = svm.SVC(
            kernel="linear"
        )  # For linear kernel, set kernel='rbf' for RBF kernel
        classifier.fit(train_data, train_labels)
        predicted_test_labels = classifier.predict(test_data)

        # Count misclassifications
        fold_error = np.sum(test_labels != predicted_test_labels)
        errors_per_fold.append(fold_error)
        cumulative_error += fold_error
        predicted_labels[test_index] = predicted_test_labels

    # Compute performance of the decoder
    hit_rate = 1 - (cumulative_error / total_trials)

    # Compute confusion matrix and normalize it
    confusion_matrix_result = confusion_matrix(labels, predicted_labels)
    confusion_matrix_normalized = (
        confusion_matrix_result.astype(float)
        / confusion_matrix_result.sum(axis=1)[:, np.newaxis]
    )

    return hit_rate, confusion_matrix_normalized, errors_per_fold


# Example usage:
# input_matrix = ...  # Your data
# labels = ...        # Your labels
# hit_rate, conf_matrix, errors = log_svm(input_matrix, labels, k_folds=5, lda_flag=True, normalize=True)

In [24]:
import sys

# filepath = 'your_file.mat'  # Replace with your actual file path

with open(filepath, 'rb') as f:
    header = f.read(128)  # The MATLAB file header is 128 bytes
    try:
        header_text = header.decode('utf-8', errors='ignore')
    except UnicodeDecodeError:
        header_text = header.decode('latin1', errors='ignore')
    print(header_text)

MATLAB 5.0 MAT-file, Platform: MACA64, Created on: Mon Oct 28 14:24:41 2024                                              IM


In [28]:
import os
import numpy as np
import pandas as pd
import scipy.io as sio
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler


# Function to load data
def load_data(file_path):
    mat_data = sio.loadmat(file_path)
    return (
        mat_data["Spike_rasters"],
        mat_data["labels"],
        mat_data["behav_categ"],
        mat_data["block_times"],
        mat_data["monkey"],
        mat_data["unit_count"],
        mat_data["groom_labels_all"],
        mat_data["brain_label"],
        mat_data["behavior_log"],
        mat_data["behav_categ_original"],
    )


# Set parameters
is_mac = True
home = os.path.expanduser("~") if is_mac else "C:/Users/GENERAL"
source_directory = os.path.join(home, "Dropbox (Penn)/data/monkey")
a_directory = os.path.join(source_directory, "Data_MonkeyA")
h_directory = os.path.join(source_directory, "Data_MonkeyH")

# Get all session directories
a_sessions = [
    os.path.join(a_directory, f)
    for f in os.listdir(a_directory)
    if not f.startswith(".")
]
h_sessions = [
    os.path.join(h_directory, f)
    for f in os.listdir(h_directory)
    if not f.startswith(".")
]
all_sessions = a_sessions + h_sessions  # Combine both session lists
# sessions to skip
sessions_to_skip = ["Hooke_2021-09-09", "Amos_2021-09-14"]  # missing necessary files, or other
all_sessions = [session for session in all_sessions if not any(skip in session for skip in sessions_to_skip)]


mean_hitrate = []
sd_hitrate = []
mean_hitrate_shuffled = []

for session_path in all_sessions:
    # Check if the session path is valid and contains the necessary files
    if not os.path.exists(session_path):
        print(f"Session path does not exist: {session_path}")
        continue

    for channel_flag in ["vlPFC", "TEO", "all"]:
        filepath = os.path.join(session_path, f'processed_for_SVM_{channel_flag}.mat')
        
        try:
            mat_data = sio.loadmat(filepath)
            
            # Assign the output to appropriate Pythonic variables
            spike_rasters, labels, labels_partner, behav_categ, block_times, monkey, unit_count, groom_labels_all, brain_label, behavior_log, behav_categ_original = (
                mat_data["Spike_rasters"],
                mat_data["labels"],
                mat_data["labels_partner"],
                mat_data["behav_categ"],
                mat_data["block_times"],
                mat_data["monkey"],
                mat_data["unit_count"],
                mat_data["groom_labels_all"],
                mat_data["brain_label"],
                mat_data["behavior_log"],
                mat_data["behav_categ_original"]
            )
            
            # Process data
            spike_count_raster = spike_rasters.T
            behavior_labels = np.array([label[2] for label in labels])  # Extract unique behavior info
            co_occurrence = np.array([label[4] for label in labels])
            
            # Select epochs where only one behavior happens
            idx_single = np.where(co_occurrence < 4)[0]
            spike_count_raster = spike_count_raster[idx_single, :]
            behavior_labels = behavior_labels[idx_single]
            
            # Compute frequency of behavior for the session
            behav_freq_table = pd.Series(behavior_labels).value_counts()
            behav_freq_table = behav_freq_table[behav_freq_table >= 30]  # Minimum occurrences
            
            # Select behaviors with a minimum number of occurrences
            behav = behav_freq_table.index
            
            # Only keep the behaviors of interest
            idx = np.isin(behavior_labels, behav)
            spike_count_raster_final = spike_count_raster[idx, :]
            behavior_labels_final = behavior_labels[idx]
            
            # Run SVM using the log_svm function
            hit_rate, conf_matrix, errors = log_svm(
                spike_count_raster_final,
                behavior_labels_final,
                k_folds=5,
                lda_flag=False,
                normalize=True,
            )
            
            mean_hitrate.append(hit_rate)
            sd_hitrate.append(np.std(errors))  # Assuming errors is a list of misclassifications per fold
            mean_hitrate_shuffled.append(np.mean(errors))  # Adjust as needed
            
        except KeyError as e:
            print(f"KeyError: {e}. Continuing without this data.")
            continue

# Plotting results
plt.figure()
plt.bar(
    ["Real", "Shuffled"],
    [np.mean(mean_hitrate), np.mean(mean_hitrate_shuffled)],
    alpha=0.5,
)
plt.ylabel("Decoding Accuracy")
plt.title("Decoding accuracy for subject current behavioral states")
# plt.savefig("SVM_results_subjectBehav.pdf")
plt.show()

ValueError: Data must be 1-dimensional, got ndarray of shape (8812, 1, 1) instead

In [30]:
import os
import scipy.io as sio
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import os
import numpy as np
import pandas as pd
import scipy.io as sio
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
# from utils import load_data, preprocess_data
# from macaques_example import log_svm  # Assuming log_svm is defined in macaques-example.ipynb and exported



def load_data(file_path):
    """
    Load data from a .mat file.

    Parameters:
    - file_path: Path to the .mat file.

    Returns:
    - Tuple containing various data arrays.
    """
    mat_data = sio.loadmat(file_path)
    return (
        mat_data["Spike_rasters"],
        mat_data["labels"],
        mat_data.get("labels_partner", None),
        mat_data["behav_categ"],
        mat_data["block_times"],
        mat_data["monkey"],
        mat_data["unit_count"],
        mat_data["groom_labels_all"],
        mat_data["brain_label"],
        mat_data["behavior_log"],
        mat_data["behav_categ_original"],
    )


def preprocess_data(spike_rasters, labels, behav_categ, co_occurrence, min_occurrences=30):
    """
    Preprocess spike and behavior data.

    Parameters:
    - spike_rasters: Spike raster data.
    - labels: Behavior labels.
    - behav_categ: Behavior categories.
    - co_occurrence: Co-occurrence data.
    - min_occurrences: Minimum number of occurrences to consider a behavior.

    Returns:
    - Filtered spike rasters and behavior labels.
    """
    spike_count_raster = spike_rasters.T
    behavior_labels = np.array([label[2] for label in labels]).flatten()
    co_occurrence = np.array([label[4] for label in labels]).flatten()

    # Select epochs where only one behavior happens
    idx_single = np.where(co_occurrence < 4)[0]
    spike_count_raster = spike_count_raster[idx_single, :]
    behavior_labels = behavior_labels[idx_single]

    # Compute frequency of behavior for the session
    behav_freq_table = pd.Series(behavior_labels).value_counts()
    behav_freq_table = behav_freq_table[behav_freq_table >= min_occurrences]

    behav = behav_freq_table.index

    # Only keep the behaviors of interest
    idx = np.isin(behavior_labels, behav)
    spike_count_raster_final = spike_count_raster[idx, :]
    behavior_labels_final = behavior_labels[idx]

    return spike_count_raster_final, behavior_labels_final, behav


def get_all_sessions(source_directory, sessions_to_skip):
    """
    Retrieve all session directories excluding those to skip.
    """
    a_directory = os.path.join(source_directory, "Data_MonkeyA")
    h_directory = os.path.join(source_directory, "Data_MonkeyH")

    a_sessions = [
        os.path.join(a_directory, f)
        for f in os.listdir(a_directory)
        if not f.startswith(".")
    ]
    h_sessions = [
        os.path.join(h_directory, f)
        for f in os.listdir(h_directory)
        if not f.startswith(".")
    ]
    all_sessions = a_sessions + h_sessions
    all_sessions = [
        session for session in all_sessions
        if not any(skip in session for skip in sessions_to_skip)
    ]
    return all_sessions


home = os.path.expanduser("~") if is_mac else "C:/Users/GENERAL"
source_directory = os.path.join(home, "Dropbox (Penn)", "data", "monkey")
sessions_to_skip = ["Hooke_2021-09-09", "Amos_2021-09-14"]

all_sessions = get_all_sessions(source_directory, sessions_to_skip)

mean_hitrate = []
sd_hitrate = []
mean_hitrate_shuffled = []

for session_path in all_sessions:
    if not os.path.exists(session_path):
        print(f"Session path does not exist: {session_path}")
        continue

    for channel_flag in ["vlPFC", "TEO", "all"]:
        filepath = os.path.join(session_path, f'processed_for_SVM_{channel_flag}.mat')

        try:
            # Load data
            (
                spike_rasters,
                labels,
                labels_partner,
                behav_categ,
                block_times,
                monkey,
                unit_count,
                groom_labels_all,
                brain_label,
                behavior_log,
                behav_categ_original
            ) = load_data(filepath)

            # Preprocess data
            spike_raster_final, labels_final, behav = preprocess_data(
                spike_rasters,
                labels,
                behav_categ,
                co_occurrence=None  # Modify if co_occurrence is available
            )

            # Run SVM
            hit_rate, conf_matrix, errors = log_svm(
                spike_raster_final,
                labels_final,
                k_folds=5,
                lda_flag=False,
                normalize=True
            )

            mean_hitrate.append(hit_rate)
            sd_hitrate.append(np.std(errors))
            mean_hitrate_shuffled.append(np.mean(errors))

        except KeyError as e:
            print(f"KeyError: {e}. Continuing without this data.")
            continue

# Plotting results
plt.figure()
plt.bar(
    ["Real", "Shuffled"],
    [np.mean(mean_hitrate), np.mean(mean_hitrate_shuffled)],
    alpha=0.5,
    color=['blue', 'orange']
)
plt.ylabel("Decoding Accuracy")
plt.title("Decoding accuracy for subject current behavioral states")
plt.show()

# Save results
results_dir = os.path.join(home, "Documents", "projects", "Datalogger", "Results", "SVM_results")
os.makedirs(results_dir, exist_ok=True)
results_path = os.path.join(results_dir, "SVM_results_subjectBehav.npy")
np.save(results_path, {
    "mean_hitrate": mean_hitrate,
    "sd_hitrate": sd_hitrate,
    "mean_hitrate_shuffled": mean_hitrate_shuffled,
    "behav": behav
})


In [34]:
results_dir = os.path.join(home, "Documents", "projects", "Datalogger", "Results", "SVM_results")
results_path = os.path.join(results_dir, "SVM_results_subjectBehav.npy")
results = np.load(results_path, allow_pickle=True).item()


In [45]:
np.vstack(results['mean_hitrate']).reshape(12, -1)

array([[0.91310345, 0.8637931 , 0.95172414],
       [0.88685085, 0.85498108, 0.92158661],
       [0.85844419, 0.83111625, 0.90124349],
       [0.88248272, 0.86071481, 0.92498897],
       [0.84613385, 0.80740741, 0.89291748],
       [0.88724751, 0.87684655, 0.90774797],
       [0.90222541, 0.91213209, 0.93955492],
       [0.90513502, 0.90303624, 0.93759619],
       [0.87870696, 0.85874912, 0.91384399],
       [0.90771106, 0.90084201, 0.94615555],
       [0.92986075, 0.91903043, 0.95822589],
       [0.91298084, 0.91586227, 0.95389713]])