In [1]:
import sys
sys.path.append('/home2/jupyter-s.kim/.local/lib/python3.7/site-packages/') # jupyter와 같은 package를 사용하기 위한 경로

import library

In [2]:
### default
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import random
import time
import copy

### modeling
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dense, Flatten, Dropout
from tensorflow.keras.utils import to_categorical

### visualization
import seaborn as sns
from matplotlib import pyplot as plt

In [3]:
# Gas sensor channels used as model features.
sensors = ['C7H8', 'TMA', 'VOC', 'CO2', 'HCHO', 'H2S', 'NH3', 'CH3SH', 'SO2', 'NO2', 'CO']
# Columns kept from the raw file: every sensor channel, then the timestamp and the class label.
cols = [*sensors, 'reg_date', 'label_type']

In [4]:
df = pd.read_csv('df_type_total_230403.csv') # 3rd and 4th round dataset

In [5]:
# Keep only the columns listed in `cols`, in that order.
df = df[cols]

함수 정의

In [None]:
# Split the recording into experiments so that datasets can be built per experiment
def create_experiment_number(data):
    """Number the contiguous recording runs ("experiments") in ``data``.

    Consecutive rows belong to the same experiment while the gap between a
    row's ``reg_date`` and the next row's ``reg_date`` is exactly 1 or 2
    seconds. Any other gap closes the current experiment; the following row
    starts the next one.

    The frame is modified in place (and also returned): ``reg_date`` is
    converted to datetime, the index is reset to 0..n-1, and the columns
    ``lagged`` (next row's timestamp), ``diff`` (gap to the next row in
    seconds, 1 for the last row) and ``exp_num`` (0-based integer experiment
    id) are added.

    Args:
        data: DataFrame with a ``reg_date`` column parseable by
            ``pd.to_datetime``, ordered as recorded.

    Returns:
        The same DataFrame object, with the columns described above.
    """
    data['reg_date'] = pd.to_datetime(data['reg_date'])
    data['lagged'] = data['reg_date'].shift(-1)

    # Assign the filled Series back instead of calling fillna(inplace=True) on a
    # column selection: the chained in-place form is deprecated and does not
    # update the frame when pandas copy-on-write is enabled.
    gap_seconds = (data['lagged'] - data['reg_date']) / pd.Timedelta('1s')
    data['diff'] = gap_seconds.fillna(1)  # the last row has no successor
    data.reset_index(drop=True, inplace=True)

    # A row whose gap to the next row is not 1 or 2 s is the last row of its
    # experiment. The experiment id of a row is therefore the number of such
    # closing rows strictly before it (cumulative count minus the row itself).
    closes_experiment = ~data['diff'].isin([1, 2])
    data['exp_num'] = closes_experiment.cumsum() - closes_experiment.astype(int)
    return data

# Build sliding-window time-series samples separately for each experiment
def create_windows(data, window_size=5, feature_cols=None):
    """Cut every experiment into overlapping windows of consecutive rows.

    Windows never cross an experiment boundary: rows are grouped by
    ``exp_num`` first and each group is windowed on its own with stride 1.
    The label of a window is the ``label_type`` of its LAST row.

    Args:
        data: DataFrame containing the feature columns, ``label_type`` and
            ``exp_num``.
        window_size: Number of consecutive rows per window.
        feature_cols: Optional list of feature column names. When omitted the
            first 11 columns of ``data`` are used (the original behaviour).

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n_windows, window_size, n_features)``
        and ``y`` has shape ``(n_windows,)``. Experiments shorter than
        ``window_size`` contribute no windows.
    """
    X = []
    y = []
    for _, group_df in data.groupby('exp_num'):
        if feature_cols is None:
            features = group_df.iloc[:, 0:11].values
        else:
            features = group_df[list(feature_cols)].values
        labels = group_df['label_type'].values

        for start in range(len(features) - window_size + 1):
            end = start + window_size
            X.append(features[start:end])
            # Label of the window's last row. The previous code used
            # labels[window_size-1], i.e. the same row for every window of
            # the experiment.
            y.append(labels[end - 1])
    return np.array(X), np.array(y)

# Split the dataset into train / validation / test parts
def split_data(X, y, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    """Randomly split ``X``/``y`` into train, validation and test tensors.

    Args:
        X: NumPy array of samples (first axis indexes samples).
        y: NumPy array of targets, same length as ``X``.
        train_ratio, val_ratio, test_ratio: Fractions of the data for each
            part; they must sum to 1 (within floating-point tolerance).

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test)`` as torch tensors.
        The train and validation sizes are ``int(len(X) * ratio)``; the test
        part receives the remaining samples.

    Raises:
        AssertionError: If the ratios do not sum to 1 or the lengths differ.
    """
    # Compare with a tolerance: e.g. 0.7 + 0.2 + 0.1 evaluates to
    # 0.9999999999999999 in floating point and would fail an exact == 1 check.
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-8, "The sum of the ratios must be equal to 1."
    assert len(X) == len(y), "The length of X and y must be the same."

    # Convert the NumPy arrays to torch tensors.
    X = torch.from_numpy(X)
    y = torch.from_numpy(y)

    # One random permutation, applied to both X and y so pairs stay aligned.
    shuffled_indices = torch.randperm(len(X))

    # Sizes of each part; the test part absorbs the rounding remainder.
    train_cnt = int(len(X) * train_ratio)
    val_cnt = int(len(X) * val_ratio)
    test_cnt = len(X) - (train_cnt + val_cnt)
    part_sizes = [train_cnt, val_cnt, test_cnt]

    # Shuffle, then cut into three consecutive chunks.
    X_train, X_val, X_test = torch.index_select(X, dim=0, index=shuffled_indices).split(part_sizes, dim=0)
    y_train, y_val, y_test = torch.index_select(y, dim=0, index=shuffled_indices).split(part_sizes, dim=0)

    print('---'*30)
    print('Splitting Complete')
    print(X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape)
    return X_train, y_train, X_val, y_val, X_test, y_test

# Build the torch DataLoaders used for classification
def create_classification_dataset(data, window_size, batch_size):
    """Turn the labelled frame into train / validation / test DataLoaders.

    Steps: window the data with ``create_windows``, integer-encode and one-hot
    encode the window labels, swap the last two axes of the windows, cast
    everything to float32, split with ``split_data`` (default ratios) and wrap
    each part in a ``DataLoader``. Only the training loader shuffles.

    Args:
        data: Frame accepted by ``create_windows``.
        window_size: Number of rows per window.
        batch_size: Batch size for all three loaders.

    Returns:
        ``(train_loader, val_loader, test_loader)``.
    """
    windows, window_labels = create_windows(data, window_size)

    # String labels -> integer ids -> one-hot rows.
    class_ids = LabelEncoder().fit_transform(window_labels)
    one_hot = to_categorical(class_ids, num_classes=len(np.unique(class_ids)))

    # Swap the window and feature axes (axis 1 <-> axis 2) and cast to float32.
    features = windows.transpose(0, 2, 1).astype(np.float32)
    targets = one_hot.astype(np.float32)

    X_train, y_train, X_val, y_val, X_test, y_test = split_data(features, targets)

    # Create the DataLoaders (shuffle only the training data).
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

데이터 전처리

In [None]:
# Tag every row with the experiment number computed by create_experiment_number.
df = create_experiment_number(df)

TensorFlow를 사용한 1D-CNN 구현

In [None]:
# Number of consecutive rows per window, then build the windowed arrays.
window_size=5
X, y= create_windows(df, window_size)

In [None]:
# Inspect the shapes of the windowed features and labels.
X.shape, y.shape

In [None]:
# Swap axes 1 and 2 of the windowed array.
# NOTE(review): this cell is not idempotent - running it a second time swaps the axes back.
X = X.transpose(0,2,1)

In [None]:
# Hold out 20% of the windows for testing (fixed seed for repeatability).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Standardise each sensor channel separately, using statistics from the
# training split only. At this point X is expected to be laid out as
# (samples, sensors, window) - the result of the transpose cell above - so the
# sensor axis is moved last before flattening to (rows, sensors).
# The previous reshape(-1, shape[-1]) flattened to (rows, window), which made
# StandardScaler compute one mean/std per window position across ALL sensors.
n_sensors = X_train.shape[1]

scaler = StandardScaler()
X_train = (
    scaler.fit_transform(X_train.transpose(0, 2, 1).reshape(-1, n_sensors))
    .reshape(X_train.shape[0], -1, n_sensors)
    .transpose(0, 2, 1)
)
X_test = (
    scaler.transform(X_test.transpose(0, 2, 1).reshape(-1, n_sensors))
    .reshape(X_test.shape[0], -1, n_sensors)
    .transpose(0, 2, 1)
)

# Integer-encode the class labels (encoder fitted on the training labels only).
encoder = LabelEncoder()
y_train = encoder.fit_transform(y_train)
y_test = encoder.transform(y_test)

# One-hot encode; the number of classes is taken from the full label array y.
y_train = to_categorical(y_train, num_classes=len(np.unique(y)))
y_test = to_categorical(y_test, num_classes=len(np.unique(y)))

In [None]:
# Inspect the shapes of the train/test features and one-hot targets.
X_train.shape, X_test.shape, y_train.shape, y_test.shape

In [None]:
# Size of the second-to-last axis of X_train.
X_train.shape[-2]

In [None]:
# Keras 1D-CNN classifier: Conv1D -> max-pool -> dropout -> flatten -> dense -> softmax.
# The softmax layer has one unit per distinct label in y.
model = Sequential([
    Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=(X_train.shape[-2], window_size)),
    MaxPooling1D(pool_size=2),
    Dropout(0.2),
    Flatten(),
    Dense(50, activation='relu'),
    Dense(len(np.unique(y)), activation='softmax'),
])

In [None]:
# Print the layer-by-layer summary of the Keras model.
model.summary()

In [None]:
# Compile with the Adam optimizer and categorical cross-entropy loss; report accuracy.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Step 5: Train and evaluate the model
# NOTE(review): the test split is passed as validation_data, so the val_* metrics
# recorded in `history` are computed on the test split rather than a separate validation set.
history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_test, y_test), verbose=2)

In [None]:
def plot_metrics(history):
    """Plot accuracy and loss learning curves side by side.

    Args:
        history: Object whose ``history`` attribute is a dict containing the
            per-epoch lists ``'accuracy'``, ``'val_accuracy'``, ``'loss'`` and
            ``'val_loss'`` (as returned by Keras ``model.fit``).
    """
    # (training key, validation key, panel title, y-axis label) per panel
    panels = [
        ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Model Loss', 'Loss'),
    ]

    plt.figure(figsize=(12, 4))
    for position, (train_key, val_key, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[train_key])
        plt.plot(history.history[val_key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='best')

    plt.show()

# Draw the learning curves of the Keras training run.
plot_metrics(history)

In [None]:
# Highest value of the 'accuracy' metric recorded over all epochs.
max(history.history['accuracy'])

PyTorch를 사용한 1D-CNN 구현

In [None]:
# Hyperparameter setting
batch_size = 32
# One class per distinct value of the label column.
num_classes = len(np.unique(df['label_type']))
num_epochs = 100
# Rows per sliding window (passed to create_classification_dataset below).
window_size = 5

random_seed = 42
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Detect if we have a GPU available

In [None]:
# Fix the random seeds (PyTorch CPU and CUDA, NumPy, Python's random module)
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
# Ask cuDNN for deterministic algorithms and switch off its benchmark mode.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(random_seed)
random.seed(random_seed)

In [None]:
# Build the DataLoaders
# data shape: (batch_size x input_size x seq_len) => (32, 11, 5)
train_loader, valid_loader, test_loader = create_classification_dataset(df, window_size, batch_size)

In [None]:
# Check the shape of the data produced by the DataLoader (first batch only)
for batch_idx, (inputs, targets) in enumerate(train_loader):
    print(f"Batch {batch_idx + 1} - Inputs shape: {inputs.shape}, Targets shape: {targets.shape}")
    break

In [None]:
# CNN model built from 1-dimensional convolution layers:
# two 1-D convolution blocks followed by one fully-connected layer
class CNN_1D(nn.Module):
    """Two-block 1-D CNN classifier.

    Each block is Conv1d(kernel_size=3, padding=1) -> ReLU -> AvgPool1d(2, 2).
    The convolutions keep the sequence length and every pooling step halves it
    (rounding down), so an input of length ``L`` leaves the second block with
    ``(L // 2) // 2`` time steps and 32 channels.

    Args:
        in_channel: Number of input channels (features per time step).
        out_channel: Number of channels produced by the first convolution.
        num_classes: Number of output logits.
        seq_len: Optional input sequence length, used to size the
            fully-connected layer. When omitted the layer expects a pooled
            length of 1 (the original behaviour), which matches input lengths
            4 to 7.

    Raises:
        ValueError: If ``seq_len`` is given but too short to survive both
            pooling steps.
    """

    def __init__(self, in_channel, out_channel, num_classes, seq_len=None):
        # Initialise nn.Module first so submodules and attributes register normally.
        super().__init__()

        self.in_channel = in_channel
        self.out_channel = out_channel
        self.num_classes = num_classes

        # First 1-D convolution block
        self.layer1 = nn.Sequential(
            nn.Conv1d(in_channel, out_channel, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=2, stride=2)
        )
        # Second 1-D convolution block
        self.layer2 = nn.Sequential(
            nn.Conv1d(out_channel, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=2, stride=2)
        )

        # Length of the time axis after the two pooling steps.
        pooled_len = 1 if seq_len is None else (seq_len // 2) // 2
        if pooled_len < 1:
            raise ValueError("seq_len must be at least 4 to pass through both pooling layers.")

        # Fully-connected layer on the flattened (32 channels x pooled_len) features
        self.fc = nn.Linear(32 * pooled_len, num_classes)

    def forward(self, x):
        """Return class logits for ``x`` of shape (batch, in_channel, seq_len)."""
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)  # flatten channels x time per sample
        out = self.fc(out)
        return out

In [None]:
# Build the 1D CNN
# in_channel is the number of features, i.e. the number of sensor channels. It is taken
# from `sensors` so this cell does not depend on X_train left over from the Keras section.
model = CNN_1D(in_channel=len(sensors), out_channel=16, num_classes=num_classes)
model = model.to(device)
print(model)

In [None]:
# Build the Adam optimizer (learning rate 0.001)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
def train_model(model, dataloaders, criterion, num_epochs, optimizer):
    """Train ``model`` and restore the weights with the best validation accuracy.

    Every epoch runs a training pass over ``dataloaders['train']`` followed by
    an evaluation pass over ``dataloaders['val']``. Targets are expected to be
    one-hot / score vectors: the true class is taken as ``argmax`` over the
    last axis of the labels.

    Args:
        model: ``nn.Module`` to train; batches are moved to the device of its
            parameters.
        dataloaders: Mapping with the keys ``'train'`` and ``'val'``, each an
            iterable of ``(inputs, labels)`` batches.
        criterion: Callable ``criterion(outputs, labels)`` returning the loss.
        num_epochs: Number of epochs to run.
        optimizer: Optimizer updating ``model``'s parameters.

    Returns:
        ``(model, val_acc_history)``: the model loaded with the best-accuracy
        weights, and the list of per-epoch validation accuracies (0-dim
        tensors).
    """
    since = time.time()

    # Use the device the model already lives on instead of a notebook-level
    # global, so the function works on its own.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        # Each epoch runs the training phase, then the validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # training mode
            else:
                model.eval()   # evaluation mode

            running_loss = 0.0
            running_corrects = 0
            running_total = 0

            # Iterate over the dataloader that belongs to the current phase
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(model_device)
                labels = labels.to(model_device)

                # Reset the parameter gradients
                optimizer.zero_grad()

                # forward
                # Gradients are tracked only during the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    # Feed the inputs through the model and compute the loss
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    # Predict the class with the highest output score
                    _, preds = torch.max(outputs, 1)

                    # backward + optimizer step: training phase only
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Accumulate the batch loss (weighted by batch size) and hit counts
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == torch.argmax(labels, dim=-1))
                running_total += labels.size(0)

            # Loss and accuracy of this phase for the epoch
            epoch_loss = running_loss / running_total
            epoch_acc = running_corrects.double() / running_total

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # Keep a copy of the weights whenever the validation ACCURACY improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    # Total training time
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    # '{:.4f}' (4 decimals) - the previous '{:4f}' only set a minimum width
    print('Best val Acc: {:.4f}'.format(best_acc))

    # Load the weights of the epoch with the highest validation accuracy
    model.load_state_dict(best_model_wts)

    # Save the best model weights
    # torch.save(best_model_wts, '../output/best_model.pt')
    return model, val_acc_history

In [None]:
# Dataloader dictionary used during training ('train' and 'val' phases)
dataloaders_dict = {
    'train': train_loader,
    'val': valid_loader
}
# Loss function
criterion = nn.CrossEntropyLoss()

In [None]:
# Train the model
model, val_acc_history = train_model(model, dataloaders_dict, criterion, num_epochs, optimizer)

In [None]:
def test_model(model, test_loader):
    model.eval()   # 모델을 validation mode로 설정
    
    # test_loader에 대하여 검증 진행 (gradient update 방지)
    with torch.no_grad():
        corrects = 0
        total = 0
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # forward
            # input을 model에 넣어 output을 도출
            outputs = model(inputs)

            # output 중 최댓값의 위치에 해당하는 class로 예측을 수행
            _, preds = torch.max(outputs, 1)

            # batch별 정답 개수를 축적함
            corrects += torch.sum(preds == torch.argmax(labels, dim=-1))
            total += labels.size(0)

    # accuracy를 도출함
    test_acc = corrects.double() / total
    print('Testing Acc: {:.4f}'.format(test_acc))

In [None]:
# Evaluate the model on the test loader (Acc: 0.8000)
test_model(model, test_loader)

In [None]:
# Per-epoch validation accuracies returned by train_model.
val_acc_history
