In [24]:
import pickle

from torch.utils.data import TensorDataset, DataLoader, WeightedRandomSampler
import os
import torch

def load_dataset(data_type,loading='CN_MCI'):
    load_path = f'./preprocess/data/2class/{loading}'
    state_dict = {}
    for i, state in enumerate(loading.split('_')):
        state_dict[loading.split('_')[i]] = torch.tensor(i)
     
    final_path = f'{load_path}/{data_type}_data/5folds/genotype_only'
    
    # 만약 5개의 fold가 있다면, dataset_folds_X는 리스트 안의 5개의 딕셔너리로 표현될 것이다.
    # dataset_folds_X의 각각의 딕셔너리는 {SNPS텐서, DTI텐서, T1텐서, PET텐서}로 표현된다.
    
    # dataset_folds_Y의 각각의 리스트는 [X의 각 행(데이터)에 해당하는 레이블을 'MCI', 'AD', 'CN' 중의 문자열] 로 가지고 있을 것이다.
    
    dataset_folds = []
     
    num_folds = len(os.listdir(final_path)) // 2  # train/test 각각 존재하므로 fold 수 계산
    for fold in range(1, num_folds + 1):
        with open(f'{final_path}/train_fold={fold}.pickle', 'rb') as train_data_file:
            train_data = pickle.load(train_data_file)
        
        with open(f'{final_path}/test_fold={fold}.pickle', 'rb') as test_data_file:
            test_data = pickle.load(test_data_file)
        
        snps_train, t1_train, pet_train, y_train = train_data
        snps_test, t1_test, pet_test, y_test = test_data
        
        # print(snps_train.shape, t1_train.shape, pet_train.shape, len(y_train))
        # print(snps_test.shape, t1_test.shape, pet_test.shape, len(y_test))
        print((y_test))
        
        dataset_folds.append({
            'train': {'snps': snps_train,  't1': t1_train ,'pet': pet_train, 'y': torch.stack([state_dict[s]for s in y_train])},
            'test': {'snps': snps_test,  't1': t1_test, 'pet': pet_test, 'y': torch.stack([state_dict[s]for s in y_test])}
        })

    return dataset_folds, snps_train.shape[1]

def build_data_loader(data_fold):
    # 3개 모달리티 사용
    train_X    = torch.cat([data_fold['train'][' snps'],data_fold['train']['t1'],data_fold['train']['pet']], dim=1)
    test_X     = torch.cat([data_fold['test']['snps'], data_fold['test']['t1'],data_fold['test']['pet']], dim=1)

    # 2개 모달리티 사용 (영상만) 필요시 주석 해제하여 모달리티별로 주석설정하여 사용
    # train_X    = torch.cat([data_fold['train']['t1'],data_fold['train']['pet']], dim=1)
    # test_X     = torch.cat([data_fold['test']['t1'],data_fold['test']['pet']], dim=1)

    # 1개 모달리티 사용 snps, t1, pet 중 골라서 기입 후 사용 
    # train_X    = data_fold['train']['snps']
    # test_X     = data_fold['test']['snps']

    train_y    = data_fold['train'] ['y'].float()
    test_y     = data_fold['test'] ['y'].float()

    # Data Distribution 확인용 
    #print(f"Train Data label distribution : {torch.unique(train_y, return_counts=True)}")
    #print(f"Test Data label distribution : {torch.unique(test_y, return_counts=True)}")

    data_train = TensorDataset(train_X, train_y)
    data_test = TensorDataset(test_X, test_y)

    
    return train_X.numpy(), train_y.numpy(), test_X.numpy(), test_y.numpy()

In [25]:
data_type = 'trr'

In [26]:
import shap
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, roc_auc_score
import numpy as np
from multiprocessing import Pool

# SHAP 계산 함수 (샘플별 처리)
def compute_shap_for_sample(args):
    explainer, sample = args
    return explainer.shap_values(sample.reshape(1, -1))

# SHAP 계산 및 시각화 코드
for loading in ['MCI_AD', 'CN_MCI', 'CN_AD']:
    print(f"----TASK : {loading}----")
    dataset_folds, snp_dim = load_dataset(data_type,loading)  # 데이터 로드 함수
    print("Start Cross Validation")
    
    acc_list = []
    auc_list = []
    shap_values_all_folds = []  # 모든 fold의 SHAP 값을 저장
    test_X_all_folds = []  # 모든 fold의 테스트 데이터를 저장

    for i in range(5):
        train_X, train_y, test_X, test_y = build_data_loader(dataset_folds[i])
        print(train_X.shape)
        # SVM 모델 생성 및 학습
        model = SVC(kernel='linear', C=1.0, probability=True)
        model.fit(train_X, train_y)

        #SHAP Explainer 생성
        explainer = shap.LinearExplainer(model, train_X)

        # 멀티프로세싱으로 SHAP 값 계산
        print(f"Calculating SHAP values for Fold {i+1} with multiprocessing...")
    
        shap_values = explainer.shap_values(test_X)
        shap_values_all_folds.append(shap_values)
        test_X_all_folds.append(test_X)

        # 성능 평가
        y_pred = model.predict(test_X)
        y_pred_proba = model.predict_proba(test_X)[:, 1]
        acc_list.append(accuracy_score(test_y, y_pred))
        auc_list.append(roc_auc_score(test_y, y_pred_proba))
    
    # Fold별 평균 성능 출력
    print(f"Mean Accuracy: {np.mean(acc_list):.4f} +- {np.std(acc_list):.4f}")
    print(f"Mean ROC AUC: {np.mean(auc_list):.4f} +- {np.std(auc_list):.4f}")

    # 모든 fold의 SHAP 값 병합
    combined_shap_values = np.vstack(shap_values_all_folds)  # SHAP 값을 병합
    combined_test_X = np.vstack(test_X_all_folds)  # 테스트 데이터 병합

    plt.figure() 
    # Summary Plot (모든 fold 결과를 사용)
    shap.summary_plot(
        combined_shap_values,
        combined_test_X,
        feature_names=[f'Feature {i}' for i in range(train_X.shape[1])],show=False,
        max_display=200  # 상위 100개 feature만 표시
    )
    plt.savefig(f'shap_summary_{loading}.png', dpi=300, bbox_inches='tight')  # 해상도 300dpi
    plt.close()

----TASK : MCI_AD----
[nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan]


KeyError: nan