<a href="https://colab.research.google.com/github/starryesh22/Google_Colab/blob/Heartbeat/Baselines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import argparse
import os
import warnings

from lightgbm import LGBMClassifier
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import f1_score
from tqdm import tqdm
import wfdb

from utils import split_data, find_optimal_threshold
from expert_features import extract_features

warnings.filterwarnings('ignore', category=FutureWarning)



''' 코드는 필요한 패키지와 모듈을 임포트하는 부분입니다. 각각의 임포트된 항목은 다음과 같습니다:

argparse: 명령행 인수를 파싱하기 위한 파이썬의 기본 라이브러리인 argparse를 임포트합니다.

os: 운영 체제와 상호 작용하기 위한 os 모듈을 임포트합니다.

warnings: 경고 메시지를 처리하기 위한 warnings 모듈을 임포트합니다.

lightgbm.LGBMClassifier: LightGBM 분류기를 임포트합니다. LightGBM은 부스팅 기반의 트리 알고리즘으로, 빠른 속도와 높은 성능을 제공합니다.

numpy: 수치 계산을 위한 NumPy 패키지를 임포트합니다.

pandas: 데이터 조작 및 분석을 위한 Pandas 패키지를 임포트합니다.

sklearn.linear_model.LogisticRegression: 로지스틱 회귀 모델을 임포트합니다.

sklearn.ensemble.RandomForestClassifier: 랜덤 포레스트 분류기를 임포트합니다.

sklearn.multiclass.OneVsRestClassifier: 다중 클래스 분류를 위한 One-vs-Rest 분류기를 임포트합니다.

sklearn.neural_network.MLPClassifier: 다층 퍼셉트론 분류기를 임포트합니다.

sklearn.metrics.f1_score: F1 스코어를 계산하기 위한 함수를 임포트합니다.

tqdm: 반복 작업의 진행 상황을 표시하기 위한 진행 표시줄을 제공하는 tqdm 패키지를 임포트합니다.

wfdb: PhysioNet의 WFDB 라이브러리를 임포트합니다. 이 라이브러리는 ECG 데이터를 읽고 처리하는 데 사용됩니다.

utils.split_data: 데이터를 훈련 세트와 검증 세트로 분할하는 함수를 임포트합니다.

utils.find_optimal_threshold: 최적의 임계값을 찾기 위한 함수를 임포트합니다.

expert_features.extract_features: 전문가 기능을 추출하기 위한 함수를 임포트합니다. '''



In [None]:

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data-dir', type=str, default='data/CPSC', help='Data directory')
    parser.add_argument('--classifier', type=str, default='all', help='Classifier to use: LR, RF, LGB, or MLP')
    parser.add_argument('--seed', type=int, default=42, help='Seed to split data')
    return parser.parse_args()

'''

위의 코드는 명령행 인수를 파싱하는 함수인 parse_args를 정의합니다. 이 함수는 argparse.ArgumentParser를 사용하여 인수를 구성하고, 파싱된 인수를 반환합니다.

parser = argparse.ArgumentParser(): ArgumentParser 객체를 생성하여 파서를 초기화합니다.

parser.add_argument('--data-dir', type=str, default='data/CPSC', help='Data directory'): --data-dir 옵션을 추가합니다. 이 옵션은 데이터 디렉토리 경로를 지정합니다. type=str은 인수의 타입이 문자열임을 나타내고, default='data/CPSC'는 기본값으로 'data/CPSC'를 사용한다는 것을 의미합니다. help는 도움말 메시지를 지정하는 인수로, 사용자가 어떤 값을 입력해야 하는지 설명합니다.

parser.add_argument('--classifier', type=str, default='all', help='Classifier to use: LR, RF, LGB, or MLP'): --classifier 옵션을 추가합니다. 이 옵션은 사용할 분류기를 지정합니다. type=str은 인수의 타입이 문자열임을 나타내고, default='all'은 기본값으로 'all'을 사용한다는 것을 의미합니다. help는 도움말 메시지를 지정하는 인수로, 사용자가 어떤 값을 입력해야 하는지 설명합니다.

parser.add_argument('--seed', type=int, default=42, help='Seed to split data'): --seed 옵션을 추가합니다. 이 옵션은 데이터를 분할할 때 사용할 시드 값을 지정합니다. type=int는 인수의 타입이 정수임을 나타내고, default=42는 기본값으로 42를 사용한다는 것을 의미합니다. help는 도움말 메시지를 지정하는 인수로, 사용자가 어떤 값을 입력해야 하는지 설명합니다.

return parser.parse_args(): 구성된 파서를 사용하여 명령행 인수를 파싱하고, 파싱된 인수를 반환합니다. 이는 프로그램에서 사용자가 지정한 인수 값을 얻는 데 사용됩니다.


'''

    

In [None]:


def generate_features_csv(features_csv, data_dir, patient_ids):
    print('Generating expert features...')
    ecg_features = []
    for patient_id in tqdm(patient_ids):
        ecg_data, _ = wfdb.rdsamp(os.path.join(data_dir, patient_id))
        ecg_features.append(extract_features(ecg_data))
    df = pd.DataFrame(ecg_features, index=patient_ids)
    df.index.name = 'patient_id'
    df.to_csv(features_csv)
    return df


''' 


위의 코드는 전문가 기능을 생성하여 CSV 파일로 저장하는 generate_features_csv 함수를 정의합니다.

print('Generating expert features...'): 'Generating expert features...'라는 메시지를 출력합니다.

ecg_features = []: 전문가 기능을 저장하기 위한 빈 리스트 ecg_features를 생성합니다.

for patient_id in tqdm(patient_ids):: patient_ids에 대해 반복합니다. tqdm은 진행 상황을 표시하는 진행 표시줄입니다.

ecg_data, _ = wfdb.rdsamp(os.path.join(data_dir, patient_id)): WFDB를 사용하여 ECG 데이터를 읽어옵니다. data_dir과 patient_id를 결합하여 데이터 경로를 만들고, wfdb.rdsamp 함수를 사용하여 데이터를 읽어옵니다. 읽은 ECG 데이터는 ecg_data에 저장됩니다.

ecg_features.append(extract_features(ecg_data)): ecg_data에서 전문가 기능을 추출하여 ecg_features 리스트에 추가합니다. extract_features 함수는 ECG 데이터로부터 전문가 기능을 추출하는 함수입니다.

df = pd.DataFrame(ecg_features, index=patient_ids): ecg_features 리스트와 patient_ids를 사용하여 데이터프레임 df를 생성합니다. ecg_features의 각 항목은 행이 되고, patient_ids가 인덱스가 됩니다.

df.index.name = 'patient_id': 데이터프레임의 인덱스에 이름을 지정합니다. 이름은 'patient_id'로 지정됩니다.

df.to_csv(features_csv): 데이터프레임을 CSV 파일로 저장합니다. 파일 경로는 features_csv로 지정됩니다.

return df: 생성된 데이터프레임 df를 반환합니다.


'''



In [None]:

if __name__ == "__main__":
    classes = ['SNR', 'AF', 'IAVB', 'LBBB', 'RBBB', 'PAC', 'PVC', 'STD', 'STE']
    args = parse_args()
    data_dir = args.data_dir
    classifier = args.classifier
    features_csv = os.path.join(data_dir, 'features.csv')
    labels_csv = os.path.join(data_dir, 'labels.csv')

    df_labels = pd.read_csv(labels_csv)
    patient_ids = df_labels['patient_id'].tolist()
    if not os.path.exists(features_csv):
        df_X = generate_features_csv(features_csv, data_dir, patient_ids)
    else:
        df_X = pd.read_csv(features_csv)
    df_X = df_X.merge(df_labels[['patient_id', 'fold']], on='patient_id')
    
    train_folds, val_folds, test_folds = split_data(seed=args.seed)
    feature_cols = df_X.columns[1:-1] # remove patient id and fold

    X_train = df_X[df_X['fold'].isin(train_folds)][feature_cols].to_numpy()
    X_val = df_X[df_X['fold'].isin(val_folds)][feature_cols].to_numpy()
    X_test = df_X[df_X['fold'].isin(test_folds)][feature_cols].to_numpy()

    y_train = df_labels[df_labels['fold'].isin(train_folds)][classes].to_numpy()
    y_val = df_labels[df_labels['fold'].isin(val_folds)][classes].to_numpy()
    y_test = df_labels[df_labels['fold'].isin(test_folds)][classes].to_numpy()


    ''' 



    위의 코드는 main 함수를 정의하여 프로그램의 주 실행 부분을 나타냅니다. 코드의 시작부분에 if __name__ == "__main__":이 있으므로, 이 코드는 직접 실행될 때만 실행되고 다른 모듈에서 임포트될 때는 실행되지 않습니다.

classes = ['SNR', 'AF', 'IAVB', 'LBBB', 'RBBB', 'PAC', 'PVC', 'STD', 'STE']: 클래스 리스트를 정의합니다. 이 클래스는 ECG 데이터의 레이블로 사용됩니다.

args = parse_args(): parse_args 함수를 사용하여 명령행 인수를 파싱하고, 결과를 args 변수에 할당합니다.

data_dir = args.data_dir: 데이터 디렉토리 경로를 args.data_dir에서 가져와 data_dir 변수에 할당합니다.

classifier = args.classifier: 사용할 분류기를 args.classifier에서 가져와 classifier 변수에 할당합니다.

features_csv = os.path.join(data_dir, 'features.csv'): data_dir과 'features.csv'를 결합하여 전문가 기능 CSV 파일의 경로를 만듭니다.

labels_csv = os.path.join(data_dir, 'labels.csv'): data_dir과 'labels.csv'를 결합하여 레이블 CSV 파일의 경로를 만듭니다.

df_labels = pd.read_csv(labels_csv): 레이블 CSV 파일을 읽어 데이터프레임 df_labels에 저장합니다.

patient_ids = df_labels['patient_id'].tolist(): 데이터프레임 df_labels에서 'patient_id' 열을 가져와 리스트로 변환하여 patient_ids 변수에 할당합니다.

if not os.path.exists(features_csv):: 전문가 기능 CSV 파일이 존재하지 않는 경우를 확인합니다.

df_X = generate_features_csv(features_csv, data_dir, patient_ids): generate_features_csv 함수를 사용하여 전문가 기능을 생성하고, 생성된 데이터프레임을 df_X 변수에 할당합니다.

else:: 전문가 기능 CSV 파일이 이미 존재하는 경우를 처리합니다.

df_X = pd.read_csv(features_csv): 전문가 기능 CSV 파일을 읽어 데이터프레임 df_X에 저장합니다.

df_X = df_X.merge(df_labels[['patient_id', 'fold']], on='patient_id'): 데이터프레임 df_X와 df_labels의 'patient_id'와 'fold' 열을 병합합니다.

train_folds, val_folds, test_folds = split_data(seed=args.seed): split_data 함수를 사용하여 데이터를 훈련 세트, 검증 세트, 테스트 세트로 분할합니다. 시드 값은 args.seed에서 가져옵니다.

feature_cols = df_X.columns[1:-1]: 데이터프레임 df_X의 두 번째 열부터 마지막에서 두 번째 열까지를 선택하여 특성 열로 지정합니다. 첫 번째 열은 환자 ID이고, 마지막에서 두 번째 열은 폴드 정보입니다.

X_train = df_X[df_X['fold'].isin(train_folds)][feature_cols].to_numpy(): 훈련 세트에 해당하는 행만 선택하여 특성 열을 넘파이 배열로 변환하여 X_train 변수에 할당합니다.

X_val = df_X[df_X['fold'].isin(val_folds)][feature_cols].to_numpy(): 검증 세트에 해당하는 행만 선택하여 특성 열을 넘파이 배열로 변환하여 X_val 변수에 할당합니다.

X_test = df_X[df_X['fold'].isin(test_folds)][feature_cols].to_numpy(): 테스트 세트에 해당하는 행만 선택하여 특성 열을 넘파이 배열로 변환하여 X_test 변수에 할당합니다.

y_train = df_labels[df_labels['fold'].isin(train_folds)][classes].to_numpy(): 훈련 세트에 해당하는 행만 선택하여 클래스 열을 넘파이 배열로 변환하여 y_train 변수에 할당합니다.

y_val = df_labels[df_labels['fold'].isin(val_folds)][classes].to_numpy(): 검증 세트에 해당하는 행만 선택하여 클래스 열을 넘파이 배열로 변환하여 y_val 변수에 할당합니다.

y_test = df_labels[df_labels['fold'].isin(test_folds)][classes].to_numpy(): 테스트 세트에 해당하는 행만 선택하여 클래스 열을 넘파이 배열로 변환하여 y_test 변수에 할당합니다.

이후 코드에서는 훈련, 검증, 테스트 데이터를 사용하여 분류기를 학습하고 평가하는 과정이 진행될 것입니다.


'''



In [None]:


  if classifier == 'all':
        classifiers = ['LR', 'RF', 'LGB', 'MLP']
    else:
        classifiers = [classifier]

    for classifier in classifiers:
        # tune parameters
        if classifier == 'LR':
            model = LogisticRegression(solver='lbfgs', max_iter=1000)
        elif classifier == 'RF':
            model = RandomForestClassifier(n_estimators=300, max_depth=10)
        elif classifier == 'LGB':
            model = LGBMClassifier(n_estimators=100)
        else:
            model = MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=500)
        if classifier != 'MLP':
            model = OneVsRestClassifier(model)

        print(f'Start training {classifier}...')
        model.fit(X_train, y_train)
        
        y_val_scores = model.predict_proba(X_val)
        y_test_scores = model.predict_proba(X_test)
        
        f1s = []
        thresholds = []
        print('Finding optimal thresholds on validation dataset...')


''' 
위의 코드는 분류기를 선택하고 해당 분류기를 사용하여 훈련 및 평가하는 부분입니다.

if classifier == 'all':: 분류기가 'all'인 경우, 모든 분류기를 사용하도록 설정합니다. 그렇지 않으면 사용자가 선택한 분류기를 사용합니다.

for classifier in classifiers:: 선택된 분류기마다 반복합니다.

분류기의 매개변수를 조정합니다. 각 분류기에 대해 매개변수를 설정합니다.

'LR': 로지스틱 회귀 분류기
'RF': 랜덤 포레스트 분류기
'LGB': LightGBM 분류기
'MLP': 다층 퍼셉트론 분류기
model.fit(X_train, y_train): 훈련 데이터를 사용하여 분류기 모델을 학습합니다.

y_val_scores = model.predict_proba(X_val): 검증 데이터에 대한 예측 확률을 계산합니다.

y_test_scores = model.predict_proba(X_test): 테스트 데이터에 대한 예측 확률을 계산합니다.

f1s = []와 thresholds = []: F1 점수와 임계값을 저장할 리스트를 초기화합니다.

'Finding optimal thresholds on validation dataset...'을 출력합니다.

이 이후에는 검증 데이터를 사용하여 최적의 임계값을 찾는 과정이 진행될 것입니다.

'''






In [None]:
  
        for i in range(len(classes)):
            # find optimal threshold on validation dataset
            y_val_score = y_val_scores[:, i]
            threshold = find_optimal_threshold(y_val[:, i], y_val_score)
            # apply optimal threshold to test dataset
            y_test_score = y_test_scores[:, i]
            y_test_pred = y_test_score > threshold
            f1 = f1_score(y_test[:, i], y_test_pred)
            thresholds.append(threshold)
            f1s.append(f1)
        np.set_printoptions(precision=3)
        print(f'{classifier} F1s:', f1s)
        print('Avg F1:', np.mean(f1s))



        '''

       위의 코드는 검증 데이터를 사용하여 최적의 임계값을 찾고, 이 임계값을 테스트 데이터에 적용하여 F1 점수를 계산하는 부분입니다.

for i in range(len(classes)):

클래스 개수만큼 반복합니다.
y_val_score = y_val_scores[:, i]: i번째 클래스에 대한 검증 데이터의 예측 확률을 가져옵니다.

threshold = find_optimal_threshold(y_val[:, i], y_val_score): 검증 데이터의 실제 값과 예측 확률을 사용하여 최적의 임계값을 찾습니다.

y_test_score = y_test_scores[:, i]: i번째 클래스에 대한 테스트 데이터의 예측 확률을 가져옵니다.

y_test_pred = y_test_score > threshold: 최적의 임계값을 사용하여 테스트 데이터에 대한 예측을 수행합니다.

f1 = f1_score(y_test[:, i], y_test_pred): 실제 값과 예측값을 사용하여 F1 점수를 계산합니다.

thresholds.append(threshold)와 f1s.append(f1): 각 클래스에 대한 최적의 임계값과 F1 점수를 리스트에 추가합니다.

np.set_printoptions(precision=3): 출력할 숫자의 소수점 자릿수를 설정합니다.

print(f'{classifier} F1s:', f1s): 분류기 이름과 F1 점수 리스트를 출력합니다.

print('Avg F1:', np.mean(f1s)): 평균 F1 점수를 출력합니다.

이 코드는 분류기별로 각 클래스에 대한 최적의 임계값을 찾고, 테스트 데이터에 적용하여 F1 점수를 계산하여 출력합니다.


'''




