## VGGNET 모델 파이토치 구현
### 3x3 conv 필터를 더 깊게 쌓아 더 적은 파라미터로 같은 receptive field를 가짐 -> 자세한 내용은 논문 참고
### 논문 링크: https://arxiv.org/abs/1409.1556

## 데이터셋 다운로드 (훈련용 50만 장, 검증용 10만 장)

In [None]:
!pip install natsort opencv-python tqdm

In [None]:
import os
import shutil
import torchvision
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch
import cv2
import torchvision.transforms as T
import natsort
from tqdm import tqdm


In [None]:
# Uncomment the two lines below to download the iNaturalist 2021 data and
# train on the large-scale dataset (INaturalist_Dataset reads from
# data/2021_train_mini and data/2021_valid).

# torchvision.datasets.INaturalist(root='./data',version='2021_train_mini', download=True)
# torchvision.datasets.INaturalist(root='./data',version='2021_valid', download=True)

## 데이터의 클래스 수가 10,000개로 너무 많으므로 앞의 300개 클래스에 대해서만 학습


In [None]:
class INaturalist_Dataset(Dataset):
    """Image-classification dataset over the first 300 class folders on disk.

    Expects one sub-directory per class under ``data/2021_train_mini``
    (when ``train == 'train'``) or ``data/2021_valid`` (any other value).
    Class indices follow the natural-sorted order of the directory names.

    Args:
        train: ``'train'`` selects the training folder; anything else
            selects the validation folder.
        transforms: optional callable applied to the RGB image array
            returned by OpenCV before it is handed back.
    """

    def __init__(self, train='train', transforms=None):
        self.root_path = 'data/2021_train_mini' if train == 'train' else 'data/2021_valid'

        # Keep directories only: a stray file in the root (e.g. .DS_Store)
        # would otherwise be counted as a class and crash the listing below.
        class_dirs = [
            name for name in natsort.natsorted(os.listdir(self.root_path))
            if os.path.isdir(os.path.join(self.root_path, name))
        ]
        self.class_names = class_dirs[:300]

        self.data_path = []
        for class_idx, class_name in enumerate(self.class_names):
            class_dir = os.path.join(self.root_path, class_name)
            # os.listdir returns entries in arbitrary order; sort so that a
            # given index always maps to the same sample across runs.
            for file in sorted(os.listdir(class_dir)):
                self.data_path.append({'file_path': os.path.join(class_dir, file),
                                       'class_idx': class_idx})
        self.transforms = transforms

    def __len__(self):
        """Return the number of (image, label) samples."""
        return len(self.data_path)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, class_idx)``.

        Raises:
            OSError: if OpenCV cannot read or decode the image file.
        """
        sample = self.data_path[idx]
        file_path = sample['file_path']
        class_idx = sample['class_idx']

        img = cv2.imread(file_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor.
            raise OSError(f'could not read image file: {file_path}')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR

        if self.transforms is not None:
            img = self.transforms(img)
        return img, class_idx

## 데이터증강 

In [None]:
# Per-channel normalisation shared by both pipelines
# (the commonly used ImageNet mean / std values).
normalize = T.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)

# Training pipeline: tensor conversion, fixed 224x224 resize,
# random horizontal flip as augmentation, then normalisation.
train_transform = T.Compose(
    [T.ToTensor(), T.Resize((224, 224)), T.RandomHorizontalFlip(), normalize]
)

# Validation pipeline: identical to training but without augmentation.
val_transform = T.Compose(
    [T.ToTensor(), T.Resize((224, 224)), normalize]
)

In [None]:
# Mini-batch size used by every DataLoader in this notebook.
batch_size = 16
# Uncomment the lines below to train on the large-scale iNaturalist data.
# trainset = INaturalist_Dataset('train', train_transform)
# testset = INaturalist_Dataset('val', val_transform)
# trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
#                                           shuffle=True, num_workers=0)


# testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
#                                          shuffle=False, num_workers=0)

In [None]:
# STL10 train / test splits, fetched into ./data when not already present.
trainset = torchvision.datasets.STL10(
    root='./data', split='train', download=True, transform=train_transform,
)
testset = torchvision.datasets.STL10(
    root='./data', split='test', download=True, transform=val_transform,
)

# Shuffle only the training data; both loaders load in the main process.
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=0)
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=0)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Class names for the dataset that is actually loaded. Reading them from
# data/2021_train_mini fails when that folder was never downloaded and does
# not match the labels of the torchvision dataset in use.
if getattr(trainset, 'classes', None):
    classes = list(trainset.classes)
elif getattr(trainset, 'class_names', None):
    # INaturalist_Dataset stores full directory names; keep only the last
    # two '_'-separated parts for display.
    classes = ['_'.join(class_name.split('_')[-2:]) for class_name in trainset.class_names]
else:
    classes = None  # no names available: fall back to printing label indices


def imshow(img, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Display a normalised CHW image tensor with matplotlib.

    Args:
        img: image tensor laid out as (channels, height, width).
        mean: per-channel mean that was subtracted during normalisation.
        std: per-channel std that the image was divided by.
    """
    npimg = np.transpose(img.detach().cpu().numpy(), (1, 2, 0))  # CHW -> HWC
    # Undo T.Normalize: x_norm = (x - mean) / std  =>  x = x_norm * std + mean
    npimg = npimg * np.asarray(std) + np.asarray(mean)
    # Clip so matplotlib does not warn about values outside [0, 1].
    plt.imshow(np.clip(npimg, 0.0, 1.0))
    plt.show()


# Fetch one random batch of training images.
dataiter = iter(trainloader)
images, labels = next(dataiter)

# Show the images as a grid.
imshow(torchvision.utils.make_grid(images))
# Print the ground-truth labels; iterate the labels themselves so a batch
# smaller than batch_size cannot cause an IndexError.
label_names = [classes[label] if classes else str(label) for label in labels.tolist()]
print(' '.join(f'{name:5s}' for name in label_names))

## VGGNET 구현

In [None]:
class VGG(nn.Module):
    """VGG-style classifier: feature extractor, 7x7 adaptive pooling, 3-layer MLP head.

    Args:
        features: module applied to the input first. The first linear layer
            takes ``512 * 7 * 7`` inputs, so it must produce 512 channels.
        output_dim: number of output logits (classes).
        init_weights: when True, re-initialise Conv2d / BatchNorm2d / Linear
            parameters as done in ``_initialize_weights``.
    """

    def __init__(self, features,output_dim,init_weights=True):
        super().__init__()
        self.features = features  # convolutional feature extractor
        # Pool to a fixed 7x7 map so the classifier input size is constant.
        self.avgpool = nn.AdaptiveAvgPool2d(7)
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, output_dim),
        )
        if init_weights:
            self._initialize_weights()

    def _initialize_weights(self):
        """Initialise conv (Kaiming normal), batch-norm (1/0) and linear (N(0, 0.01)) layers."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class logits of shape (batch, output_dim) for an image batch."""
        x = self.features(x)
        x = self.avgpool(x)
        # flatten instead of view: view raises on non-contiguous activations
        # (e.g. channels_last inputs), flatten copies only when it has to.
        x = torch.flatten(x, 1)
        return self.classifier(x)

## vggNet layer의 개수에 따른 구조

In [None]:
# Layer layouts for the VGG variants, one pooling stage per line.
# An integer is the output-channel count of a 3x3 convolution;
# 'M' marks a 2x2 max-pooling layer.
vgg11_config = [
    64, 'M',
    128, 'M',
    256, 256, 'M',
    512, 512, 'M',
    512, 512, 'M',
]

vgg13_config = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 'M',
    512, 512, 'M',
    512, 512, 'M',
]

vgg16_config = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 256, 'M',
    512, 512, 512, 'M',
    512, 512, 512, 'M',
]

vgg19_config = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 256, 256, 'M',
    512, 512, 512, 512, 'M',
    512, 512, 512, 512, 'M',
]

## 특징 학습 layer 구현

In [None]:
def get_vgg_layers(config, batch_norm, in_channels=3):
    """Build the convolutional feature extractor described by ``config``.

    Args:
        config: sequence whose items are either ``'M'`` (a 2x2 max-pooling
            layer) or an int (output channels of a 3x3, padding-1 conv).
        batch_norm: when True, insert ``BatchNorm2d`` between each
            convolution and its ReLU.
        in_channels: channel count of the input images (3 for RGB).

    Returns:
        ``nn.Sequential`` containing the layers in ``config`` order.

    Raises:
        ValueError: if an item is neither ``'M'`` nor an int.
    """
    layers = []

    for c in config:
        if c == 'M':
            layers.append(nn.MaxPool2d(kernel_size=2))
            continue
        # Explicit check rather than assert: asserts vanish under `python -O`.
        if not isinstance(c, int):
            raise ValueError(f"config entries must be 'M' or int, got {c!r}")

        conv2d = nn.Conv2d(in_channels, c, kernel_size=3, padding=1)
        if batch_norm:
            layers += [conv2d, nn.BatchNorm2d(c), nn.ReLU(inplace=True)]
        else:
            layers += [conv2d, nn.ReLU(inplace=True)]
        in_channels = c  # the next conv consumes this conv's output

    return nn.Sequential(*layers)

In [None]:
# Feature extractor for the VGG19 layout defined in vgg19_config.
vgg19_layers = get_vgg_layers(vgg19_config, batch_norm = True) # batch_norm: normalise activations per mini-batch

## 모델 학습

In [None]:
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar',
                    best_filename='model_best.pth.tar'):
    """Serialise ``state`` to ``filename`` and optionally keep a copy as the best model.

    Args:
        state: object to save with ``torch.save`` (typically a dict of
            state_dicts and metadata).
        is_best: when True, the file just written is also copied to
            ``best_filename``.
        filename: path of the rolling checkpoint, overwritten on every call.
        best_filename: destination of the best-model copy.
    """
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, best_filename)

In [None]:
# Train on the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Size the output layer from the dataset that is actually loaded instead of a
# fixed 300: torchvision datasets expose `classes`, INaturalist_Dataset exposes
# `class_names`. Fall back to 300 when neither is available.
class_list = getattr(trainset, 'classes', None) or getattr(trainset, 'class_names', None)
num_classes = len(class_list) if class_list else 300

model = VGG(vgg19_layers, num_classes).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()
epochs = 30
# Multiply the learning rate by 0.1 every 10 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [None]:
def train(loader, model,optimizer, criterion):
    """Run one training epoch over ``loader``.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        loader: iterable yielding ``(inputs, labels)`` batches.
        model: network to optimise; switched to training mode.
        optimizer: optimiser stepped once per batch.
        criterion: loss function called as ``criterion(outputs, labels)``.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean of the per-batch losses and the
        fraction of correctly classified samples.
    """
    model.train()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for inputs, labels in tqdm(loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Accuracy bookkeeping: the predicted class is the arg-max logit.
        predicted = outputs.max(1)[1]
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()

        loss.backward()
        optimizer.step()

        loss_sum += loss.item()

    return loss_sum / len(loader), n_correct / n_seen

In [None]:
def validation(loader, model, optimizer, criterion):
    """Evaluate ``model`` on ``loader`` without updating any parameters.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        loader: iterable yielding ``(inputs, labels)`` batches.
        model: network to evaluate; switched to eval mode.
        optimizer: unused. Kept only so existing call sites keep working;
            evaluation must not touch optimiser or gradient state.
        criterion: loss function called as ``criterion(outputs, labels)``.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean of the per-batch losses and the
        fraction of correctly classified samples.
    """
    running_loss = 0.0
    correct = 0
    total = 0

    model.eval()
    with torch.no_grad():
        for inputs, labels in tqdm(loader):
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass only: no gradients exist under no_grad, so there
            # is nothing to zero and nothing to step.
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            running_loss += loss.item()

    epoch_loss = running_loss / len(loader)
    epoch_acc = correct / total

    return epoch_loss, epoch_acc

In [None]:

# Best validation accuracy seen so far. It must live outside the loop:
# resetting it every epoch marks every epoch as "best".
best_acc = 0

for epoch in range(epochs):  # loop over the dataset multiple times
    train_loss, train_acc = train(trainloader,model, optimizer, criterion)
    val_loss, val_acc = validation(testloader,model, optimizer, criterion)
    scheduler.step()

    # Compare against the running best, then update it so the checkpoint
    # records the true best accuracy rather than the initial 0.
    is_best = val_acc > best_acc
    best_acc = max(val_acc, best_acc)

    save_checkpoint({
            'epoch': epoch + 1,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc,
            'optimizer' : optimizer.state_dict(),
            'scheduler' : scheduler.state_dict()
        }, is_best)

    print(f'[{epoch}], train_loss:{train_loss:.4f}, val_loss:{val_loss:.4f}, train_acc:{train_acc*100:.4f}, val_acc:{val_acc*100:.4f}')
       
   
        
       
