In [1]:
import numpy as np
import time
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import LeaveOneOut
import os

from sklearn.metrics import confusion_matrix, roc_auc_score, f1_score
from sklearn.preprocessing import label_binarize



from scipy.stats import mannwhitneyu    
from xgboost import XGBClassifier
import random
from sklearn.model_selection import KFold


# caculating_10_important_features
import matplotlib.pyplot as plt
from scipy.stats import kurtosis
from scipy.stats import skew
# sig_data = DATA.reshape(DATA.shape[0], -1)
def train_model_using_kfold(data, label, model, n_splits=10):
    kf = KFold(n_splits=n_splits, shuffle=True)
    result = []

    # Loop over each train/test split
    for train_index, test_index in kf.split(data):
        # Split the data into training and testing sets
        X_train, X_test = data[train_index], data[test_index]
        y_train, y_test = label[train_index], label[test_index]

        # Train the classifier
        model.fit(X_train, y_train)

        # Predict the label for the test set
        y_pred = model.predict(X_test)

        # Append the accuracy to the list
        result.append([y_pred, y_test])

    return np.array(result)


def train_model_using_loocv(data, label, model):
    loo = LeaveOneOut()
    result = []

    # Loop over each train/test split
    for train_index, test_index in loo.split(data):
        # Split the data into training and testing sets
        X_train, X_test = data[train_index], data[test_index]
        y_train, y_test = label[train_index], label[test_index]

        # Train the classifier
        model.fit(X_train, y_train)

        # Predict the label for the test set
        y_pred = model.predict(X_test)

        # Append the accuracy to the list
        result.append([y_pred, y_test])

    return np.array(result), model

stats_method = 'mannwhitneyu' # 'ranksums' or 'mannwhitneyu'
def zero_diagnonal(arr):
    # Loop over the first and last dimension
    for i in range(arr.shape[0]):  # Loop over subjects
        for j in range(arr.shape[-1]):  # Loop over views
            np.fill_diagonal(arr[i, :, :, j], 0)
    return arr


def return_significant_FCvalue(adj, labels):
    adj = zero_diagnonal(adj)
    hc_adj = adj[np.where(labels==1)]
    md_adj = adj[np.where(labels==0)]
    count = 0

    num_view = adj.shape[-1]
    p_view = np.zeros((52,52,num_view))
    effect_size = np.zeros((52,52,num_view))
    stats = np.zeros((52,52,num_view))
    for view in range(num_view):
        for seed in range(52):
            for target in range(52):
                hc_val = hc_adj[:, seed, target, view]
                md_val = md_adj[:, seed, target, view]
                if stats_method == 'mannwhitneyu':
                    stat, p1 = mannwhitneyu(hc_val,md_val)

                else:
                    raise ValueError('stats_method should be mannwhitneyu or ranksums')
                p_view[seed, target, view] = p1
                stats[seed, target, view] = stat
    
    adj = adj.reshape(adj.shape[0], -1)
    p_view = p_view.reshape(-1)
    return adj[:, p_view<0.05]

def get_activity_start_time(data, index_start):
    gradient = np.gradient(data)
    max_gradient = np.argmax(gradient[0:int(index_start*1.2)])  # 0:index_start*4 # current index_start = 400,
    if max_gradient <= index_start:
        max_gradient = index_start
    return max_gradient


"""
Calulate the left slope based on the data sample time from activity_start_time and  activity_start_time + task_duration//2
"""


def get_left_slope(data, activity_start, task_duration):
    activity_start = int(activity_start)
    data = data[activity_start: activity_start+task_duration//2]
    slope, _ = np.polyfit(np.arange(data.shape[0]), data, 1)
    return slope


"""
Calulate the right slope based on the data sample time from activity_start_time + task_duration//2 and activity_start_time + task_duration  
"""


def get_right_slope(data, activity_start, task_duration):
    activity_start = int(activity_start)
    data = data[activity_start+task_duration//2: activity_start+task_duration]
    slope, _ = np.polyfit(np.arange(data.shape[0]), data, 1)
    return slope


"""
For calculating FWHM
"""


def find_nearest(array, value):
    idx = (np.abs(array - value)).argmin()
    return idx


def get_FWHM(y_values, activity_start, task_duration):
    # make sure the peak value is situated in the task duration period
    task = y_values[activity_start:activity_start+task_duration]
    max_task = np.max(task)  # Find the maximum y value
    half_max_y = max_task / 2.0
    max_index_task = np.argmax(task)
    # if max_index_task is in the first two values, set left_index to 0
    if max_index_task <= 1:
        left_index = 0
    else:
        left_index = find_nearest(y_values[:max_index_task], half_max_y)
    # if max_index_task is in the last two values, set right_index to the last value
    if max_index_task >= activity_start+task_duration-1:
        right_index = task_duration-1
    else:
        right_index = find_nearest(
            y_values[max_index_task:], half_max_y) + max_index_task

    return right_index - left_index


"""Get all 10 features"""


def get_10_features(hbo, index_start, task_duration):
    feature_shape = hbo.shape[:2]
    # Feature 1 mean
    mean = np.mean(hbo, axis=2)  # feature shape is (subject, channel)

    # Feature 2 variance
    variance = np.std(hbo, axis=2)  # feature shape is (subject, channel)

    # Feature 3 activity_start_time
    activity_start_time = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            activity_start_time[sub, ch] = get_activity_start_time(
                hbo[sub, ch], index_start=index_start)

    # # Feature 4 left_slope
    left_slope = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            left_slope[sub, ch] = get_left_slope(
                hbo[sub, ch], activity_start=activity_start_time[sub, ch], task_duration=task_duration)
    # # Feature 5  right_slope
    right_slope = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            right_slope[sub, ch] = get_right_slope(
                hbo[sub, ch], activity_start=activity_start_time[sub, ch], task_duration=task_duration)

    # # Feature 6 kurtosis
    kurt = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            kurt[sub, ch] = kurtosis(hbo[sub, ch])
    # There might be some nan in kurtosis calucaltion because of all 0-value array
    kurt = np.nan_to_num(kurt)

    # # Feature 7 skewness
    skewness = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            skewness[sub, ch] = skew(hbo[sub, ch])
    # There might be some nan in skewness calucaltion because of all 0-value array
    skewness = np.nan_to_num(skewness)

    # # Feature 8 area under the curve AUC Based on the sample time from activity_start_time + task_duration
    AUC = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            activity_start = int(activity_start_time[sub, ch])
            AUC[sub, ch] = np.sum(
                hbo[sub, ch][activity_start:activity_start+task_duration])
    # for sub in range(10):
    #     plt.plot(AUC[sub])

    # # Feature 9 full width half maximum (FWHM)
    # FWHM
    FWHM = np.empty_like(mean)
    for sub in range(feature_shape[0]):
        for ch in range(feature_shape[1]):
            activity_start = int(activity_start_time[sub, ch])
            FWHM[sub, ch] = get_FWHM(
                hbo[sub, ch], activity_start, task_duration)
    # for sub in range(10):
    #     plt.plot(FWHM[sub])

    # # Feature 10 peak
    peak = np.max(hbo, axis=2)

    features = np.concatenate((mean, variance, activity_start_time,
                              left_slope, right_slope, kurt, skewness, AUC, FWHM, peak), axis=1)

    return features

def get_significant_feature(data, labels):
    hc_adj = data[np.where(labels==1)]
    md_adj = data[np.where(labels==0)]
    count = 0

    p_view = np.zeros((hc_adj.shape[1:]))
    for view in range(p_view.shape[-1]):
        hc_val = hc_adj[:, view]
        mdd_val = md_adj[:, view]
        _, p1 = mannwhitneyu(hc_val,mdd_val)
        p_view[view] = p1
    data = data.reshape(data.shape[0], -1)
    p_view = p_view.reshape(-1)
    data = data[:, p_view<0.05]
    return data 


def get_metrics(y_true, y_pred):
    # tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

    # 明确指定labels参数
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    # 现在cm是一个2x2矩阵，即使数据只包含一个类别
    tn, fp, fn, tp = cm.ravel()

    accuracy = (tp + tn) / (tp + tn + fp + fn)
    sensitivity = tp / (tp + fn)
    specificity = tn / (tn + fp)
    f1 = f1_score(y_true, y_pred)

    return accuracy, sensitivity, specificity, f1

def print_md_table(model_name, set, metrics):
    print()
    print('| Model Name |   Set   |Accuracy | Sensitivity | Specificity | F1 Score |')
    print('|------------|----------|----------|-------------|-------------|----------|')
    print(f'| {model_name} | {set} |', end = '')
    for i in range(4):
        print(f" {metrics[i]:.4f} |", end = '')
    print()
    print(''*10)

def average_signal_and_reshape(data, numof_avgpoint=10):
    try:
        assert len(data.shape) == 3, "wrong data shape, it should be like (n_sample, n_channel, n_timepoint)"
        print("The data shape length is 3.")
    except AssertionError as error:
        print(error)
    totalpoint = data.shape[-1] // numof_avgpoint
    data = data[...,:int(totalpoint*numof_avgpoint)]
    data = np.reshape(data, (data.shape[0], 52, -1, 10))
    data = np.mean(data, axis=-1)
    data = np.reshape(data, (data.shape[0], -1))
    return data


In [2]:
# load the pretreatment data 
FC_diagnosis_data_path = '/Users/shanxiafeng/Documents/Project/Research/fnirs-prognosis/code/fnirs-treatment-response-prediction/allData/diagnosis/fc_data.npy'
FC_diagnosis_label_path = '/Users/shanxiafeng/Documents/Project/Research/fnirs-prognosis/code/fnirs-treatment-response-prediction/allData/diagnosis/label.npy'

FC_prognosis_data_path = '/Users/shanxiafeng/Documents/Project/Research/fnirs-prognosis/code/fnirs-treatment-response-prediction/allData/prognosis/DMFC/pre_treatment_hamd_reduction_50/data.npy'
FC_prognosis_label_path = '/Users/shanxiafeng/Documents/Project/Research/fnirs-prognosis/code/fnirs-treatment-response-prediction/allData/prognosis/DMFC/pre_treatment_hamd_reduction_50/label.npy'


FC_diagnosis_data = np.load(FC_diagnosis_data_path)
FC_diagnosis_label = np.load(FC_diagnosis_label_path)

FC_prognosis_data = np.load(FC_prognosis_data_path)
FC_prognosis_label = np.load(FC_prognosis_label_path)

print('FC diagnosis data shape:', FC_diagnosis_data.shape)
print('FC diagnosis label shape:', FC_diagnosis_label.shape)
print('FC prognosis data shape:', FC_prognosis_data.shape)
print('FC prognosis label shape:', FC_prognosis_label.shape)



FC diagnosis data shape: (140, 52, 52, 3)
FC diagnosis label shape: (140,)
FC prognosis data shape: (65, 52, 52, 3)
FC prognosis label shape: (65,)


In [3]:
data, label = FC_diagnosis_data, FC_diagnosis_label

def check_result(data, label):
    data = np.reshape(data, (data.shape[0], -1))

    print("Start finding the best seed")
    best_seed = 0
    best_f1 = 0 
    for _ in range(10):
        seed = int(time.time())
        print(f'current seed: {seed}')
        model = DecisionTreeClassifier()
        model.random_state = seed

        result,model = train_model_using_loocv(data, label, model)
        res_metrics = get_metrics(result[:, 1], result[:, 0])
        if res_metrics[-1] > best_f1: 
            best_f1 = res_metrics[-1]
            best_seed = seed
        print_md_table('Decision Tree', 'test', res_metrics)
    return best_seed

best_seed_for_diagnosis = check_result(FC_diagnosis_data, FC_diagnosis_label)
best_seed_for_prognosis = check_result(FC_prognosis_data, FC_prognosis_label)

Start finding the best seed
current seed: 1710231668

| Model Name |   Set   |Accuracy | Sensitivity | Specificity | F1 Score |
|------------|----------|----------|-------------|-------------|----------|
| Decision Tree | test | 0.4714 | 0.4429 | 0.5000 | 0.4559 |

current seed: 1710231720

| Model Name |   Set   |Accuracy | Sensitivity | Specificity | F1 Score |
|------------|----------|----------|-------------|-------------|----------|
| Decision Tree | test | 0.5214 | 0.5143 | 0.5286 | 0.5180 |

current seed: 1710231767

| Model Name |   Set   |Accuracy | Sensitivity | Specificity | F1 Score |
|------------|----------|----------|-------------|-------------|----------|
| Decision Tree | test | 0.5500 | 0.5571 | 0.5429 | 0.5532 |

current seed: 1710231815

| Model Name |   Set   |Accuracy | Sensitivity | Specificity | F1 Score |
|------------|----------|----------|-------------|-------------|----------|
| Decision Tree | test | 0.5000 | 0.5286 | 0.4714 | 0.5139 |

current seed: 171023

In [4]:
# extract time feature 
TF_prognosis_data = np.load('/Users/shanxiafeng/Documents/Project/Research/fnirs-prognosis/code/fnirs-treatment-response-prediction/allData/prognosis/DMFC/pre_treatment_hamd_reduction_50/data.npy')
print('TF_prognosis_data shape:', TF_prognosis_data.shape)

TF_prognosis_data shape: (65, 52, 52, 3)
