In [1]:
## 사용 라이브러리 호출
import pandas as pd
import numpy as np
import random 
from urllib.parse import quote, unquote
from datetime import timedelta
from sklearn.decomposition import PCA
from scipy.fft import fft

## 모델 사용 라이브러리 
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm.notebook import trange
from sklearn.metrics import f1_score, classification_report

## 모델 학습 결과 경로 설정 
import os
os.makedirs('./result', exist_ok=True)

## Cuda 사용 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

## 랜덤 시드 설정
def set_seed(seed_val):
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

# 시드 설정 
seed_val = 77
set_seed(seed_val)

cuda


## 사용 데이터 컬럼 선택

* tag name 이 많은 경우 tag name을 지정하는 것에 있어서 변수 설정이 다소 유연해짐
* tag name 은 순서대로 불러와짐

In [2]:
# tag name 출력 함수 
def show_column(URL):
    
    # Tag name 데이터 로드
    df = pd.read_csv(URL)
    
    # List 형식으로 변환
    df = df.values.reshape(-1)
    
    return df.tolist()

In [3]:
## tag name 출력 파라미터 설정
table = 'bci1'

NAME_URL = f'http://127.0.0.1:5654/db/tql/datahub/api/v1/get_tag_names.tql?table={table}'

## tag name list 생성 
tag_name = show_column(NAME_URL)

In [4]:
tag_name

['test-s-0',
 'test-s-1',
 'test-s-10',
 'test-s-11',
 'test-s-12',
 'test-s-13',
 'test-s-14',
 'test-s-15',
 'test-s-16',
 'test-s-17',
 'test-s-18',
 'test-s-19',
 'test-s-2',
 'test-s-20',
 'test-s-21',
 'test-s-22',
 'test-s-23',
 'test-s-24',
 'test-s-25',
 'test-s-26',
 'test-s-27',
 'test-s-28',
 'test-s-29',
 'test-s-3',
 'test-s-30',
 'test-s-31',
 'test-s-32',
 'test-s-33',
 'test-s-34',
 'test-s-35',
 'test-s-36',
 'test-s-37',
 'test-s-38',
 'test-s-39',
 'test-s-4',
 'test-s-40',
 'test-s-41',
 'test-s-42',
 'test-s-43',
 'test-s-44',
 'test-s-45',
 'test-s-46',
 'test-s-47',
 'test-s-48',
 'test-s-49',
 'test-s-5',
 'test-s-50',
 'test-s-51',
 'test-s-52',
 'test-s-53',
 'test-s-54',
 'test-s-55',
 'test-s-56',
 'test-s-57',
 'test-s-58',
 'test-s-59',
 'test-s-6',
 'test-s-60',
 'test-s-61',
 'test-s-62',
 'test-s-63',
 'test-s-7',
 'test-s-8',
 'test-s-9',
 'train-s-0',
 'train-s-1',
 'train-s-10',
 'train-s-11',
 'train-s-12',
 'train-s-13',
 'train-s-14',
 'train-s-1

## TAG Name format 변환 

* 위의 과정에서 BCI1 dataset의 모든 Tag Name 을 확인후 사용할 컬럼만 뽑아서 입력할 파라미터 형태로 변환

In [5]:
# 원하는 tag name 설정
# 여기서 tag name 은 컬럼을 의미
tags = tag_name[64:]

# 리스트의 각 항목을 작은따옴표로 감싸고, 쉼표로 구분
tags_= ",".join(f"'{tag}'" for tag in tags)

# 사용 tag name 확인
print(tags_)

'train-s-0','train-s-1','train-s-10','train-s-11','train-s-12','train-s-13','train-s-14','train-s-15','train-s-16','train-s-17','train-s-18','train-s-19','train-s-2','train-s-20','train-s-21','train-s-22','train-s-23','train-s-24','train-s-25','train-s-26','train-s-27','train-s-28','train-s-29','train-s-3','train-s-30','train-s-31','train-s-32','train-s-33','train-s-34','train-s-35','train-s-36','train-s-37','train-s-38','train-s-39','train-s-4','train-s-40','train-s-41','train-s-42','train-s-43','train-s-44','train-s-45','train-s-46','train-s-47','train-s-48','train-s-49','train-s-5','train-s-50','train-s-51','train-s-52','train-s-53','train-s-54','train-s-55','train-s-56','train-s-57','train-s-58','train-s-59','train-s-6','train-s-60','train-s-61','train-s-62','train-s-63','train-s-7','train-s-8','train-s-9','train-s-answer'


##  BCI1 Dataset 로드

* Tag Name 들을 사용하여 데이터 로드

In [6]:
def data_load(table, tag_name, name, start_time, end_time, timeformat):
    
    # 결과를 저장할 리스트
    result_dfs = []
    
    # 데이터 로드 
    df = pd.read_csv(f'http://127.0.0.1:5654/db/tql/datahub/api/v1/select-rawdata.tql?table={table}&name={name}&start={start_time}&end={end_time}&timeformat={timeformat}')

    # 같은 시간대 별 데이터로 전환
    df = df.pivot_table(index='TIME', columns='NAME', values='VALUE', aggfunc='first').reset_index()
    
    # train-s-answer 값 분리
    df_label = df.iloc[:, -1:].dropna()

    # train-s-answer 컬럼 제거
    df = df.iloc[:, :-1]

    for col_name in tag_name[64:-1]:

        # TIME 설정
        df['TIME'] = pd.to_datetime(df['TIME'], format='%Y-%m-%d %H:%M:%S.%f')

        # 3초 단위로 그룹화하여 데이터 개수 세기
        df_counts = df.groupby(df['TIME'].dt.floor('3S')).size().reset_index(name='count')

        # 데이터 개수가 동일한 그룹만 필터링
        most_common_count = df_counts['count'].mode()[0]
        filtered_df_counts = df_counts[df_counts['count'] == most_common_count]

        # 필터링된 시간값들을 리스트로 변환
        filtered_times = filtered_df_counts['TIME'].tolist()

        # 원본 데이터프레임에서 필터링된 시간값들만 선택
        filtered_data = df[df['TIME'].dt.floor('3S').isin(filtered_times)]

        # TIME을 기준으로 그룹화 (3초 단위로 반올림)
        filtered_data_ = filtered_data.copy()
        filtered_data_['TIME'] = filtered_data_['TIME'].dt.floor('3S')
        grouped = filtered_data_.groupby('TIME')[col_name].apply(list).reset_index()

        # 리스트를 개별 열로 나누기
        result_df = pd.DataFrame(grouped[col_name].tolist())

        result_dfs.append(result_df)  # 결과 추가
        
    # 결과를 저장할 리스트 초기화
    data_list = []
    k = 0

    for k in result_dfs:
        
        # array 형태로 변환
        data = k.values
        data_list.append(data)

    # 리스트를 NumPy 배열로 변환
    data_array = np.array(data_list)

    # 필요한 형태로 reshape
    # (데이터 개수, 64, 3000) 형태로 변환
    reshaped_array = np.transpose(data_array, (1, 0, 2)) 
    
    # train-s-answer 변경
    df_label.loc[df_label['train-s-answer'] == -1.0, 'train-s-answer'] = 0
    df_label['train-s-answer'] = df_label['train-s-answer'].astype(int)    

    return reshaped_array, df_label

In [7]:
# 데이터 시간 로드 함수
def time_data_load(table, name, start_time, end_time, timeformat):
    
    target = 'time'
    
    # 데이터 로드 
    df = pd.read_csv(f"http://127.0.0.1:5654/db/tql/datahub/api/v1/select-rawdata.tql?target={target}&table={table}&name={name}&start={start_time}&end={end_time}&timeformat={timeformat}")
    
    # resample을 위해 임의의 value 컬럼 생성
    df['value'] = 0
    
    # resample 진행
    df['time'] = pd.to_datetime(df['time'])
    df.set_index('time', inplace=True)
    df = df.resample('3S').mean()
    
    # 결측값 제거
    df = df.dropna()
    
    # 임의의 value 컬럼 제거
    df = df.drop(['value'], axis=1)
    
    return df

In [8]:
# 시간 변환 함수
# 시간 추가하려면 해당 과정 필요
# window_size : 데이터를 묶어서 수집되는 주기 
# step_size : 데이터의 간격 
def add_time(time_df, start_time, batch_size):
    
    # 몇개의 데이터를 로드해야 되는지 계산
    time = batch_size
    
    # 현재 시간의 인덱스 번호를 확인
    # 없는 경우는 맨처음 시간이 없는 경우이기 때문에 맨처음 인덱스로 지정함 
    try:
        index_now = time_df.index.get_loc(start_time)
    except KeyError:
        index_now = 0
    
    # 현재 시간 기준 배치 데이터의 마지막 시간 설정 
    end_time_ = str(time_df.index[index_now + time] + timedelta(seconds=1))
    
    # 다음 시작 시간의 인덱스 번호 설정
    index_next = index_now + time
    
    # 다음 시작 시간 설정
    next_start_time_ = str(time_df.index[index_next])
    
    # URL 인코딩
    start_time_ = quote(start_time)
    end_time_ = quote(end_time_)
    next_start_time_ = quote(next_start_time_)
    
    return start_time_, end_time_, next_start_time_, index_next

In [9]:
# hanning window 함수 설정 
def hanning_window(length):
    return 0.5 * (1 - np.cos(2 * np.pi * np.arange(length) / (length - 1)))

In [10]:
# FFT 변환 함수
def change_fft(sample_rate, data):
    # 신호의 총 샘플 수
    N = sample_rate
    
    # 각 채널에 대해 FFT 결과를 저장할 배열 초기화
    fft_results = np.zeros((data.shape[0], data.shape[1], N // 2 + 1), dtype=float)
    
    # 전체 데이터에 대해 FFT 적용
    for i in range(data.shape[0]):  # 각 샘플에 대해
        for j in range(data.shape[1]):  # 각 채널에 대해
            yf = fft(data[i, j], n=N)  # FFT 계산
            # FFT 결과의 절댓값을 계산하고 정규화 (유의미한 부분만)
            fft_results[i, j] = 2.0 / N * np.abs(yf[:N // 2 + 1])
    
    return fft_results

## 학습 모델 설정 

* ResNet 1d 기본 모델 사용

In [11]:
## ResNet 1d 모델 설정 
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(out_channels)
        
        # Identity mapping
        self.shortcut = nn.Sequential()
        if in_channels != out_channels:
            self.shortcut = nn.Conv1d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64 

        
        self.conv1 = nn.Conv1d(64, 64, kernel_size=7, stride=2, padding=3) 
        self.bn1 = nn.BatchNorm1d(64)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1])
        self.layer3 = self._make_layer(block, 256, layers[2])
        self.layer4 = self._make_layer(block, 512, layers[3])
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks):
        layers = []
        layers.append(block(self.in_channels, out_channels))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [12]:
# 모델 설정 파라미터
# 학습률 
learning_rate = 0.01

# 모델 초기화
model = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=2).to(device)

# 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 모델 구조 확인
print(model)

ResNet(
  (conv1): Conv1d(64, 64, kernel_size=(7,), stride=(2,), padding=(3,))
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
  )
  (layer2): Sequ

## 모델 학습

* 필요한 배치 크기의 데이터만 불러와서 학습하는 방법으로 진행

In [13]:
# 모델 학습 함수 설정
def train(table, tag_name, name, timeformat, model, start_time_train, end_time_train, start_time_valid, end_time_valid, batch_size_train, batch_size_valid, epochs, time_df_train, time_df_valid, pca, sample_rate):
    
    # 초기 train loss 설정
    train_loss = []
    train_acc = []
    
    # 베스트 f1 값 초기화
    best_f1= 0

    for epoch in epochs:
        
        # 학습 모드 설정 
        model.train()
        
        # 초기 loss 초기화
        running_loss = 0.0
        total_step = 0
        correct = 0
        total=0
        
        # 초기 시작 시간 설정
        start_time_ = start_time_train
        
        # 끝 시간 설정
        end_time_train = str(time_df_train.index[-1])
        
        # while 문을 통해 데이터 호출 
        while start_time_ < end_time_train:
            
            # 배치 크기에 따라 데이터 로드 
            start_time_, end_time_, next_start_time_, index_next= add_time(time_df_train, start_time_, batch_size_train)
        
            # 데이터 로드 
            data, label = data_load(table, tag_name, name, start_time_, end_time_, timeformat)
            
            # Hanning window 적용
            data = data * hanning_window(sample_rate)
            
            # FFT 변환
            data = change_fft(sample_rate, data)
            
            # PCA 적용
            # 2차원으로 변환
            data_ = data.reshape(-1, data.shape[2])
            data_ = pca.fit_transform(data_)
            # 다시 원래의 3차원 형태로 변환
            data = data_.reshape(data.shape[0], data.shape[1], -1)
            
            # 로드한 데이터가 비어 있을 경우 출력 
            if len(data) == 0:
                print("데이터가 없습니다.")
            
            # 배치 사이즈 만큼 데이터가 쌓이면 다음 배치로 이동
            if len(data) == batch_size_train:
                
                # 총 배치수 체크용  
                total_step = total_step + 1
                
                # 데이터를 numpy 배열로 변환
                input_data = np.array(data)
                input_target = np.array(label)[:-1]

                # 데이터를 Tensor로 변환
                input_data = torch.tensor(input_data, dtype=torch.float32).to(device).float()
                input_target = torch.tensor(input_target, dtype=torch.float32).to(device).long().squeeze()
                
                # 옵티마이저 최적화 
                optimizer.zero_grad()
                
                # 모델 입력
                outputs = model(input_data)
                
                # loss 계산
                loss = criterion(outputs, input_target)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            
                # label 예측 값 설정 
                _,pred = torch.max(outputs, dim=1)
                correct += torch.sum(pred==input_target).item()
                total += input_target.size(0)

                # 배치 리셋
                data = []
                
                # 다음 시작 시간 설정    
            start_time_ = unquote(next_start_time_)
            
            # 마지막 시간을 넘어 가져오는 것을 방지
            if index_next + batch_size_train >= len(time_df_train):
                break
            
        train_acc.append(100 * correct / total)
        train_loss.append(running_loss/total_step)
        print(f'\ntrain loss: {np.mean(train_loss)}, train acc: {(100 * correct / total):.4f}')
        
        # Epoch 마다 validation을 진행해서 가장 좋은 성능을 보이는 모델을 저장 
        with torch.no_grad():
            
            model.eval()
            
            # 초기화
            preds_v = []
            targets_v = []
                
            # 초기 시작 시간 설정
            start_time_v = start_time_valid
            
            # 끝 시간 설정
            end_time_valid = str(time_df_valid.index[-1])
            
            # while 문을 통해 데이터 호출 
            while start_time_v < end_time_valid:
                
                # 배치 크기에 따라 데이터 로드 
                start_time_v, end_time_v, next_start_time_v, index_next_v = add_time(time_df_valid, start_time_v, batch_size_valid)
                
                # 데이터 로드 
                data_v, label_v = data_load(table, tag_name, name, start_time_v, end_time_v, timeformat)
                
                # Hanning window 적용
                data_v = data_v * hanning_window(sample_rate)
                
                # FFT 변환
                data_v = change_fft(sample_rate, data_v)
                
                # PCA 적용
                # 2차원으로 변환
                data_ = data_v.reshape(-1, data_v.shape[2])
                data_ = pca.fit_transform(data_)
                # 다시 원래의 3차원 형태로 변환
                data_v = data_.reshape(data_v.shape[0], data_v.shape[1], -1)

                
                # 로드한 데이터가 비어 있을 경우 출력 
                if len(data_v) == 0:
                    print("데이터가 없습니다.")
                
                # 배치 사이즈 만큼 데이터가 쌓이면 다음 배치로 이동
                if len(data_v) == batch_size_valid:
                    
                    # 데이터를 numpy 배열로 변환
                    input_data_v = np.array(data_v)
                    input_target_v = np.array(label_v)[:-1]

                    # 데이터를 Tensor로 변환
                    input_data_v = torch.tensor(input_data_v, dtype=torch.float32).to(device).float()
                    input_target_v = torch.tensor(input_target_v, dtype=torch.float32).to(device).long().squeeze()
                    
                    # 모델 입력
                    outputs_v = model(input_data_v)
                    
                    # label 예측 값 설정 
                    _,pred_v = torch.max(outputs_v, dim=1)
                    target_v = input_target_v.view_as(pred_v)
    
                    preds_v.append(pred_v)
                    targets_v.append(target_v)
                    
                    # 배치 리셋
                    data_v = []
                    
                # 다음 시작 시간 설정    
                start_time_v = unquote(next_start_time_v)
                
                # 마지막 시간을 넘어 가져오는 것을 방지
                if index_next_v + batch_size_valid > len(time_df_valid):
                    break
                    
            # 모든 배치에서 수집된 예측과 라벨을 합침
            preds_v = torch.cat(preds_v).detach().cpu().numpy()
            targets_v = torch.cat(targets_v).detach().cpu().numpy()
            
            f1score = f1_score(targets_v, preds_v,  average='macro')
            if best_f1 < f1score:
                best_f1 = f1score
                # 베스트 모델 저장 
                with open("./result/BCI1_ResN et1d_New_Batch.txt", "a") as text_file:
                    print('epoch=====',epoch, file=text_file)
                    print(classification_report(targets_v, preds_v, digits=4), file=text_file)
                torch.save(model, f'./result/BCI1_ResNet1d_New_Batch.pt') 
            epochs.set_postfix_str(f"epoch = {epoch},  f1_score = {f1score}, best_f1 = {best_f1}")
                    
    return model

In [14]:
########################################### 학습 파라미터 설정 ################################################
# tag table 이름 설정
table = 'bci1'
# tag name 설정
name = quote(tags_, safe=":/")
# 시간 포멧 설정 
timeformat = quote('2006-01-02 15:04:05.000000')
# 학습 데이터 시작 시간 설정
start_time_train = '2024-01-01 00:00:00'
# 학습 데이터  끝 시간 설정
end_time_train = '2024-01-01 03:41:00'
# 배치 사이즈 설정
batch_size_train = 16
batch_size_valid = 16
# epoch 설정
epochs = trange(100, desc='training')
# sample rate 설정
sample_rate = 3000
# PCA 설정
# 95%의 분산을 설명하는 주성분 선택
pca = PCA(n_components=14)
# 학습 시간 리스트 로드 
time_df_train = time_data_load(table, name, quote(start_time_train), quote(end_time_train), timeformat)

########################################### 검증 파라미터 설정 ################################################
# 검증 데이터 시작 시간 설정
start_time_valid = '2024-01-01 03:42:00'
# 검증 데이터  끝 시간 설정
end_time_valid = '2024-01-01 04:09:00'
# 검증 시간 리스트 로드 
time_df_valid = time_data_load(table, name, quote(start_time_valid), quote(end_time_valid), timeformat)

######################################## 학습 진행 #############################################
train(table, tag_name, name, timeformat, model, start_time_train, end_time_train, start_time_valid, end_time_valid, batch_size_train, batch_size_valid, epochs, time_df_train, time_df_valid, pca, sample_rate)


training:   0%|          | 0/100 [00:00<?, ?it/s]

  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass



train loss: 1.2080353131661048, train acc: 47.5962

train loss: 1.1204177118264713, train acc: 49.0385


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))



train loss: 1.0725246285780883, train acc: 53.3654

train loss: 0.9887269127827425, train acc: 50.4808

train loss: 0.949872727577503, train acc: 50.0000

train loss: 0.9121195398844207, train acc: 51.9231

train loss: 0.8853872693501987, train acc: 53.3654

train loss: 0.8643282795181642, train acc: 52.8846

train loss: 0.8463760217030843, train acc: 51.9231

train loss: 0.8322952550191145, train acc: 57.2115

train loss: 0.8204017089797067, train acc: 55.2885

train loss: 0.8087945209863858, train acc: 62.0192

train loss: 0.7943512659806472, train acc: 66.3462

train loss: 0.7745996657963637, train acc: 76.9231

train loss: 0.7561487672802729, train acc: 83.1731

train loss: 0.7251674725602453, train acc: 89.9038

train loss: 0.6985537067257981, train acc: 91.3462

train loss: 0.669987006733815, train acc: 93.2692

train loss: 0.6403646057826065, train acc: 97.1154

train loss: 0.6117753831777148, train acc: 97.5962

train loss: 0.5850418515865019, train acc: 98.5577

train loss: 0

ResNet(
  (conv1): Conv1d(64, 64, kernel_size=(7,), stride=(2,), padding=(3,))
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
  )
  (layer2): Sequ

## 모델 테스트

In [15]:
# 베스트 모델 로드
model_ = torch.load(f'./result/BCI1_ResNet1d_New_Batch.pt') 

In [16]:
# 모델 테스트 함수 설정
def test(table, tag_name, name, timeformat, model, start_time_test, end_time_test, batch_size, time_df_test, pca, sample_rate):
    
            # Epoch 마다 testation을 진행해서 가장 좋은 성능을 보이는 모델을 저장 
            with torch.no_grad():
                
                model.eval()
                
                # 초기화
                preds_t = []
                targets_t = []
                    
                # 초기 시작 시간 설정
                start_time_t = start_time_test
                
                # 끝 시간 설정
                end_time_test = str(time_df_test.index[-1])
                
                # while 문을 통해 데이터 호출 
                while start_time_t < end_time_test:
                    
                    # 배치 크기에 따라 데이터 로드 
                    start_time_t, end_time_t, next_start_time_t, index_next_t = add_time(time_df_test, start_time_t, batch_size)
                    
                    # 데이터 로드 
                    data_t, label_t = data_load(table, tag_name, name, start_time_t, end_time_t, timeformat)
                    
                    # Hanning window 적용
                    data_t = data_t * hanning_window(sample_rate)
                    
                    # FFT 변환
                    data_t = change_fft(sample_rate, data_t)
                    
                    # PCA 적용
                    # 2차원으로 변환
                    data_ = data_t.reshape(-1, data_t.shape[2])
                    data_ = pca.fit_transform(data_)
                    # 다시 원래의 3차원 형태로 변환
                    data_t = data_.reshape(data_t.shape[0], data_t.shape[1], -1)

                    
                    # 로드한 데이터가 비어 있을 경우 출력 
                    if len(data_t) == 0:
                        print("데이터가 없습니다.")
                    
                    # 배치 사이즈 만큼 데이터가 쌓이면 다음 배치로 이동
                    if len(data_t) == batch_size:
                        
                        # 데이터를 numpy 배열로 변환
                        input_data_t = np.array(data_t)
                        input_target_t = np.array(label_t)[:-1]

                        # 데이터를 Tensor로 변환
                        input_data_t = torch.tensor(input_data_t, dtype=torch.float32).to(device).float()
                        input_target_t = torch.tensor(input_target_t, dtype=torch.float32).to(device).long().squeeze()
                        
                        # 모델 입력
                        outputs_t = model(input_data_t)
                        
                        # label 예측 값 설정 
                        _,pred_t = torch.max(outputs_t, dim=1)
                        target_t = input_target_t.view_as(pred_t)
        
                        preds_t.append(pred_t)
                        targets_t.append(target_t)
                        
                        # 배치 리셋
                        data_t = []
                        
                    # 다음 시작 시간 설정    
                    start_time_t = unquote(next_start_time_t)
                    
                    # 마지막 시간을 넘어 가져오는 것을 방지
                    if index_next_t + batch_size + 1 > len(time_df_test):
                        break
                        
                # 모든 배치에서 수집된 예측과 라벨을 합침
                preds_t = torch.cat(preds_t).detach().cpu().numpy()
                targets_t = torch.cat(targets_t).detach().cpu().numpy()
                    
            return targets_t, preds_t

In [19]:
######################################## 테스트 파라미터 설정 #############################################

# 테스트 데이터 시작 시간 설정
start_time_test = '2024-01-01 04:10:00'
# 테스트 데이터  끝 시간 설정
end_time_test = '2024-01-01 04:37:01'
# 배치 사이즈 설정
batch_size_test = 4

# 검증 시간 리스트 로드
time_df_test = time_data_load(table, name, quote(start_time_test), quote(end_time_test), timeformat)

######################################## 테스트 진행 #############################################
targets_t, preds_t = test(table, tag_name, name, timeformat, model_, start_time_test, end_time_test, batch_size_test, time_df_test, pca, sample_rate)

## 모델 성능 확인 

In [20]:
print(classification_report(targets_t, preds_t))

              precision    recall  f1-score   support

           0       1.00      0.64      0.78        14
           1       0.67      1.00      0.80        10

    accuracy                           0.79        24
   macro avg       0.83      0.82      0.79        24
weighted avg       0.86      0.79      0.79        24

