### **Setting**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install torch_geometric

Collecting torch_geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/63.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch_geometric
Successfully installed torch_geometric-2.6.1


In [None]:
import pandas as pd
import numpy as np

from torch_geometric.data import Data, Dataset
from torch_geometric import utils
import networkx as nx
import torch
import os
from tqdm import tqdm
from itertools import product

In [None]:
class EEGDataset(Dataset):
    def __init__(self, root, edge_dict, node_list = None, label_list = [0,1,2,3], threshold = -100, transform = None, pre_transform = None):
        """
        root = 데이터셋이 저장되어야 할 루트 디렉토리.
        상위폴더 속 raw = tabular data 저장되어 있음.
        하위폴더 processed = EEGDataset 실행시 자동 생성. 이미 생성되어 있을 경우 process 함수 자동 생략 및 빠르게 실행됨.
        """
        # 데이터셋의 루트 디렉토리를 지정
        self.root = root
        # 사용할 엣지 리스트 입력
        self.edge_dict = edge_dict
        # 사용할 노드 리스트 입력
        self.node_list = node_list
        # 사용할 라벨 리스트 입력
        self.label_list = label_list
        # threshold 입력
        self.threshold = threshold

        # 라벨과 파일 경로 매핑을 저장할 리스트
        self.data_info = self._load_data_info()

        # 부모 클래스 초기화
        super(EEGDataset, self).__init__(root, transform, pre_transform)

    def _load_data_info(self):
        """
        raw_dir에 있는 라벨별 폴더를 탐색하여 파일 경로와 라벨 정보를 매핑.
        """
        data_info = []

        # raw_dir에 대한 경로 설정
        raw_dir = os.path.join(os.path.dirname(self.root), "raw_split")

        # 라벨별 폴더에서 파일 찾기
        for split_folder in os.listdir(raw_dir):
            split_path = os.path.join(raw_dir, split_folder)
            for label_folder in os.listdir(split_path):
                if int(label_folder[0]) in self.label_list:
                    label_path = os.path.join(split_path, label_folder)
                    if os.path.isdir(label_path):
                        # 각 라벨 폴더의 파일을 살펴봄
                        for filename in os.listdir(label_path):
                            file_path = os.path.join(label_path, filename)
                            if os.path.isfile(file_path):
                                # 파일 경로와 라벨 매핑
                                data_info.append((file_path, label_folder, split_folder))

        return data_info

    @property
    def raw_file_names(self):
        # raw_dir 안에 있는 모든 txt 파일 경로를 반환
        return [info[0] for info in self.data_info]

    @property
    def processed_file_names(self):
        # data_info에 있는 각 파일 경로에서 파일 이름을 추출하고, 확장자를 제거합니다.
        processed_file_names = []
        for file_path, _, _ in self.data_info:
            # 파일 경로에서 파일 이름 추출
            filename = os.path.basename(file_path)
            # 확장자 제거
            base_filename, _ = os.path.splitext(filename)
            # 처리된 파일 이름 생성
            processed_file_names.append(f"{base_filename}.pt")

        return processed_file_names

    def process(self):
        # 데이터셋 처리
        for idx, (file_path, label_folder, split) in enumerate(tqdm(self.data_info)):
            # 파일 읽기
            df = self._read_data(file_path, label_folder)

            # 그래프 데이터 변환 정보
            node_features = self._get_node_features(df)
            edge_index = self._get_edge_indices(df)
            edge_features = self._get_edge_features(df)
            label = self._get_labels(label_folder)

            # 그래프 데이터 생성
            data = Data(x=node_features, edge_index=edge_index, edge_attr=edge_features, y=label)

            # test data 여부
            data.test_mask = self._get_test_mask(split)
            # 환자 ID 정보
            base_filename = os.path.splitext(os.path.basename(file_path))[0]
            data.ID = base_filename

            # Data 객체 저장
            torch.save(data, os.path.join(self.processed_dir, f"{base_filename}.pt"))

    def _read_data(self, file_path, label_folder):
        # 파일을 탭으로 구분된 값으로 읽기
        df = pd.read_csv(file_path, sep="\t")
        # 데이터프레임의 일부 행과 열 추출
        df = df.iloc[1:3829, 0:41]
        # 열 이름 설정
        df.columns = ['brain_region'] + [f"freq{i}" for i in range(1, 41)]
        # 엣지 리스트에 해당하는 정보만 사용
        df = df.loc[df['brain_region'].isin(self.edge_dict[int(label_folder[0])])]
        # wave 변환
        df['delta'] = df[[f"freq{i}" for i in range(1, 4)]].mean(axis=1)
        df['theta'] = df[[f"freq{i}" for i in range(4, 9)]].mean(axis=1)
        df['alpha'] = df[[f"freq{i}" for i in range(9, 14)]].mean(axis=1)
        df['beta'] = df[[f"freq{i}" for i in range(14, 31)]].mean(axis=1)
        df['gamma'] = df[[f"freq{i}" for i in range(14, 41)]].mean(axis=1)
        df = df[['brain_region', 'delta', 'theta', 'alpha', 'beta', 'gamma']]

        # threshold 처리
        waves = ['delta', 'theta', 'alpha', 'beta', 'gamma']
        df[waves] = df[waves].map(lambda x: 0 if x < self.threshold else x)
        df = df[~(df[waves].sum(axis=1) == 0)]

        return df

    def _get_node_features(self, df):
        # 노드 리스트가 지정된 경우, 노드 리스트에 해당하는 노드 사용
        if self.node_list is not None:
            unique_brain_regions = self.node_list
        # 노드 리스트가 지정되지 않은 경우, 엣지에 사용되는 노드 사용
        else:
            unique_brain_regions = pd.unique(df['brain_region'].str.split(' ', expand=True).stack())
        # 노드의 숫자 변환 정보 딕셔너리에 저장
        self.brain_region_to_index = {region: idx for idx, region in enumerate(unique_brain_regions)}
        # node 특성 1로 채우기
        node_features = np.ones(shape=(len(unique_brain_regions),1))

        return torch.tensor(node_features, dtype=torch.float)

    def _get_edge_indices(self, df):
        # edge index 추출
        edge_index = df['brain_region'].str.split(' ', expand=True).values.transpose()
        # 엣지를 이루는 노드 정보를 숫자로 변환
        numerical_edge_index = np.vectorize(self.brain_region_to_index.get)(edge_index)

        return torch.tensor(numerical_edge_index, dtype=torch.long)

    def _get_edge_features(self, df):
        edge_features = df.drop(columns=['brain_region']).values

        return torch.tensor(edge_features, dtype=torch.float)

    def _get_labels(self, label_folder):
        if len(self.label_list) == 5:
            label_num = np.asarray([0 if int(label_folder[0])==0 else 1])
        else:
            label_map = {label: idx for idx, label in enumerate(self.label_list)}
            label_num = np.asarray([label_map[int(label_folder[0])]])

        return torch.tensor(label_num, dtype=torch.long)

    def _get_test_mask(self, split):
        test_mask = 1 if split == 'test' else 0
        test_mask = np.asarray([test_mask])

        return torch.tensor(test_mask, dtype=torch.long)

    def len(self):
        # 데이터셋의 길이 반환

        return len(self.data_info)

    def get(self, idx):
        # 인덱스에 해당하는 데이터를 로드하여 반환
        file_path, _, _ = self.data_info[idx]
        base_filename = os.path.splitext(os.path.basename(file_path))[0]

        return torch.load(os.path.join(self.processed_dir, f"{base_filename}.pt"))

---

In [None]:
label_numlist_dict = {'건강조울':[0, 1], '건강우울':[0, 2], '건강조현':[0, 3], '조울우울':[1, 2], '조울조현':[1, 3], '우울조현':[2, 3], '건강질환':[0, 1, 2, 3, 4],
                      '건강조울우울':[0, 1, 2], '건강조울조현':[0, 1, 3], '건강우울조현':[0, 2, 3], '조울우울조현':[1, 2, 3], '건강조울우울조현':[0, 1, 2, 3]}

### **hotelling t-square 방법**

In [None]:
hotelling_t2 = {'label_name': ['건강조울', '건강우울', '건강조현', '조울우울', '조울조현', '우울조현', '건강질환'],
                'correction_type': ['bonf_0.05', 'bonf_0.01', 'bh_0.05', 'bh_0.01', 'by_0.05', 'by_0.01']}
combinations = list(product(hotelling_t2["label_name"], hotelling_t2["correction_type"]))

In [None]:
for label_name, correction_type in combinations:
    edge_df = pd.read_csv(f"/content/drive/Othercomputers/내 노트북/2024-1/EEG_GNN/data/edge_selection/{label_name}.csv")
    edge_list = edge_df.loc[edge_df[correction_type],'brain_region'].tolist()

    dataset = EEGDataset(root = f"/content/drive/Othercomputers/내 노트북/2024-1/EEG_GNN/data/{label_name}_{correction_type}",
                         edge_dict = {num: edge_list for num in label_numlist_dict[label_name]},
                         label_list = label_numlist_dict[label_name])

### **threshold 방법**

In [None]:
dmn_path = "/content/drive/Othercomputers/내 노트북/2024-1/EEG_GNN/data/edge_selection/dmn.txt"
dmn = pd.read_csv(dmn_path, sep="\t")
dmn_list = dmn.iloc[1:,0].unique()[0:190]

In [None]:
threshold_dict = {'label_name': ['건강조울', '건강우울', '건강조현', '조울우울', '조울조현', '우울조현', '건강질환',
                                 '건강조울우울', '건강조울조현', '건강우울조현', '조울우울조현', '건강조울우울조현'],
                  'edge_threshold': ['thres40', 'thres45', 'thres50']}
combinations = list(product(threshold_dict["label_name"], threshold_dict["edge_threshold"]))

In [None]:
for label_name, edge_threshold in combinations:
    dataset = EEGDataset(root = f"/content/drive/Othercomputers/내 노트북/2024-1/EEG_GNN/data/{label_name}_{edge_threshold}",
                         edge_dict = {num: dmn_list for num in label_numlist_dict[label_name]},
                         label_list = label_numlist_dict[label_name],
                         threshold = int(edge_threshold[5:]))

### **dmn 모두 사용**

In [None]:
for label_name in threhold_dict["label_name"]:
    dataset = EEGDataset(root = f"/content/drive/Othercomputers/내 노트북/2024-1/EEG_GNN/data/{label_name}_dmn",
                         edge_dict = {num: dmn_list for num in label_numlist_dict[label_name]},
                         label_list = label_numlist_dict[label_name])