# Pytorch

Pytorch는 TensorFlow와 함께 Deep Learning에서 가장 널리 사용되는 framework입니다.

초기에는 Torch라는 이름으로 Lua 언어 기반으로 만들어졌으나, 이후 python 기반으로 변경한 것이 Pytorch입니다. 

New York 대학교와 Facebook이 공동으로 만들었고, Deep Learning 연구자들 사이에서는 가장 대중적으로 널리 사용되는 framework입니다.

## Pytorch Basic

### Pytorch import

In [2]:
import torch

print(torch.__version__)

1.12.0+cu116


### Pytorch 맛보기



In [3]:
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose
import matplotlib.pyplot as plt
import numpy as np

In [4]:
## MNIST Data down 받기

# 공개 데이터셋에서 학습 데이터를 내려받습니다.
training_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# 공개 데이터셋에서 테스트 데이터를 내려받습니다.
test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

In [5]:
batch_size = 64

# 데이터로더를 생성합니다.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print("Shape of X [N, C, H, W]: ", X.shape)
    print("Shape of y: ", y.shape, y.dtype)
    break

Shape of X [N, C, H, W]:  torch.Size([64, 1, 28, 28])
Shape of y:  torch.Size([64]) torch.int64


In [6]:
# 학습에 사용할 CPU나 GPU 장치를 얻습니다.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))

# 모델을 정의합니다.
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Dropout(0.2),            
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

Using cuda device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=784, out_features=128, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.2, inplace=False)
    (3): Linear(in_features=128, out_features=10, bias=True)
  )
)


In [7]:
# Loss 함수와 Optimizer 설정
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [8]:
# Training을 위한 함수
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # 예측 오류 계산
        pred = model(X)
        loss = loss_fn(pred, y)

        # 역전파
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [9]:
# Test를 위한 함수
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 2.297552  [    0/60000]
loss: 0.415706  [ 6400/60000]
loss: 0.316079  [12800/60000]
loss: 0.429938  [19200/60000]
loss: 0.250197  [25600/60000]
loss: 0.418821  [32000/60000]
loss: 0.186435  [38400/60000]
loss: 0.353931  [44800/60000]
loss: 0.248667  [51200/60000]
loss: 0.334411  [57600/60000]
Test Error: 
 Accuracy: 93.9%, Avg loss: 0.198488 

Epoch 2
-------------------------------
loss: 0.129947  [    0/60000]
loss: 0.195041  [ 6400/60000]
loss: 0.101758  [12800/60000]
loss: 0.191361  [19200/60000]
loss: 0.173842  [25600/60000]
loss: 0.246342  [32000/60000]
loss: 0.075205  [38400/60000]
loss: 0.247550  [44800/60000]
loss: 0.185330  [51200/60000]
loss: 0.220050  [57600/60000]
Test Error: 
 Accuracy: 95.8%, Avg loss: 0.134612 

Epoch 3
-------------------------------
loss: 0.082979  [    0/60000]
loss: 0.118250  [ 6400/60000]
loss: 0.067619  [12800/60000]
loss: 0.098112  [19200/60000]
loss: 0.121437  [25600/60000]
loss: 0.176129  [32000/600

내가 쓴 손글씨로 Test 해봅시다.

Colab을 쓰는 경우에는 아래 cell을 실행하면 파일을 업로드할 수 있습니다.

그림판과 같은 도구를 이용하여 손으로 숫자를 쓴 다음 파일로 저장하고 업로드 합니다.

이 때 파일명은 image.png로 합니다.

In [10]:
import os
from PIL import Image
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

ModuleNotFoundError: No module named 'google.colab'

In [1]:
import os
from PIL import Image

# image file의 경로 설정
cur_dir = os.getcwd()
img_path = os.path.join(cur_dir, 'image.png')
# image file 읽기
cur_img = Image.open(img_path)
# 28x28로 resize
cur_img = cur_img.resize((28, 28))
image = np.asarray(cur_img)

# color image일 경우 RGB 평균값으로 gray scale로 변경
try:
  image = np.mean(image, axis=2)
except:
  pass
# upload한 image는 흰 배경에 검은 글씨로 되어 있으므로, MNIST data와 같이 검은 배경에 흰 글씨로 변경
image = np.abs(255-image)
# MNIST와 동일하게 data preprocessing(255로 나눠줌)
image = image.astype(np.float32)/255.
# 화면에 출력하여 확인
plt.imshow(image, cmap='gray')
plt.show()

NameError: name 'os' is not defined

In [None]:
image = torch.as_tensor(image).to(device).reshape(1,1,28,28)
model.eval()
predict = model(image)
print("Model이 예측한 값은 {} 입니다.".format(predict.argmax(1).item()))

### Tensor

텐서(tensor)는 배열(array)이나 행렬(matrix)과 매우 유사한 특수한 자료구조입니다. PyTorch에서는 텐서를 사용하여 모델의 입력(input)과 출력(output), 그리고 모델의 매개변수들을 부호화(encode)합니다.

In [None]:
# list로부터 직접 tensor 생성하기 
data = [[1, 2], [3, 4]]
x_data = torch.tensor(data)
print(x_data)

In [None]:
# numpy array로부터 tensor 생성하기
np_array = np.array(data)
x_np_1 = torch.tensor(np_array)
print(x_np_1)

In [None]:
x_np_2 = torch.as_tensor(np_array)
print(x_np_2)

In [None]:
x_np_3 = torch.from_numpy(np_array)
print(x_np_3)

In [None]:
x_np_1[0,0] = 5
print(x_np_1)
print(np_array)

In [None]:
x_np_2[0,0] = 6
print(x_np_2)
print(np_array)

In [None]:
x_np_3[0,0] = 7
print(x_np_3)
print(np_array)

In [None]:
np_again = x_np_1.numpy()
print(np_again, type(np_again))

In [None]:
a = torch.ones(2,3)
b = torch.zeros(2,3)
c = torch.full((2,3), 2)
d = torch.empty(2,3)
print(a)
print(b)
print(c)
print(d)

In [None]:
e = torch.zeros_like(c)
f = torch.ones_like(c)
g = torch.full_like(c, 3)
h = torch.empty_like(c)
print(e)
print(f)
print(g)
print(h)

In [None]:
i = torch.eye(3)
print(i)

In [None]:
j = torch.arange(10)
print(j)

In [None]:
k = torch.rand(2,2)
l = torch.randn(2,2)
print(k)
print(l)

#### Tensor의 속성

In [None]:
tensor = torch.rand(3,4)

print(f"Shape of tensor: {tensor.shape}")
print(f"Datatype of tensor: {tensor.dtype}")
print(f"Device tensor is stored on: {tensor.device}")

In [None]:
# 속성 변경
tensor = tensor.reshape(4,3)
tensor = tensor.int()
if torch.cuda.is_available():
  tensor = tensor.to('cuda')

print(f"Shape of tensor: {tensor.shape}")
print(f"Datatype of tensor: {tensor.dtype}")
print(f"Device tensor is stored on: {tensor.device}")

### Indexing과 Slicing

In [None]:
a = torch.arange(1, 13).reshape(3, 4)
print(a)

In [None]:
# indexing
print(a[1])
print(a[0,-1])

In [None]:
# slicing
print(a[1:-1])
print(a[:2, 2:])

### Transpose

In [None]:
a = torch.arange(16).reshape(2,2,4)
print(a, a.shape)

In [None]:
b = a.transpose(1, 2)
print(b, b.shape)

In [None]:
c = a.permute((2, 0, 1))
print(c, c.shape)

### Tensor 연산

In [None]:
x = torch.tensor([[1,2], [3,4]], dtype=torch.float32)
y = torch.tensor([[5,6], [7,8]], dtype=torch.float32)
print(x)
print(y)

In [None]:
print(x + y)
print(x - y)
print(x * y)
print(x / y)
print(x @ y)
print('='*30)
print(torch.add(x, y))
print(torch.subtract(x, y))
print(torch.multiply(x, y))
print(torch.divide(x, y))
print(torch.matmul(x, y))

In [None]:
# in-place 연산
print(x.add(y))
print(x)
print(x.add_(y))
print(x)

In [None]:
z = torch.arange(1, 11).reshape(2, 5)
print(z)

In [None]:
sum1 = torch.sum(z, axis=0)
sum2 = torch.sum(z, axis=1)
sum3 = torch.sum(z, axis=-1)
print(sum1, sum1.shape)
print(sum2, sum2.shape)
print(sum3, sum3.shape)

In [None]:
a = torch.arange(24).reshape(4, 6)
b = a.clone().detach()
print(a, a.shape)
print(b, b.shape)

In [None]:
c = torch.cat([a, b], axis=0)
print(c, c.shape)

In [None]:
c = torch.cat([a, b], axis=-1)
print(c, c.shape)

In [None]:
d = torch.stack([a, b], axis=0)
print(d, d.shape)

In [None]:
d = torch.stack([a, b], axis=-1)
print(d, d.shape)

## Dataset / Dataloader

Data를 처리하여 model에 공급하는 방법으로 Pytorch에서는 Dataset과 DataLoader를 제공합니다.

Dataset은 data와 label을 저장하고, DataLoader는 Dataset을 model에 공급할 수 있도록 iterable 객체로 감싸줍니다.

### FasionMNIST data 불러오기

In [None]:
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as tr

In [None]:
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

### 데이터 시각화하기

In [None]:
labels_map = {
    0: "T-Shirt",
    1: "Trouser",
    2: "Pullover",
    3: "Dress",
    4: "Coat",
    5: "Sandal",
    6: "Shirt",
    7: "Sneaker",
    8: "Bag",
    9: "Ankle Boot",
}
figure = plt.figure(figsize=(8, 8))
cols, rows = 3, 3
for i in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(training_data), size=(1,)).item()
    img, label = training_data[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.title(labels_map[label])
    plt.axis("off")
    plt.imshow(img.squeeze(), cmap="gray")
plt.show()

### DataLoader 만들기

In [None]:
# DataLoader 만들기
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=False)

In [None]:
# DataLoader를 통해 반복하기(iterate)
# 이미지와 정답(label)을 표시합니다.
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")
img = train_features[0].squeeze()
label = train_labels[0]
plt.imshow(img, cmap="gray")
plt.show()
print(f"Label: {label}")

### Custom Dataset, Data Loader 만들기

In [None]:
# 간단한 Custom Dataset/Transform/DataLoader 만들기

class CustomDataset(Dataset):
  def __init__(self, np_data, transform=None):
    self.data = np_data
    self.transform = transform
    self.len = np_data.shape[0]
  def __len__(self):
    return self.len
  def __getitem__(self, idx):    
    sample = self.data[idx]
    if self.transform:
      sample = self.transform(sample)
    return sample

In [None]:
def square(sample):
  return sample**2

In [None]:
trans = tr.Compose([square])

In [None]:
np_data = np.arange(10)

custom_dataset = CustomDataset(np_data, transform=trans)

In [None]:
custom_dataloader = DataLoader(custom_dataset, batch_size=2, shuffle=True)

In [None]:
for _ in range(3):
  for data in custom_dataloader:
    print(data)
  print("="*20)

## Model

In [None]:
# device 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using {} device'.format(device))

### Model class 만들기

In [None]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Dropout(0.2),            
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [None]:
# Model instance 생성, device 설정
model = NeuralNetwork().to(device)
print(model)

In [None]:
# 가상의 data 만들어서 예측해보기
X = torch.rand(1, 28, 28, device=device)
logits = model(X)
pred_probab = nn.Softmax(dim=1)(logits)
y_pred = pred_probab.argmax(1)
print(f"Predicted class: {y_pred}")

## Training / Validation

#### Loss Function

In [None]:
# 손실 함수를 초기화합니다.
loss_fn = nn.CrossEntropyLoss()

### Optimizer

In [None]:
learning_rate = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### Training / Validation(Test) Function

In [None]:
# Training을 위한 함수
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # 예측(prediction)과 손실(loss) 계산
        pred = model(X)
        loss = loss_fn(pred, y)

        # 역전파
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

# Test를 위한 함수
def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# 학습 진행하기
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
print("Done!")

## Model 저장하고 불러오기

#### parameter만 저장하고 불러오기

In [None]:
# 학습된 model parameter 저장
torch.save(model.state_dict(), 'model_weights.pth')

In [None]:
# 새 Model instance 생성, device 설정
model2 = NeuralNetwork().to(device)
print(model2)

In [None]:
# test
model2.eval()
test_loop(test_dataloader, model2, loss_fn)

In [None]:
# 저장한 parameter 불러오기
model2.load_state_dict(torch.load('model_weights.pth'))

In [None]:
# test
model2.eval()
test_loop(test_dataloader, model2, loss_fn)

### Model 전체를 저장하고 불러오기

In [None]:
# 저장하기
torch.save(model, 'model.pth')

In [None]:
# 불러오기
model3 = torch.load('model.pth')

In [None]:
# test
model3.eval()
test_loop(test_dataloader, model2, loss_fn)

## Tensorboard 사용하여 시각화하기

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [None]:
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter('./logs/pytorch')

In [None]:
# 새 Model instance 생성, device 설정
model4 = NeuralNetwork().to(device)
print(model4)

model4.eval()
test_loop(test_dataloader, model4, loss_fn)

In [None]:
X = torch.rand(1, 28, 28, device=device)
writer.add_graph(model4, X)

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    total_loss = 0.
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # 예측(prediction)과 손실(loss) 계산
        pred = model(X)
        loss = loss_fn(pred, y)

        # 역전파
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
        
        total_loss += loss / len(dataloader)
    return total_loss 

In [None]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return test_loss

In [None]:
parameters = ['Weight1', 'Bias1', 'Weight2', 'Bias2']

epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loss = train(train_dataloader, model, loss_fn, optimizer)
    writer.add_scalar('training loss', train_loss, t)
    for param, name in zip(model.parameters(), parameters):
        writer.add_histogram(name, param, t)
    test_loss = test(test_dataloader, model, loss_fn)
    writer.add_scalar('test_loss', test_loss, t)
print("Done!")

In [None]:
writer.close()

In [None]:
%tensorboard --logdir './logs/pytorch'