### import, device 설정

In [None]:
import torch
import torch.nn as nn  # 다양한 layer/모델 들이 정의된 패키지. (Neural Network)
from torch.utils.data import DataLoader # DataLoader 클래스 -> 모델에 데이터들을 제공하는 역할.
from torchvision import datasets, transforms 
# torchvision 패키지(라이브러리): pytorch의 영상 전용 sub package
## datasets(모듈): Vision(영상)관련 공개 데이터셋들을 제공하는 모듈
## transforms: 영상 데이터 전처리 기능들을 제공하는 모듈

import matplotlib.pyplot as plt # 시각화

In [None]:
import os
# 학습이 끝난 모델을 저장할 디렉토리.
model_dir = "models"
os.makedirs(model_dir, exist_ok=True)

# Dataset을 저장할 디렉토리
dataset_dir = "datasets/mnist"
os.makedirs(dataset_dir, exist_ok=True)

In [None]:
# 어느 Device에서 연산처리를 할지 지정. (cpu, cuda(GPU))
# device = "cpu"
print(torch.cuda.is_available())  # cuda를 사용할 수있는 환경인지 조회
device = "cuda" if torch.cuda.is_available() else "cpu" # 2.0이전: torch.Device("cuda")
device

### 하이퍼파라미터, 변수 설정

In [None]:
# batch size 256, learn rate 0.001, epcohs = 20으로 설정
batch_size = 256
lr = 0.001
epochs = 20

### MNIST dataset Loading

#### Dataset

In [None]:
######################################################################################
#  transform=함수 -> input data를 전처리하는 함수를 전달.
######################################################################################
# transforms.ToTensor 가 하는 처리
## ndarray, PIL.Image 객체를 torch.Tensor 로 변환.
## (height, width, channel) 순서를 channel first (channel, height, width) 형태로 변환.
## pixcel값들(0~255 정수)을 0 ~ 1 로 정규화. (Feature Scaling - MinMaxScaling)

trainset = datasets.MNIST(
    root=dataset_dir, 			# Dataset을 읽어올 디렉토리. 저장한 곳에서 읽어오면 됨
    download=True,		# root에 dataset이 없을 경우 다운로드 받을지 여부. T/F
	train=True,			# Trainset인지 여부. True(default): train set, False: test set	
    transform=transforms.ToTensor()
)
testset = datasets.MNIST(
    root=dataset_dir,
    download=True,
    train=False,      
    transform=transforms.ToTensor()
)

In [None]:
# 불러온 데이터 조회
trainset, testset

# 개별 데이터 조회 trainset[0], testset[0]
trainset[0], testset[0]


In [None]:
## 위 trainsform 부분 주석 해제하여 데이터 다시 확인
# 불러온 데이터 조회
trainset,testset

# 개별 데이터 조회 trainset[0], testset[0]
trainset[0], testset[0]


# shape, dtype/type(), min,max 확인 가능
t0 = trainset[0][0]
t0.shape, t0.dtype, t0.type(), t0.min(), t0.max()


#### DataLoader

In [None]:
# DataLoader: Dataset의 데이터들을 모델에 제공하는 역할. 데이터들을 모델에 어떻게 제공할지 설정해서 생성.
# Dataset: 데이터들을 가지고 있는 역할. 하나씩 조회하는 기능을 제공.
train_loader = DataLoader(
    trainset,              	# 어떤 Dataset을 제공할지
    batch_size=batch_size, 	# batch size (256) 처음에 정해준 변수로
    shuffle=True,   	# 모델에 데이터를 제공하기 전에 섞을지 여부. (default: False) True: 한 epoch 학습 전에 섞는다.
    drop_last=True, 	# 모델에 제공할 데이터의 개수가 batch_size보다 적으면 제공하지 않는다. (학습에 사용안함) T/F
)

test_loader = DataLoader(
	testset, 					# 어떤 Dataset을 제공할지
	batch_size=batch_size		# batch size(256) 처음에 정해준 변수로
)

In [None]:
# 전체 data 개수 확인 가능 len(데이터셋) train, test
len(trainset), len(testset)

In [None]:
# 1 epoch 당 step 수 조회, drop last 유무 차이
len(trainset) / batch_size, len(testset) / batch_size

In [None]:
len(train_loader), len(test_loader)

### 모델 정의

In [None]:
# subclass(상속) 방식
## - nn.Module 상속한 클래스를 정의.
## - __init__(): 입력값을 추론(순전파 연산)하는데 필요한 layer객체들을 생성.
## - forward(): __init__() 에서 생성한 layer들을 이용해 연산 로직을 정의

In [None]:
class MNISTModel(nn.Module):
    def __init__(self):
        super().__init__()	# 상위클래스 nn.Module 을 초기화해줘야함. super().__init__()
		
        
        # Linear(input feature 개수, output feature 개수)
        ## input/output feature 개수는 지켜줘야하며, 그 사이 layer 개수를 정하는건 우리의 몫.
        ### layer 4개로 하되 각 히든 layer의 출력 개수는 알아서
        self.lr1 = nn.Linear(784, 128) # (784:(28*28): MNIST 이미지의 pixcel수, 출력: 128)
        self.lr2 = nn.Linear(128, 64)
        self.lr3 = nn.Linear(64, 32)
        self.lr4 = nn.Linear(32, 10)   	# (32: lr3의 출력개수, 출력: 10)
        # 마지막 Linear()의 출력 개수(10) - 분류할 class개수(0 ~ 9)

    def forward(self, X):
        """
        X를 입력 받아서 y를 추론하는 계산로직을 정의
        initializer에서 정의한 Linear들을 이용해서 계산.
        Args:
            X(torch.FloatTensor) - 추론할 MNIST 이미지들. shape: (batch_size, 1, 28, 28)
        """
        # (batch_size, 1, 28, 28)를 (batch_size, 784) feature들을 1차원으로 변환.
        X = torch.flatten(X, start_dim=1)	# 다차원 배열을 1차원 배열로 변환. (start_dim=1, Flatten 시킬 시작 axis지정. 0축은 놔두고 1축 부터 flatten시킨다.)
        
        X = self.lr1(X)   	# Linear: 선형 함수
        X = nn.ReLU()(X)	# Activation(활성)함수. ReLU : nn.ReLU()
        
        X = self.lr2(X)
        X = nn.ReLU()(X)
        
        X = self.lr3(X)
        X = nn.ReLU()(X)
        
        output = self.lr4(X)
        return output

### Train

#### 모델, loss function, optimizer 생성

In [None]:
# 모델객체 생성
model = MNISTModel()
# 확인
print(model)

In [None]:
# Loss함수 정의
## 다중 분류: crossentropyloss -> nn.CrossEntropyLoss()
## 이진 분류: Binary CrossEntropy
## 회귀: mse
loss_fn = nn.CrossEntropyLoss()

In [None]:
# optimizer 정의 torch.optim.Adam(model.parameters(), lr)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

#### 학습(훈련-train) 및 검증

In [None]:
# 학습 시 계산에 사용되는 값들은 같은 device(cpu or cuda or mps)에 있어야 한다
## device로 이동할 대상: Model객체, X(input), y(output)

model = MNISTModel().to(device)

In [None]:
import time
# 학습
## 에폭별 검증결과들을 저장할 리스트, 시각화를 위함함
train_loss_list = []
valid_loss_list = []
valid_acc_list = []
s = time.time() # 학습 시간 측정을 위한.

# 학습-중첩 반복문: epoch 반복 -> step(batch_size) 에 대한 반복
# for epoch in range(epochs)
for epoch in range(epochs):
    ###############################################
    # 모델 Train - 1 epoch : Trainset
    ###############################################
    
    model.train()	# 모델을 train 모드로 변환 -> 모델명.train()
    train_loss = 0 # 현재 epoch의 train loss를 저장할 변수.

    # batch 단위로 학습: 1 step - 1개 batch의 데이터로 학습.
    for X_train, y_train in train_loader: # loader에서 data 가져오기
        # 1. data를 devcie로 이동
        X_train, y_train = X_train.to(device), y_train.to(device)
        # 2. 모델을 이용해 추론
        pred = model(X_train)	 # Model.forward(X_train) 메소드 호출
        # 3. 검증 -> loss 계산
        loss = loss_fn(pred, y_train)
        # 4. gradient 계산 
        loss.backward()
        # 5. 모델의 파라미터들(weight, bias) update
        optimizer.step()
        # 6. gradient 초기화
        optimizer.zero_grad
        # 학습 결과 저장및 출력을 위해 loss 저장.
        train_loss += loss.item() 

    
    train_loss = train_loss / len(train_loader)		# 한 에폭에서 학습한 step별 loss의 평균계산.
    train_loss_list.append(train_loss) 				# 리스트에 저장
    
    ###############################################
    # 1 epoch 학습한 결과 검증: Testset
    ###############################################
      # 모델을 evaluation() (추론, 검증) 모드로 변환 -> 모델명.eval()
    valid_loss = 0
    valid_acc = 0
    with torch.no_grad(): # 추론만 함 -> gradient 계산할 필요 없음. -> grad_fn 구할 필요없다.
        for X_valid, y_valid in test_loader:	# loader에서 data 가져오기
            # 1. data를 device로 이동
            X_valid, y_valid = X_valid.to(device), y_valid.to(device)
            # 2. 추론
            pred_valid = model(X_valid)
            # 3-1. 검증 -> loss 계산
            valid_loss += loss_fn(pred_valid, y_valid).item()
            # 3-2. 검증 -> accuracy 계산
            ## pred_valid shape: (256, 10: class별 확률) -> 정답 class 추출
            pred_valid_class = pred_valid.argmax(dim=-1)
            valid_acc += torch.sum(y_valid == pred_valid_class).item()
            
        
        # 검증결과 누적값의 평균
        valid_loss = valid_loss / len(test_loader)  # loss는 step수 나눔.
        valid_acc = valid_acc / len(testset)        # accuracy는 데이터 개수로 나눔.
        valid_loss_list.append(valid_loss) 			# list에 저장
        valid_acc_list.append(valid_acc)			# list에 저장

        # 검증 결과 출력
        print(f"[{epoch+1:02d}/{epochs}] train_loss: {train_loss}, valid_loss: {valid_loss}, valid_acc: {valid_acc}")

e = time.time()
print('학습에 걸린 시간(초):', e-s)

#### 학습 로그 시각화

In [None]:
# train loss, valid loss, valid acc 를 epoch 별로 어떻게 변하는지 시각화.
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(range(1, epochs+1), train_loss_list, label="train loss")
plt.plot(range(1, epochs+1), valid_loss_list, label="valid loss")
plt.title("Loss")
plt.legend()
plt.grid(True, linestyle=":")

plt.subplot(1, 2, 2)
plt.plot(range(1, epochs+1), valid_acc_list)
plt.title("valid accuracy")

plt.tight_layout()
plt.grid(True, linestyle=":")
plt.show()

### 모델 성능 최종 평가

In [None]:
model.eval() # 평가모드

test_loss = test_acc = 0
with torch.no_grad():
    for X_test, y_test in test_loader:
        # 1. device 이동
        X_test, y_test = X_test.to(device), y_test.to(device)
        # X_test = X_test.view(X_test.size(0), -1)  # (batch_size, 784)
        
		# 추론
        pred_test = model(X_test)
        # 검증 - loss
        loss_test = loss_fn(pred_test, y_test)
        test_loss += loss_test.item()
        # 검증 - accuracy
        ## class
        pred_test_class = pred_test.argmax(dim=-1)
        test_acc += torch.sum(pred_test_class == y_test).item()

    test_loss = test_loss / len(test_loader)
    test_acc = test_acc / len(testset)

In [None]:
# 결과 확인
test_loss, test_acc

## 새로운 데이터 추론

In [None]:
from PIL import Image

def load_data(device="cpu", *path):
    """
    받은 경로의 이미지들을 읽어서 Tensor로 변환해 반환한다.

    1. 전달받은 경로의 이미지 파일들을 읽는다.
    2. 28 x 28 로 resize
    3. torch.Tensor로 변환 + 전처리
    4. devcie로 이동시킨 뒤 반환한다다.
    """
    input_tensors = []
    for p in path:
        img = Image.open(p)		# 이미지 열어서
        # 전처리
        img = img.convert("L")	# grayscale로 변환 -> .convert("L")
        img = img.resize((28, 28))	# 모델이 학습한 데이터 size(28, 28)로 변환 -> .resize((28, 28))
        img = transforms.ToTensor()(img)
        input_tensors.append(img)
        
    return torch.stack(input_tensors).to(device)

In [None]:
def predict(model, inputs, device="cpu"):
    """
    받은 model에 inputs를 추론하여 그 결과 class들을 반환한다.
    """
    with torch.no_grad():
        model = model.to(device)
        model.eval()
        pred = model(inputs)
        pred_class = pred.argmax(dim=-1)
        return pred_class

In [None]:
# glob을 이용해 테스트 이미지들의 경로 조회.
from glob import glob
file_list = glob("test_img/**/*.png") # 경로 조회해서 저장
file_list

In [None]:
r = load_data(device, *file_list)
result_pred = predict(model, r)
result_pred

In [None]:
# 확인

plt.figure(figsize=(10, 5))
for idx, (path, label) in enumerate(zip(file_list, result_pred)):
    # print(idx, path, label, sep=" , ")
    img = Image.open(path).convert('L')
    plt.subplot(3, 5, idx+1)  #3, 4
    plt.imshow(img, cmap="gray")
    plt.title(f"예측결과: {label}") # Label
    plt.axis("off")

plt.tight_layout()
plt.show()