# RNN

## 감성분석

###  데이터셋 준비하기

In [1]:
# data 생성
# From torchText 0.9.0: torchtext.data.Field -> torchtext.legacy.data.Field
import torch
from torchtext.legacy import data, datasets

SEED = 5

torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
print("cpu와 cuda 중 다음 기기로 학습함:", DEVICE)

cpu와 cuda 중 다음 기기로 학습함: cpu


In [3]:
# set up fields
text = data.Field(lower=True, sequential=True, batch_first=True)
label = data.Field(sequential=False, batch_first=True)

# make splits for data
#train, test = datasets.IMDB.splits(text, label)
train_data, test_data = datasets.IMDB.splits(text, label)

In [4]:
# Dataset volume 확인

print('Number of training examples:', len(train_data))
print('Number of testing examples:', len(test_data))

Number of training examples: 25000
Number of testing examples: 25000


In [6]:
# 데이터 확인

print(vars(train_data.examples[0]))

{'text': ['bromwell', 'high', 'is', 'a', 'cartoon', 'comedy.', 'it', 'ran', 'at', 'the', 'same', 'time', 'as', 'some', 'other', 'programs', 'about', 'school', 'life,', 'such', 'as', '"teachers".', 'my', '35', 'years', 'in', 'the', 'teaching', 'profession', 'lead', 'me', 'to', 'believe', 'that', 'bromwell', "high's", 'satire', 'is', 'much', 'closer', 'to', 'reality', 'than', 'is', '"teachers".', 'the', 'scramble', 'to', 'survive', 'financially,', 'the', 'insightful', 'students', 'who', 'can', 'see', 'right', 'through', 'their', 'pathetic', "teachers'", 'pomp,', 'the', 'pettiness', 'of', 'the', 'whole', 'situation,', 'all', 'remind', 'me', 'of', 'the', 'schools', 'i', 'knew', 'and', 'their', 'students.', 'when', 'i', 'saw', 'the', 'episode', 'in', 'which', 'a', 'student', 'repeatedly', 'tried', 'to', 'burn', 'down', 'the', 'school,', 'i', 'immediately', 'recalled', '.........', 'at', '..........', 'high.', 'a', 'classic', 'line:', 'inspector:', "i'm", 'here', 'to', 'sack', 'one', 'of', '

In [7]:
# training set을 다시 train/dev set으로 분리
# default value는 7:3
import random

print("data type: ", train_data)
print("trn data volume", len(train_data))

train_data, valid_data = train_data.split(random_state = random.seed(SEED))#, split_ratio=0.9)
print("splited trn data volume: ", len(train_data))
print("splited dev data volume: ", len(valid_data))

data type:  <torchtext.legacy.datasets.imdb.IMDB object at 0xffff7976fd00>
trn data volume 25000
splited trn data volume:  17500
splited dev data volume:  7500


In [8]:
# 최종 데이터 볼륨 확인

print('Number of training examples: ',  len(train_data))
print('Number of validation examples:  ', len(valid_data))
print('Number of testing examples:  ', len(test_data))

Number of training examples:  17500
Number of validation examples:   7500
Number of testing examples:   25000


In [9]:
# 빈도를 고려하여 25,000 단어만 사용하기

MAX_VOCAB_SIZE = 25000

text.build_vocab(train_data, max_size = MAX_VOCAB_SIZE)
label.build_vocab(train_data)

In [10]:
# 단어,  label  수 확인하기
# <unk>, <pad> 포함

print('Unique tokens in TEXT vocabulary: ', len(text.vocab))
print('Unique tokens in LABEL vocabulary: ', len(label.vocab))

Unique tokens in TEXT vocabulary:  25002
Unique tokens in LABEL vocabulary:  3


In [11]:
# 등장 빈도가 높은 10개 단어 확인

print(text.vocab.freqs.most_common(10))

[('the', 225679), ('a', 112221), ('and', 111360), ('of', 101646), ('to', 93841), ('is', 72859), ('in', 63506), ('i', 49152), ('this', 48895), ('that', 46272)]


In [12]:
# string to int, int to string

print(text.vocab.itos[:10])
print(label.vocab.stoi)

['<unk>', '<pad>', 'the', 'a', 'and', 'of', 'to', 'is', 'in', 'i']
defaultdict(<bound method Vocab._default_unk_index of <torchtext.legacy.vocab.Vocab object at 0xffff3bf6c220>>, {'<unk>': 0, 'neg': 1, 'pos': 2})


In [13]:
# dataloader 생성하기

BATCH_SIZE = 64

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device)

print(device)

cpu


###  RNN 모델 생성하기

In [14]:
import torch.nn as nn

class RNN(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        
        super().__init__()
        
        # num of vocab x embedding 차원
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        
        # RNN 선언
        self.rnn = nn.RNN(embedding_dim, hidden_dim)
        
        # 분류기
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, text):

        #  text = [sent len, batch size]
        embedded = self.embedding(text)
        
        #  embedded = [sent len, batch size, emb dim]
        out, hidden = self.rnn(embedded)
        
        # return self.fc(hidden.squeeze(0))
        return self.fc(out[-1, ...])

In [15]:
#  ...에 대해서 알아보기

torch.manual_seed(0)

a = torch.rand(3, 5, 5)
print(a.size())

b = a[1, ...]
print(b.size())

c = a[1, :, :]
print(c.size())

print(b==c)

torch.Size([3, 5, 5])
torch.Size([5, 5])
torch.Size([5, 5])
tensor([[True, True, True, True, True],
        [True, True, True, True, True],
        [True, True, True, True, True],
        [True, True, True, True, True],
        [True, True, True, True, True]])


In [16]:
input_dim = len(text.vocab)
embedding_dim = 64
hidden_dim = 128
output_dim = 1

model = RNN(input_dim, embedding_dim, hidden_dim, output_dim)

In [17]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print('The model has {:,} trainable parameters'.format(count_parameters(model)))

The model has 1,625,089 trainable parameters


### 모델 학습하기

In [18]:
# optimizer 지정

import torch.optim as optim

optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [19]:
# loss 함수 지정
criterion = nn.BCEWithLogitsLoss()

In [20]:
# device 지정
model = model.to(device)

In [21]:
# round, sigmoid 함수 확인

a = torch.tensor([-0.02, 0.4, -0.7, 0.9, 0.99])
b = torch.sigmoid(a)
print(b)

c = torch.round(b)
print(c)

tensor([0.4950, 0.5987, 0.3318, 0.7109, 0.7291])
tensor([0., 1., 0., 1., 1.])


In [22]:
# Accuracy 산출 함수

def binary_accuracy(preds, y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = rounded_preds.eq(y).float()
    acc = correct.sum() / len(y)
    return acc

In [29]:
#  학습 함수

def train(model, iterator, optimizer, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    # 학습 지정
    model.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()
        
        #text, label = batch.text.to(device), batch.label.to(device)
                
        predictions = model(batch.text).squeeze(1)
        
        loss = criterion(predictions, batch.label)
        
        acc = binary_accuracy(predictions, batch.label)

        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [24]:
#  평가 함수

def evaluate(model, iterator, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            predictions = model(batch.text).squeeze(1)
            
            loss = criterion(predictions, batch.label)
            
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [25]:
#  시간 측정 함수

import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [31]:
#  실제 학습 부분

N_EPOCHS = 5

best_valid_loss = float('inf')

for epoch in range(1, N_EPOCHS+1):

    start_time = time.time()
    
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut1-model.pt')
    
    print('Epoch: {:02} | Epoch Time: {}m {}s'.format(epoch, epoch_mins, epoch_secs))
    print('\tTrain Loss: {:.3f} | Train Acc: {:.2f}'.format(train_loss, train_acc*100))
    print('\t Val. Loss: {:.3f} |  Val. Acc: {:.2f}'.format(valid_loss, valid_acc*100))

ValueError: Target size (torch.Size([64])) must be the same as input size (torch.Size([959]))

In [None]:
#  저장된 모델 load후 테스트 진행

model.load_state_dict(torch.load('tut1-model.pt'))

test_loss, test_acc = evaluate(model, test_iterator, criterion)

print('Test Loss: {:.3f} | Test Acc: {:.2f}'.format(test_loss, test_acc*100))

###  LSTM 모델 구성하기!

In [27]:
# 길이 정보를 포함하는 데이터 생성하기

import torch
from torchtext import data
from torchtext import datasets

SEED = 1234

torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

text = data.Field(tokenize = 'spacy', include_lengths = True)
label = data.LabelField(dtype = torch.float)

AttributeError: module 'torchtext.data' has no attribute 'Field'

In [None]:
# 데이터셋 다운로드 및 분할

from torchtext import datasets

train_data, test_data = datasets.IMDB.splits(text, label)

In [None]:
# training set에서 validation set 분할하기
import random

train_data, valid_data = train_data.split(random_state = random.seed(SEED))

In [None]:
# 25,000 단어만 고려하기

max_vocab_size = 25000

text.build_vocab(train_data, max_size = max_vocab_size)

label.build_vocab(train_data)

In [None]:
BATCH_SIZE = 64

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    sort_within_batch = True,
    device = device)

In [None]:
import torch.nn as nn

class RNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, 
                 bidirectional, dropout, pad_idx):
        
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
        
        self.rnn = nn.LSTM(embedding_dim, 
                           hidden_dim, 
                           num_layers=n_layers, 
                           bidirectional=bidirectional
                           )
        
        self.fc1 = nn.Linear(hidden_dim * 2, 64)
        self.fc2 = nn.Linear(64, output_dim)
        
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text, text_lengths):
        
        #  text = [sent len, batch size]
        
        embedded = self.embedding(text)
        
        #  embedded = [sent len, batch size, emb dim]
        
        #  pack sequence
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths)
        
        packed_output, (hidden, cell) = self.rnn(packed_embedded)
        
        #  unpack sequence
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)

        # output: (seq_length, batch size, hidden dim * num directions)
        
        output = self.relu(output)
        output = self.dropout(self.fc1(output[-1, ...]))
            
        return self.fc2(output)

In [None]:
# pack_padded_sequence, pad_packed_sequence 확인

from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

seq = torch.tensor([[1,2,0], [3,0,0], [4,5,6]])
lens = [2, 1, 3]

In [None]:
packed = pack_padded_sequence(seq, lens, batch_first=True, enforce_sorted=False)
print(packed)

In [None]:
seq_unpacked, lens_unpacked = pad_packed_sequence(packed, batch_first=True)
print(seq_unpacked)
print(lens_unpacked)

In [None]:
# model 선언

INPUT_DIM = len(text.vocab)
EMBEDDING_DIM = 64
HIDDEN_DIM = 128
OUTPUT_DIM = 1
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.5
PAD_IDX = text.vocab.stoi[text.pad_token]

model = RNN(INPUT_DIM, 
            EMBEDDING_DIM, 
            HIDDEN_DIM, 
            OUTPUT_DIM, 
            N_LAYERS, 
            BIDIRECTIONAL, 
            DROPOUT, 
            PAD_IDX)

In [None]:
# 모델 파라미터 수

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
import torch.optim as optim

optimizer = optim.Adam(model.parameters())

In [None]:
criterion = nn.BCEWithLogitsLoss()

model = model.to(device)
criterion = criterion.to(device)

In [None]:
def train(model, iterator, optimizer, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()
        
        text, text_lengths = batch.text
        
        predictions = model(text, text_lengths).squeeze(1)
        
        loss = criterion(predictions, batch.label)
        
        acc = binary_accuracy(predictions, batch.label)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
def evaluate(model, iterator, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            text, text_lengths = batch.text

            predictions = model(text, text_lengths).squeeze(1)
            
            loss = criterion(predictions, batch.label)
            
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
N_EPOCHS = 5

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut2-model.pt')
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
model.load_state_dict(torch.load('tut2-model.pt'))

test_loss, test_acc = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

## RNN을 통한 이미지 분류

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dsets

In [None]:
# dataset 생성

train_dataset = dsets.MNIST(root='../data/RNN_AE', 
                            train=True, 
                            transform=transforms.ToTensor(),
                            download=True)

test_dataset = dsets.MNIST(root='../data/RNN_AE', 
                           train=False, 
                           transform=transforms.ToTensor(),
                           download=True)

In [None]:
# data loader

batch_size = 64

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset, 
                                          batch_size=batch_size, 
                                          shuffle=False)

In [None]:
# RNN model 생성

class RNNModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layer, output_dim):
        super(RNNModel, self).__init__()

        # Hidden dimensions
        self.hidden_dim = hidden_dim

        # Number of hidden layers
        self.num_layer = num_layer

        # RNN Cell 선언
        # batch_first=True > (batch_dim, seq_dim, input_dim)
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layer, batch_first=True)

        # Dense layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # 첫 번째 hidden state (h0), cell state (c0) 지정
        # (num_layer, batch_size, hidden_dim)
        h0 = torch.rand(self.num_layer, x.size(0), self.hidden_dim).to(device)
        c0 = torch.zeros(self.num_layer, x.size(0), self.hidden_dim).to(device)
        out, hn = self.rnn(x, (h0, c0))

        # out size
        # out.size() --> 64, 28, 10
        # out[:, -1, :] --> 64, 10 --> 최종 결과물만!
        out = self.fc(out[:, -1, :]) 
        # out.size() --> 64, 10
        return out

In [None]:
# model 상속

input_dim = 28
hidden_dim = 100
num_layer = 1
output_dim = 10

model = RNNModel(input_dim, hidden_dim, num_layer, output_dim)

In [None]:
device = ("cuda:0" if torch.cuda.is_available() else "cpu")

model.to(device)
print(device)

In [None]:
# optimizer 지정
learning_rate = 0.01

optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate) 

In [None]:
criterion = nn.CrossEntropyLoss()

In [None]:
# Training
num_epochs = 20

for epoch in range(1, num_epochs+1):
    for i, (images, labels) in enumerate(train_loader):
        model.train()
        # GPU사용을 위한 설정
        images, labels = images.squeeze(1).to(device), labels.to(device)

        # Clear gradients w.r.t. parameters
        optimizer.zero_grad()

        # Forward pass to get output/logits
        # outputs.size() --> 100, 10
        outputs = model(images)

        # loss 계산
        loss = criterion(outputs, labels)

        # gradients 계산
        loss.backward()

        # Updating parameters
        optimizer.step()

    model.eval()
    # Calculate Accuracy         
    correct = 0
    total = 0
    # Iterate through test dataset
    for images, labels in test_loader:
        # GPU사용을 위한 설정
        images, labels = images.squeeze(1).to(device), labels.to(device)

        # Forward pass only to get logits/output
        outputs = model(images)

        # Get predictions from the maximum value
        _, predicted = torch.max(outputs.data, 1)

        # 전체 데이터 수 구하기
        total += labels.size(0)

        # 정답 수 더하기
        correct += (predicted == labels).sum()

    accuracy = 100 * correct / total

    # Print Loss
    print('Epoch: {}. Loss: {:.3f}. Accuracy: {}'.format(epoch, loss.item(), accuracy))

# Auto Encoder

## Basic AE

In [None]:
import torch
import torchvision
from torch import nn
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# dataloader 생성

img_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])
])

batch_size = 64

dataset = MNIST('../data/RNN_AE', download=True, transform=img_transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
# AE model 생성

class autoencoder(nn.Module):
    def __init__(self):
        super(autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(True),
            nn.Linear(128, 64),
            nn.ReLU(True), 
            nn.Linear(64, 12), 
            nn.ReLU(True), 
            nn.Linear(12, 3)
            )

        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(True),
            nn.Linear(12, 64),
            nn.ReLU(True),
            nn.Linear(64, 128),
            nn.ReLU(True),
            nn.Linear(128, 28 * 28),
            nn.Tanh()
            )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

model = autoencoder()
print(model)

In [None]:
# GPU 사용하기

device = ('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

model = model.to(device)

In [None]:
# optimizer, loss function 지정

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

In [None]:
# 시각화

%matplotlib inline
def show(origin, reconstructed):
    origin = origin.cpu().data
    origin = origin.view(origin.size(0), 28, 28)[0, ...]

    reconstructed = reconstructed.cpu().data
    reconstructed = reconstructed.view(reconstructed.size(0), 28, 28)[0, ...]

    plt.subplot(2, 2, 1)
    plt.imshow(origin)
    plt.title("Original")

    plt.subplot(2, 2, 2)
    plt.imshow(reconstructed)
    plt.title("Reconstructed")

    plt.show()

In [None]:
num_epochs = 10

for epoch in range(1, num_epochs+1):
    for (imgs, _) in dataloader:
        img = imgs.view(imgs.size(0), -1).to(device)

        # ===================forward=====================
        output = model(img)
        loss = criterion(output, img)
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # ===================log========================
    print('epoch [{}/{}], loss:{:.4f}'.format(epoch, num_epochs, loss.data))
    show(img, output)

## Convolutional Autoencoder

In [None]:


import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data.sampler import SubsetRandomSampler

from torch.utils.data import DataLoader
from torchvision import datasets, transforms



In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
# convert data to torch.FloatTensor
transform = transforms.ToTensor()

# load the training and test datasets
train_data = datasets.CIFAR10(root='data', train=True,
                                   download=True, transform=transform)
test_data = datasets.CIFAR10(root='data', train=False,
                                  download=True, transform=transform)

In [None]:
# Create training and test dataloaders

# batch size 지정
batch_size = 64

# 데이터로더 지정
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

# 시각화
def imshow(img):
    plt.imshow(np.transpose(img, (1, 2, 0)))  # convert from Tensor image
    
# label 명시
classes = ['airplane', 'automobile', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck']

In [None]:
# obtain one batch of training images
dataiter = iter(train_loader)
images, labels = dataiter.next()
print("Before: ", type(images))
images = images.numpy() # display를 위한 형변형
print("After: ", type(images))

# plot the images in the batch, along with the corresponding labels
fig = plt.figure(figsize=(25, 4))
# display 20 images
for idx in np.arange(20):
    ax = fig.add_subplot(2, 20/2, idx+1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])

In [None]:
import torch.nn as nn
import torch.nn.functional as F

# define the NN architecture
class ConvAutoencoder(nn.Module):
    def __init__(self):
        super(ConvAutoencoder, self).__init__()
        ## encoder layers ##
        # conv layer (depth from 3 --> 16), 3x3 kernels
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)  
        # conv layer (depth from 16 --> 4), 3x3 kernels
        self.conv2 = nn.Conv2d(16, 4, 3, padding=1)
        # pooling layer (kerner size, stride)
        self.pool = nn.MaxPool2d(2, 2)
        
        ## decoder layers ##
        ## Upsampling
        self.t_conv1 = nn.ConvTranspose2d(4, 16, 2, stride=2)
        self.t_conv2 = nn.ConvTranspose2d(16, 3, 2, stride=2)

    def forward(self, x):
        ## encode ##
        # hidden layers with relu activation function
        # & maxpooling after
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        # add second hidden layer
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        
        ## decode ##
        # add transpose conv layers, with relu activation function
        x = F.relu(self.t_conv1(x))
        # output layer (with sigmoid for scaling from 0 to 1)
        x = F.sigmoid(self.t_conv2(x))
                
        return x

# initialize the NN
model = ConvAutoencoder()
print(model)

In [None]:
# loss function 지정
criterion = nn.BCELoss()

# optimizer 지정
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# GPU 사용하기

device = ('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

model = model.to(device)

In [None]:
# 전체 epoch 지정
n_epochs = 50

for epoch in range(1, n_epochs+1):
    train_loss = 0.0
    
    for (images, _) in train_loader:
        images = images.to(device)
        # gradients 초기화
        optimizer.zero_grad()

        # forward pass
        outputs = model(images)

        # calculate the loss
        loss = criterion(outputs, images)

        # backward pass: gradients 계산
        loss.backward()

        # parameter 업데이트
        optimizer.step()

        # training loss 합산
        train_loss += loss.item()*images.size(0)
            
    # 평균 loss 산출
    train_loss = train_loss/len(train_loader)
    print('Epoch: {} \tTraining Loss: {:.6f}'.format(
        epoch, 
        train_loss
        ))



In [None]:


# obtain one batch of test images
dataiter = iter(test_loader)
images, labels = dataiter.next()
images = images.to(device)

# get sample outputs
output = model(images)
# prep images for display
images = images.cpu().numpy()


# output is resized into a batch of iages
output = output.view(batch_size, 3, 32, 32)
# use detach when it's an output that requires_grad
output = output.cpu().detach().numpy()


# plot the first ten input images and then reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=10, sharex=True, sharey=True, figsize=(24,4))
for idx in np.arange(20):
    ax = fig.add_subplot(2, 20/2, idx+1, xticks=[], yticks=[])
    imshow(output[idx])
    ax.set_title(classes[labels[idx]])
    
# plot the first ten input images and then reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=10, sharex=True, sharey=True, figsize=(24,4))
for idx in np.arange(20):
    ax = fig.add_subplot(2, 20/2, idx+1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])



In [None]:
import torchvision.transforms as transforms
from torchvision.utils import save_image
import os

In [None]:
# load the data
df = pd.read_pickle("../data/RNN_AE/LSWMD_sample.pkl")
print(df)

In [None]:
# data visualization
r_list = list(np.arange(100))

fig, ax = plt.subplots(nrows = 10, ncols = 10, figsize=(30, 30))
ax = ax.ravel(order='C')
tmp_num = 0
for i in r_list:
    img = df['waferMap'].values[i]
    ax[tmp_num].imshow(img, cmap = plt.cm.Reds)
    ax[tmp_num].set_xticks([])
    ax[tmp_num].set_yticks([])
    tmp_num += 1
plt.tight_layout()
plt.show()

In [None]:
# custom dataset for corolization
class customDataset(Dataset):
    def __init__(self, x):
        self.x = x
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
        ])

    def __len__(self):
        # dataset 길이 측정
        return len(self.x)

    def __getitem__(self, idx):
        # image resize > tensor 변형
        x_3d = np.repeat(self.x[idx][:, :, np.newaxis], 3, axis=2)

        colorized = np.where(
            x_3d==0, np.array([255, 255, 255], dtype=np.uint8),(
                np.where(
                    x_3d==1, np.array([255, 204, 204], dtype=np.uint8),(
                        np.array([51, 0, 0], dtype=np.uint8))))) ## 0/1/2 일때,
        x = self.transform(colorized)

        return x

In [None]:
# Set model
class Interpolate(nn.Module):
    def __init__(self, scale_factor, mode):
        super(Interpolate, self).__init__()
        self.interp = nn.functional.interpolate
        self.scale_factor = scale_factor
        self.mode = mode
        
    def forward(self, x):
        x = self.interp(x, scale_factor=self.scale_factor, mode=self.mode)
        return x


class CAE(nn.Module):
    def __init__(self):
        super(CAE, self).__init__()

        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, 3, 1, 1),
            nn.ReLU(),
            nn.Conv2d(16, 32, 4, 2, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 4, 2, 1),
            nn.ReLU(),
        )

        self.decoder = nn.Sequential(
            # Interpolate(scale_factor=2, mode='nearest'),
            nn.Conv2d(64, 32, 3, 1, 1),
            nn.ReLU(),
            Interpolate(scale_factor=2, mode='nearest'),
            nn.Conv2d(32, 16, 3, 1, 1),
            nn.ReLU(),
            Interpolate(scale_factor=2, mode='nearest'),
            nn.Conv2d(16, 3, 1, 1, 0),
            nn.Sigmoid(),
        )

    def forward(self, x):
        encoded = self.encoder(x)  # encode.shape = torch.Size([BS, 64, 4, 4])
        decoded = self.decoder(encoded) # decode.shape = torch.Size([BS, 1, 32, 32])
        return encoded, decoded

In [None]:
batch_size = 64

# construct dataset
dataset = customDataset(df['waferMap'].values)

# dataset volumn 계산
num_train = int(0.7 * len(dataset))
num_dev = int(0.3 * len(dataset))

# split dataset
train_set, val_set = torch.utils.data.random_split(dataset, [num_train + 1, num_dev])

# construct dataloader
dataloader_trn = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2)
dataloader_dev = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=2)


In [None]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
autoencoder = CAE().to(device)
print(autoencoder)

print('Training Data 수 : {}'.format(len(train_set)))
print('Validation Data 수 : {}'.format(len(val_set)))
print('Batch Size : {}'.format(batch_size))
print('Train Mini Batch 개수 : {}'.format(len(dataloader_trn)))
print('Validation Mini Batch 개수 : {}'.format(len(dataloader_dev)))

In [None]:
# Define an optimizer and criterion
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)
best_loss = 1000
epochs = 30

In [None]:
for epoch in range(epochs):
    running_loss = 0.0
    _dev_loss = 0.0

    autoencoder.train()
    for i, data in enumerate(dataloader_trn):
        inputs = data.to(device)
        # ============ Forward ============
        _, outputs = autoencoder(inputs)
        loss = criterion(outputs, inputs)
        # ============ Backward ============
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # ============ Logging ============
        running_loss += loss.data
        if i % 10 == 9:
            print(f'[Trn] {epoch+1}/{epochs}, {i+1}/{len(dataloader_trn)} \
                loss: {(running_loss/20):.5f}')
            running_loss = 0.0

    # Validate Model
    autoencoder.eval()
    for idx, data in enumerate(dataloader_dev):
        # step progress
        inputs = data.to(device)

        with torch.no_grad():
            # ============ Forward ============
            _, outputs = autoencoder(inputs)
            loss = criterion(outputs, inputs)

            # ============ Logging ============
            _dev_loss += loss
            dev_loss = _dev_loss/(idx+1)
            if idx % 50 == 49:
                print(f'[Dev] {epoch+1}/{epochs}, {idx+1}/{len(dataloader_dev)} \
                    loss: {dev_loss:.5f}')

    if dev_loss < best_loss:
        best_loss = dev_loss
        # print(f"The best model is saved / Loss: {dev_loss:.5f}")
        torch.save({
            'model': autoencoder.state_dict(),
            'optimizer': optimizer.state_dict(),
            'trained_epoch': epoch,
        }, os.path.join('./best_model', 'autoencoder.pkl'))
        
        save_image(
            torchvision.utils.make_grid(outputs),
            os.path.join('./reconstructed', f'reconstructed_epoch_{epoch}.jpg'),
            normalize=True
            )
        save_image(
            torchvision.utils.make_grid(inputs),
            os.path.join('./reconstructed', f'original_epoch.jpg'),
            normalize=True
            )

print('Finished Training')