# <br>[ LG전자_DX_Intensive_Course  ] 딥러닝 기반 시계열 분석 3<br><br>: CNN 주요 모델 2 & CAM/Grad-CAM - CNN for Time Series Data<br>

In [None]:
# github에서 데이터 불러오기
!git clone https://github.com/KU-DIC/LG_time_series_day10.git

In [1]:
# 모듈 불러오기
import os
import time
import copy
import random
import pickle
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

# <br>0. Hyperparameter Setting
- data_dir: 데이터가 존재하는 경로 (해당 실습에서는 train/test 시계열 데이터가 존재하는 경로를 의미함)
- batch_size: 학습 및 검증에 사용할 배치의 크기
- num_classes: 새로운 데이터의 class 개수
- num_epochs: 학습할 epoch 횟수
- window_size: input의 시간 길이 (time series data에서 도출한 subsequence의 길이)
- random_seed: reproduction을 위해 고정할 seed의 값

In [2]:
# Hyperparameter setting
data_dir = '/content/LG_time_series_day10/input/har-data'
batch_size = 32
num_classes = 6
num_epochs = 50
window_size = 50

random_seed = 42
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Detect if we have a GPU available

In [3]:
# seed 고정
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(random_seed)
random.seed(random_seed)

---

# <br>__1. Data: Human Activity Recognition Data__
- 데이터 description
    - Human Activity Recognition (HAR) Data는 30명의 실험자들이 각자 스마트폰을 허리에 착용하고 6가지 활동 (Walking, Walking Upstairs, Walking Downstairs, Sitting, Standing, Laying)을 수행할 때 측정된 센서 데이터로 구성된 데이터셋이다. 해당 데이터셋은 총 561개의 변수로 이루어져 있으며, 전체 데이터 중 70%는 train 데이터이고 나머지 30%는 test 데이터이다. HAR Data를 활용한 시계열 분류 task는 다변량 시계열 데이터를 input으로 받아 이를 다음 6가지 활동 중 하나의 class로 분류하는 것을 목표로 한다: 0(Walking), 1(Walking Upstairs), 2(Walking Downstairs), 3(Sitting), 4(Standing), 5(Laying). <br><br>

- 변수 설명
    - 독립변수(X): 여러 실험자에 대하여 561개의 변수를 281 시점동안 수집한 시계열 데이터 -> shape: (#실험자, 561, 281)
    - 종속변수(Y): 시계열 데이터의 label - 0(Walking) / 1(Walking Upstairs) / 2(Walking Downstairs) / 3(Sitting) / 4(Standing) / 5(Laying) <br><br>

- 데이터 출처
    - https://archive.ics.uci.edu/ml/datasets/human+activity+recognition+using+smartphones#

In [4]:
def create_classification_dataset(window_size, data_dir, batch_size):
    # data_dir에 있는 train/test 데이터 불러오기
    x = pickle.load(open(os.path.join(data_dir, 'x_train.pkl'), 'rb'))
    y = pickle.load(open(os.path.join(data_dir, 'state_train.pkl'), 'rb'))
    x_test = pickle.load(open(os.path.join(data_dir, 'x_test.pkl'), 'rb'))
    y_test = pickle.load(open(os.path.join(data_dir, 'state_test.pkl'), 'rb'))

    # train data를 시간순으로 8:2의 비율로 train/validation set으로 분할
    # train, validation, test data의 개수 설정
    n_train = int(0.8 * len(x))
    n_valid = len(x) - n_train
    n_test = len(x_test)
    # train/validation set의 개수에 맞게 데이터 분할
    x_train, y_train = x[:n_train], y[:n_train]
    x_valid, y_valid = x[n_train:], y[n_train:]

    # train/validation/test 데이터를 window_size 시점 길이로 분할
    datasets = []
    for set in [(x_train, y_train, n_train), (x_valid, y_valid, n_valid), (x_test, y_test, n_test)]:
        # 전체 시간 길이 설정
        T = set[0].shape[-1]
        # 전체 X 데이터를 window_size 크기의 time window로 분할
        windows = np.split(set[0][:, :, :window_size * (T // window_size)], (T // window_size), -1)
        windows = np.concatenate(windows, 0)
        # 전체 y 데이터를 window_size 크기에 맞게 분할
        labels = np.split(set[1][:, :window_size * (T // window_size)], (T // window_size), -1)
        labels = np.round(np.mean(np.concatenate(labels, 0), -1))
        # 분할된 time window 단위의 X, y 데이터를 tensor 형태로 축적
        datasets.append(torch.utils.data.TensorDataset(torch.Tensor(windows), torch.Tensor(labels)))

    # train/validation/test DataLoader 구축
    trainset, validset, testset = datasets[0], datasets[1], datasets[2]
    train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
    valid_loader = torch.utils.data.DataLoader(validset, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=True)

    return train_loader, valid_loader, test_loader

In [5]:
# Dataloader 구축
# data shape: (batch_size x input_size x seq_len)
train_loader, valid_loader, test_loader = create_classification_dataset(window_size, data_dir, batch_size)

---

# <br>__2. Model: 1D CNN__

- 1-dimensional convolution layer 설명 - **torch.nn.Conv1d()**
    - in_channels: input image의 채널 개수 (시계열 데이터에서는 변수 개수)
    - out_channels: 1-dimensional convolution의 output의 채널 개수 (kernel 개수)
    - kernel_size: 1-dimensional convolution의 kernel 크기 (한 kernel에서 고려할 시점의 길이)

In [6]:
# 1-dimensional convolution layer로 구성된 CNN 모델
# 2개의 1-dimensional convolution layer와 1개의 fully-connected layer로 구성되어 있음
class CNN_1D(nn.Module):
    def __init__(self, num_classes):
        super(CNN_1D, self).__init__()
        # 첫 번째 1-dimensional convolution layer 구축
        self.layer1 = nn.Sequential(
            nn.Conv1d(561, 64, kernel_size=3),
            nn.ReLU(),
            nn.AvgPool1d(2)
        )
        # 두 번째 1-dimensional convolution layer 구축
        self.layer2 = nn.Sequential(
            nn.Conv1d(64, 64, kernel_size=3),
            nn.ReLU(),
            nn.AvgPool1d(2)
        )
        # fully-connected layer 구축
        self.fc = nn.Linear(64 * 11, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [7]:
# 1D CNN 구축
model = CNN_1D(num_classes=num_classes)
model = model.to(device)
print(model)

CNN_1D(
  (layer1): Sequential(
    (0): Conv1d(561, 64, kernel_size=(3,), stride=(1,))
    (1): ReLU()
    (2): AvgPool1d(kernel_size=(2,), stride=(2,), padding=(0,))
  )
  (layer2): Sequential(
    (0): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
    (1): ReLU()
    (2): AvgPool1d(kernel_size=(2,), stride=(2,), padding=(0,))
  )
  (fc): Linear(in_features=704, out_features=6, bias=True)
)


---

# <br>__3. Training__

In [8]:
# SGD optimizer 구축하기
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [9]:
def train_model(model, dataloaders, criterion, num_epochs, optimizer):
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        # 각 epoch마다 순서대로 training과 validation을 진행
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # 모델을 training mode로 설정
            else:
                model.eval()   # 모델을 validation mode로 설정

            running_loss = 0.0
            running_corrects = 0
            running_total = 0

            # training과 validation 단계에 맞는 dataloader에 대하여 학습/검증 진행
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device, dtype=torch.long)

                # parameter gradients를 0으로 설정
                optimizer.zero_grad()

                # forward
                # training 단계에서만 gradient 업데이트 수행
                with torch.set_grad_enabled(phase == 'train'):
                    # input을 model에 넣어 output을 도출한 후, loss를 계산함
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    # output 중 최댓값의 위치에 해당하는 class로 예측을 수행
                    _, preds = torch.max(outputs, 1)

                    # backward (optimize): training 단계에서만 수행
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # batch별 loss를 축적함
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                running_total += labels.size(0)

            # epoch의 loss 및 accuracy 도출
            epoch_loss = running_loss / running_total
            epoch_acc = running_corrects.double() / running_total

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # validation 단계에서 validation loss가 감소할 때마다 best model 가중치를 업데이트함
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    # 전체 학습 시간 계산
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # validation loss가 가장 낮았을 때의 best model 가중치를 불러와 best model을 구축함
    model.load_state_dict(best_model_wts)
    
    # best model 가중치 저장
    # torch.save(best_model_wts, '../output/best_model.pt')
    return model, val_acc_history

In [10]:
# trining 단계에서 사용할 Dataloader dictionary 생성
dataloaders_dict = {
    'train': train_loader,
    'val': valid_loader
}

In [11]:
# loss function 설정
criterion = nn.CrossEntropyLoss()

In [12]:
# 모델 학습
model, val_acc_history = train_model(model, dataloaders_dict, criterion, num_epochs, optimizer)

Epoch 1/50
----------
train Loss: 1.7940 Acc: 0.0500
val Loss: 1.7857 Acc: 0.3200

Epoch 2/50
----------
train Loss: 1.7794 Acc: 0.3250
val Loss: 1.7625 Acc: 0.5600

Epoch 3/50
----------
train Loss: 1.7528 Acc: 0.4125
val Loss: 1.7210 Acc: 0.5200

Epoch 4/50
----------
train Loss: 1.7123 Acc: 0.4125
val Loss: 1.6561 Acc: 0.5200

Epoch 5/50
----------
train Loss: 1.6480 Acc: 0.4125
val Loss: 1.5660 Acc: 0.5200

Epoch 6/50
----------
train Loss: 1.5564 Acc: 0.4125
val Loss: 1.4425 Acc: 0.5200

Epoch 7/50
----------
train Loss: 1.4384 Acc: 0.4125
val Loss: 1.3250 Acc: 0.5200

Epoch 8/50
----------
train Loss: 1.3473 Acc: 0.4125
val Loss: 1.2787 Acc: 0.5200

Epoch 9/50
----------
train Loss: 1.3025 Acc: 0.4125
val Loss: 1.2907 Acc: 0.5200

Epoch 10/50
----------
train Loss: 1.2499 Acc: 0.4125
val Loss: 1.3132 Acc: 0.5200

Epoch 11/50
----------
train Loss: 1.2029 Acc: 0.4125
val Loss: 1.3516 Acc: 0.5600

Epoch 12/50
----------
train Loss: 1.1837 Acc: 0.5125
val Loss: 1.3290 Acc: 0.5600

E

---

# <br>__4. Testing__

In [13]:
def test_model(model, test_loader):
    model.eval()   # 모델을 validation mode로 설정
    
    # test_loader에 대하여 검증 진행 (gradient update 방지)
    with torch.no_grad():
        corrects = 0
        total = 0
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device, dtype=torch.long)

            # forward
            # input을 model에 넣어 output을 도출
            outputs = model(inputs)

            # output 중 최댓값의 위치에 해당하는 class로 예측을 수행
            _, preds = torch.max(outputs, 1)

            # batch별 정답 개수를 축적함
            corrects += torch.sum(preds == labels.data)
            total += labels.size(0)

    # accuracy를 도출함
    test_acc = corrects.double() / total
    print('Testing Acc: {:.4f}'.format(test_acc))

In [14]:
# 모델 검증 (Acc: 0.8222)
test_model(model, test_loader)

Testing Acc: 0.8222


---