In [25]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, Dataset

from tqdm import tqdm
import math
import random
import os
import numpy as np

# Code introduction

## Tensor

In [26]:
x = torch.rand(3,256,256)

In [27]:
x.size()

torch.Size([3, 256, 256])

In [28]:
conv_1 = nn.Conv2d(3, 10, kernel_size= (3,3), padding = 1)
conv_1

Conv2d(3, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

In [29]:
output = conv_1(x)

In [30]:
output.size()

torch.Size([10, 256, 256])

## Model

In [31]:
class Mymodel(nn.Module):
  def __init__(self):
    super().__init__()
    self.layer_1 = nn.Linear(10,100)
    self.layer_2 = nn.Linear(100,50)
    self.layer_3 = nn.Linear(50,2)

  def forward(self, x):
    x = self.layer_1(x)
    x = self.layer_2(x)
    x = self.layer_3(x)
    return x

In [32]:
my_model = Mymodel()

In [33]:
input = torch.Tensor(50,10)

In [34]:
output = my_model(input)

In [35]:
output.size()

torch.Size([50, 2])

In [36]:
class Mymodel2(nn.Module): # 이미지 하나에 대한 코드 -> batch 사이즈로 병렬적으로 처리
  def __init__(self, input_size, middle_size, middle_size2, output_size):
    super().__init__()
    self.layer_1 = nn.Linear(input_size,middle_size)
    self.layer_2 = nn.Linear(middle_size,middle_size2)
    self.layer_3 = nn.Linear(middle_size2,output_size)

  def forward(self, x):
    x = self.layer_1(x)
    x = self.layer_2(x)
    x = self.layer_3(x)
    return x

In [37]:
my_model = Mymodel2(input_size = 10, middle_size = 50, middle_size2 = 50, output_size = 2)

In [38]:
input = torch.Tensor(50,10)

In [39]:
output = my_model(input)

In [40]:
output.size()

torch.Size([50, 2])

In [41]:
print(list(my_model.children())) # parameter 종류 출력할 수 있음

[Linear(in_features=10, out_features=50, bias=True), Linear(in_features=50, out_features=50, bias=True), Linear(in_features=50, out_features=2, bias=True)]


## trainer

In [42]:
class Trainer():
  def __init__(self, model, optimizer, loss_function):
    super().__init__()
    self.model = model
    self.optimizer = optimizer
    self.loss_function = loss_function

  def train(self, train_loader):
    self.model.train() # train 무조건 추가

    total_loss = 0
    total_correct = 0

    batch_size = train_loader.batch_size
    total_dataset_size = len(train_loader.dataset) # train_loader의 전체 데이터 수
    num_batches = math.ceil(total_dataset_size / batch_size) # 올림, ex) 595 -> batch_size 10이면 59.5인데 총 배치 수는 60개이므로 올림해준다.

    for batch in tqdm( train_loader, total = num_batches ): # tqdm(data, len) -> 전체적인 학습 시간을 확인할 수 있음 (꼭 사용하기 !!)
      self.optimizer.zero_grad()

      x_data_batch, y_data_batch = batch # Mydataset class에서 정의 함

      x_data_batch = x_data_batch.to(device) # GPU 사용 CUDA 관련 device
      y_data_batch = y_data_batch.to(device)

      outputs = self.model(x_data_batch)
      loss = self.loss_function(outputs, y_data_batch.long()) # 총 10개
      total_loss += loss.item() # dictionary 형태 ??

      #역전파 -> 가중치 업데이트 개념 (성능 update)
      loss.backward()
      self.optimizer.step()

      _, predicted = torch.max(outputs, 1)  # 확률이 가장 높은 클래스를 선택
      correct = (predicted == y_data_batch).sum().item()
      total_correct += correct

    avg_loss = total_loss / total_dataset_size
    accuracy = total_correct / total_dataset_size

    return avg_loss, accuracy # 1 epoch

  def valid(self, valid_loader):
    self.model.eval()  # 모델을 평가 모드로 전환

    total_loss = 0
    total_correct = 0

    batch_size = valid_loader.batch_size
    total_dataset_size = len(valid_loader.dataset)
    num_batches = math.ceil(total_dataset_size / batch_size)

    with torch.no_grad():  # 그래디언트 계산을 비활성화
        for batch in tqdm(valid_loader, total=num_batches):
            x_data_batch, y_data_batch = batch

            x_data_batch = x_data_batch.to(device)
            y_data_batch = y_data_batch.to(device)

            outputs = self.model(x_data_batch)
            loss = self.loss_function(outputs, y_data_batch.long())
            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)  # 확률이 가장 높은 클래스를 선택
            correct = (predicted == y_data_batch).sum().item()
            total_correct += correct

    avg_loss = total_loss / total_dataset_size
    accuracy = total_correct / total_dataset_size

    return avg_loss, accuracy

  def test(self, test_loader):
    self.model.eval()  # 모델을 평가 모드로 전환

    total_loss = 0
    total_correct = 0

    batch_size = test_loader.batch_size
    total_dataset_size = len(test_loader.dataset)
    num_batches = math.ceil(total_dataset_size / batch_size)

    with torch.no_grad():  # 그래디언트 계산을 비활성화
        for batch in tqdm(test_loader, total=num_batches):
            x_data_batch, y_data_batch = batch

            x_data_batch = x_data_batch.to(device)
            y_data_batch = y_data_batch.to(device)

            outputs = self.model(x_data_batch)
            loss = self.loss_function(outputs, y_data_batch.long())
            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)  # 확률이 가장 높은 클래스를 선택
            correct = (predicted == y_data_batch).sum().item()
            total_correct += correct

    avg_loss = total_loss / total_dataset_size
    accuracy = total_correct / total_dataset_size

    return avg_loss, accuracy

## Main

In [43]:
class Config(): # 하이퍼파라미터 그냥 설정
  EPOCH = 10
  LEARNING_RATE = 0.001
  BATCH_SIZE = 2
  SEED = 42

config = Config()

In [44]:
def seed_everything(seed): # seed 값 고정할 때 많이 용
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(config.SEED) # Seed 고정

input tensor를 정의하고 train, valid, test로 나누자 (보통 이미지 받아서 전처리하는 구간)

In [45]:
# 임의로 생성
x = torch.rand(10000,10)
y = torch.randint(0, 2, (10000, ), dtype=torch.float32)

In [55]:
print(x.size())
print(y.size())
print(x)
print(y)

torch.Size([10000, 10])
torch.Size([10000])
tensor([[0.8823, 0.9150, 0.3829,  ..., 0.7936, 0.9408, 0.1332],
        [0.9346, 0.5936, 0.8694,  ..., 0.5739, 0.2666, 0.6274],
        [0.2696, 0.4414, 0.2969,  ..., 0.1994, 0.5472, 0.0062],
        ...,
        [0.3525, 0.5646, 0.0209,  ..., 0.1258, 0.7432, 0.1262],
        [0.4578, 0.8054, 0.2267,  ..., 0.3260, 0.5572, 0.1310],
        [0.3460, 0.9053, 0.9056,  ..., 0.2654, 0.5969, 0.9301]])
tensor([0., 1., 1.,  ..., 1., 1., 0.])


In [47]:
# torch로 변환 및 x랑 y랑 정의하는 class (몇몇은 CustomDataset이라고 class명 짓기도 함)
# 데이터 출력에 대해서 사용자에 맞게 지정할 수 있음 3개 or 4개 등등  (BERT의 경우 attention mask, ind, label 등등 총 3개 )
class MyDataset(Dataset):
    def __init__(self, x_data, y_data):
        self.x_data = torch.tensor(x_data, dtype=torch.float32)
        self.y_data = torch.tensor(y_data, dtype=torch.float32)

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, idx):
        return self.x_data[idx], self.y_data[idx]

In [48]:
mydataset = MyDataset(x, y)

  self.x_data = torch.tensor(x_data, dtype=torch.float32)
  self.y_data = torch.tensor(y_data, dtype=torch.float32)


In [49]:
# 전체 데이터셋 크기
total_size = len(mydataset)

# 훈련, 검증, 테스트 데이터셋의 크기 결정 (예: 60% 훈련, 20% 검증, 20% 테스트)
train_size = int(total_size * 0.6)
test_size = int(total_size * 0.2)
valid_size = total_size - train_size - test_size

# 데이터셋을 무작위로 분할 (random_split)
train_dataset, valid_dataset, test_dataset = random_split(mydataset, [train_size, valid_size, test_size])

# DataLoader 설정 (batch 사이즈로 쪼갠 batch를 train_loader에다가 넣기)
train_loader = DataLoader(train_dataset, batch_size = config.BATCH_SIZE, shuffle=False)
valid_loader = DataLoader(valid_dataset, batch_size = config.BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size = config.BATCH_SIZE, shuffle=False)

In [50]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cpu')

In [51]:
# model, optimizer, loss_function 선언하기
model = Mymodel2(input_size = 10, middle_size = 50, middle_size2 = 50, output_size = 2)
optimizer = torch.optim.Adam(model.parameters(), lr=config.LEARNING_RATE) # parameter 명시해줘야 함, ++ 학습률도
loss_function = nn.CrossEntropyLoss()
trainer = Trainer(model.to(device), optimizer, loss_function)

In [52]:
# 돌려!
train_losses = []
valid_losses = []
train_accuracies = []
valid_accuracies = []

for epoch in range(config.EPOCH):
    print('=====================================================================================')
    print("epoch {}".format(epoch+1))

    train_loss, train_accuracy = trainer.train(train_loader)
    valid_loss, valid_accuracy = trainer.valid(valid_loader)

    # 결과값을 리스트에 추가
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    train_accuracies.append(train_accuracy)
    valid_accuracies.append(valid_accuracy)

    print("TRAIN LOSS = {}, TRAIN ACC = {}, ".format(train_loss, train_accuracy))
    print("VALID LOSS = {}, VALID ACC = {}, ".format(valid_loss, valid_accuracy))

epoch 1


100%|██████████| 3000/3000 [00:03<00:00, 787.01it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3814.39it/s]


TRAIN LOSS = 0.3480832233726978, TRAIN ACC = 0.4985, 
VALID LOSS = 0.34638086825609204, VALID ACC = 0.5165, 
epoch 2


100%|██████████| 3000/3000 [00:03<00:00, 812.70it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3756.07it/s]


TRAIN LOSS = 0.3470408114741246, TRAIN ACC = 0.5025, 
VALID LOSS = 0.34639873266220095, VALID ACC = 0.517, 
epoch 3


100%|██████████| 3000/3000 [00:04<00:00, 643.54it/s]
100%|██████████| 1000/1000 [00:00<00:00, 2742.17it/s]


TRAIN LOSS = 0.34689492818713186, TRAIN ACC = 0.5128333333333334, 
VALID LOSS = 0.3463370034992695, VALID ACC = 0.5175, 
epoch 4


100%|██████████| 3000/3000 [00:03<00:00, 755.57it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3846.28it/s]


TRAIN LOSS = 0.34684397032360237, TRAIN ACC = 0.5145, 
VALID LOSS = 0.34633379662036895, VALID ACC = 0.5205, 
epoch 5


100%|██████████| 3000/3000 [00:03<00:00, 817.93it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3936.09it/s]


TRAIN LOSS = 0.34675435167054336, TRAIN ACC = 0.5118333333333334, 
VALID LOSS = 0.34628407323360444, VALID ACC = 0.5205, 
epoch 6


100%|██████████| 3000/3000 [00:04<00:00, 619.36it/s]
100%|██████████| 1000/1000 [00:00<00:00, 2787.30it/s]


TRAIN LOSS = 0.34674477678040666, TRAIN ACC = 0.5153333333333333, 
VALID LOSS = 0.3463249849975109, VALID ACC = 0.5205, 
epoch 7


100%|██████████| 3000/3000 [00:04<00:00, 731.41it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3765.81it/s]


TRAIN LOSS = 0.34675034243861835, TRAIN ACC = 0.5203333333333333, 
VALID LOSS = 0.34629956862330435, VALID ACC = 0.5205, 
epoch 8


100%|██████████| 3000/3000 [00:03<00:00, 818.84it/s]
100%|██████████| 1000/1000 [00:00<00:00, 3753.90it/s]


TRAIN LOSS = 0.34672970602413017, TRAIN ACC = 0.5205, 
VALID LOSS = 0.3463122125267982, VALID ACC = 0.5205, 
epoch 9


100%|██████████| 3000/3000 [00:04<00:00, 744.91it/s]
100%|██████████| 1000/1000 [00:00<00:00, 2324.82it/s]


TRAIN LOSS = 0.3467184471487999, TRAIN ACC = 0.5181666666666667, 
VALID LOSS = 0.3462768086194992, VALID ACC = 0.5205, 
epoch 10


100%|██████████| 3000/3000 [00:04<00:00, 614.56it/s]
100%|██████████| 1000/1000 [00:00<00:00, 2191.64it/s]

TRAIN LOSS = 0.3467141010562579, TRAIN ACC = 0.5193333333333333, 
VALID LOSS = 0.34636564511060713, VALID ACC = 0.519, 





# 실제로 쓰이는 모델을 보자

In [53]:
class BatchedCNNLayer_Norm(nn.Module):

  def __init__(self, input_channel, output_channel):
    self.input_channel = input_channel
    self.output_channel = output_channel
    super().__init__()

    self.layer = nn.Sequential(
        nn.Conv2d(self.input_channel, self.output_channel, kernel_size=(3,3), padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(self.output_channel),
        nn.Conv2d(self.output_channel, self.output_channel, kernel_size=(3,3), padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(self.output_channel),
        nn.Conv2d(self.output_channel, self.output_channel, kernel_size=(3,3), stride=2, padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(self.output_channel),
    )

  def forward(self, input_data):
    return self.layer(input_data)

class CNNModel(nn.Module):

  def __init__(self, num_classes):
    self.num_classes = num_classes
    super().__init__()


    self.cnn_encoder = nn.Sequential(
        BatchedCNNLayer_Norm(1, 32),
        BatchedCNNLayer_Norm(32, 64),
        BatchedCNNLayer_Norm(64, 128),
        BatchedCNNLayer_Norm(128, 256),
        BatchedCNNLayer_Norm(256, 512),
    )

    self.classifier = nn.Sequential(
        nn.Linear(512, 200),
        nn.ReLU(),
        nn.Linear(200, self.num_classes)
    )


  def forward(self, input_data):
    input_tensor = input_data.view(1, 1, input_data.size(-1), -1)
    encoder_out = self.cnn_encoder( input_tensor )
    logits = self.classifier(encoder_out.view(-1, 512))

    return logits