# PytorchTutorial in Colab

このノートブックは、以下の各ファイルの内容を統合して Colab 上で実行できるようにまとめたものです。

- `common.py`
- `augment.py`
- `plot.py`
- `dataloader.py`
- `train_val.py`
- `my_cnn.py`
- `cifar_resnet.py`
- `main.py`

※ エポック数などは Colab で手早く動作させるために短縮しています。

In [None]:
# 必要なライブラリのインストール
!pip install torch torchvision matplotlib


In [None]:
### 諸々の定義

import random
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


def setup_device():
    """Select the device used for training and evaluation.

    Uses CUDA when it is available (and turns on ``cudnn.benchmark`` in that
    case); otherwise falls back to the CPU. Prints whether CUDA is available.

    Returns:
        torch.device: ``cuda`` when a GPU is available, else ``cpu``.
    """
    cuda_available = torch.cuda.is_available()
    if cuda_available:
        device = torch.device("cuda")
        torch.backends.cudnn.benchmark = True
    else:
        # Return a torch.device on both branches so callers always receive
        # the same type (the CPU branch used to return a plain string).
        device = torch.device("cpu")

    print("CUDA is available:", cuda_available)
    return device


def fixed_r_seed(seed):
    """Seed the random number generators used in this notebook.

    Applies ``seed`` to Python's ``random``, NumPy, and PyTorch (via both
    ``torch.manual_seed`` and ``torch.cuda.manual_seed``), then switches
    cuDNN to deterministic mode.

    Args:
        seed: Integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    torch.backends.cudnn.deterministic = True


def get_time(interval):
    """Format a duration given in seconds as hours, minutes and seconds.

    Args:
        interval: Elapsed time in seconds.

    Returns:
        dict: ``{"time": "<h>h <m>m <s>s"}`` where each component is
        truncated to an integer.
    """
    within_hour = interval % 3600
    hours = int(interval / 3600)
    minutes = int(within_hour / 60)
    seconds = int(within_hour % 60)
    return {"time": f"{hours}h {minutes}m {seconds}s"}


# show sample 12 imgs
def show_img(dataloader, save_path=None):
    """Show up to 12 sample images from the first batch in a 3x4 grid.

    Args:
        dataloader: Iterable yielding dict batches with an ``"image"`` tensor
            (each image indexed as (C, H, W)) and a ``"label"`` sequence.
        save_path: Optional file path. When given, the figure is also written
            to this path before it is shown.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    batched = next(iter(dataloader), None)
    if batched is None:
        raise ValueError("dataloader is empty; there are no images to show")
    images = batched["image"]
    labels = batched["label"]

    # Min-max rescale the batch to [0, 1] so imshow can render it.
    lowest = images.min()
    value_range = images.max() - lowest
    images = images - lowest
    if value_range > 0:
        # Skipped for a constant batch, where 0/0 would produce NaNs.
        images = images / value_range

    n_rows, n_cols = 3, 4
    n_show = min(n_rows * n_cols, len(images))
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 9))
    for i, ax in enumerate(axes.flat):
        ax.axis('off')
        if i >= n_show:
            # Batch is smaller than the grid: leave the remaining cells blank.
            continue
        img = np.transpose(images[i].numpy(), (1, 2, 0))  # (C, H, W) -> (H, W, C)
        ax.imshow(img)
        ax.set_title(f"Label: {labels[i]}")

    if save_path is not None:
        # Save before show(): the figure may no longer be available afterwards.
        fig.savefig(save_path)
    plt.show()
    






In [None]:
import glob
import os
from PIL import Image

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


class DatasetLoader(Dataset):
    """Image dataset read from a ``root/phase/<class_name>/<image>`` layout.

    Class indices follow the sorted order of the class directory names.
    Each item is a dict ``{"image": image, "label": class_index}``.
    """

    def __init__(self, root, phase, transform=None):
        """Index every file found under the class directories of ``root/phase``.

        Args:
            root: Dataset root directory.
            phase: Sub-directory of ``root`` to read.
            transform: Optional callable applied to each loaded RGB image.
        """
        super().__init__()
        self.transform = transform

        self.image_paths = []
        self.image_labels = []
        phase_dir = os.path.join(root, phase)
        # Only directories are classes. A stray file here would otherwise take
        # a label index of its own and shift every class sorted after it.
        self.class_name = sorted(
            entry
            for entry in os.listdir(phase_dir)
            if os.path.isdir(os.path.join(phase_dir, entry))
        )
        for label, name in enumerate(self.class_name):
            paths = sorted(glob.glob(os.path.join(phase_dir, name, "*")))
            self.image_labels.extend([label] * len(paths))
            self.image_paths.extend(paths)

    def __getitem__(self, index):
        """Load, convert to RGB and (optionally) transform the image at ``index``."""
        image_path = self.image_paths[index]
        # The context manager closes the underlying file; convert() returns a
        # separate image object that stays usable after the file is closed.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return {"image": image, "label": self.image_labels[index]}

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.image_paths)
    

def get_dataloader(dataset_path, img_size=32, batch_size=128,
                   augmentation_list=None, num_workers=8):
    """Build the train/val/test DataLoaders and preview one training batch.

    Training images are resized, augmented, converted to tensors and
    normalized; validation and test images skip the augmentation step.

    Args:
        dataset_path: Root directory containing ``train``, ``val`` and
            ``test`` sub-directories.
        img_size: Side length (pixels) every image is resized to.
        batch_size: Batch size used by all three loaders.
        augmentation_list: Names of the augmentations applied to training
            images, in order. Supported names: ``'rcrop'``, ``'hflip'``,
            ``'ra'``, ``'cjitter'``, ``'gray'``, ``'vflip'``. Defaults to
            ``['rcrop', 'hflip', 'ra']``.
        num_workers: Requested number of worker processes per loader. The
            value is capped at the CPU count reported by ``os.cpu_count()``.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``.

    Raises:
        ValueError: If ``augmentation_list`` contains an unsupported name.
    """
    if augmentation_list is None:
        augmentation_list = ['rcrop', 'hflip', 'ra']
    print(f'Apply augmentation ... {augmentation_list}')

    # Name -> factory. Factories are lazy so only requested transforms are built.
    augmentation_factories = {
        # Randomly crop a square region and resize it back to img_size.
        'rcrop': lambda: transforms.RandomResizedCrop(
            size=img_size, scale=(0.5, 1.00), ratio=(1.0, 1.0)),
        # Horizontal flip.
        'hflip': lambda: transforms.RandomHorizontalFlip(p=0.5),
        # RandAugment.
        'ra': lambda: transforms.RandAugment(num_ops=2, magnitude=9),
        # Color jitter.
        'cjitter': lambda: transforms.ColorJitter(
            brightness=0.3, contrast=0.3, saturation=0.3, hue=0.3),
        # Random grayscale conversion.
        'gray': lambda: transforms.RandomGrayscale(p=0.1),
        # Vertical flip.
        # NOTE(review): p=1.0 flips every image; confirm this is intended.
        'vflip': lambda: transforms.RandomVerticalFlip(p=1.0),
    }
    unknown = [name for name in augmentation_list
               if name not in augmentation_factories]
    if unknown:
        raise ValueError(
            f"Unknown augmentation name(s): {unknown}. "
            f"Supported: {sorted(augmentation_factories)}"
        )
    additional_transform_list = [
        augmentation_factories[name]() for name in augmentation_list
    ]

    def resize_step():
        return [transforms.Resize((img_size, img_size))]

    def tensor_steps():
        # Per-channel mean/std are the widely used ImageNet statistics.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ]

    # Augmentation is applied to the training split only.
    train_transform = transforms.Compose(
        resize_step() + additional_transform_list + tensor_steps()
    )
    # No augmentation for val / test.
    test_transform = transforms.Compose(resize_step() + tensor_steps())

    # Load the three splits.
    train_dataset = DatasetLoader(dataset_path, 'train', train_transform)
    val_dataset = DatasetLoader(dataset_path, 'val', test_transform)
    test_dataset = DatasetLoader(dataset_path, 'test', test_transform)

    # Requesting more workers than CPUs only adds overhead (small Colab VMs).
    num_workers = max(0, min(num_workers, os.cpu_count() or 1))

    def make_loader(dataset, shuffle):
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            pin_memory=False,
            drop_last=False,
        )

    # Only the training split is shuffled.
    train_loader = make_loader(train_dataset, shuffle=True)
    val_loader = make_loader(val_dataset, shuffle=False)
    test_loader = make_loader(test_dataset, shuffle=False)

    print(f'Load dataset, Num of dataset ... Train : {len(train_dataset)}  Val : {len(val_dataset)}  Test : {len(test_dataset)}')

    show_img(train_loader)

    return train_loader, val_loader, test_loader

 

In [None]:
### train_val.py の内容
def train(model, device, dataloader, optimizer, criterion):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Network to optimize; switched to training mode.
        device: Device the batches are moved to before the forward pass.
        dataloader: Loader yielding either dict batches with ``"image"`` and
            ``"label"`` entries or ``(data, target)`` pairs. Must expose
            ``dataset`` so the sample count is known.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss function called as ``criterion(output, target)``.

    Returns:
        tuple: ``(mean loss per batch, fraction of correctly classified samples)``.
    """
    model.train()
    total_loss, correct = 0.0, 0
    for batch in dataloader:
        # DatasetLoader yields dict batches; unpacking a dict directly would
        # produce its string keys instead of tensors.
        if isinstance(batch, dict):
            data, target = batch["image"], batch["label"]
        else:
            data, target = batch
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (output.argmax(1) == target).sum().item()

    avg_loss = total_loss / len(dataloader)
    accuracy = correct / len(dataloader.dataset)
    return avg_loss, accuracy

def val(model, device, dataloader, criterion):
    """Evaluate ``model`` on ``dataloader`` without updating its weights.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        device: Device the batches are moved to before the forward pass.
        dataloader: Loader yielding either dict batches with ``"image"`` and
            ``"label"`` entries or ``(data, target)`` pairs. Must expose
            ``dataset`` so the sample count is known.
        criterion: Loss function called as ``criterion(output, target)``.

    Returns:
        tuple: ``(mean loss per batch, fraction of correctly classified samples)``.
    """
    model.eval()
    total_loss, correct = 0.0, 0
    with torch.no_grad():
        for batch in dataloader:
            # DatasetLoader yields dict batches; unpacking a dict directly
            # would produce its string keys instead of tensors.
            if isinstance(batch, dict):
                data, target = batch["image"], batch["label"]
            else:
                data, target = batch
            data, target = data.to(device), target.to(device)

            output = model(data)
            loss = criterion(output, target)
            total_loss += loss.item()
            correct += (output.argmax(1) == target).sum().item()

    avg_loss = total_loss / len(dataloader)
    accuracy = correct / len(dataloader.dataset)
    return avg_loss, accuracy

def test(model, device, dataloader, criterion):
    """Evaluate ``model`` on the test loader.

    Delegates to ``val`` with the same arguments and returns its result
    unchanged.
    """
    metrics = val(
        model=model,
        device=device,
        dataloader=dataloader,
        criterion=criterion,
    )
    return metrics


In [None]:
### my_cnn.py の内容
class MyCNN(nn.Module):
    """Small CNN classifier for 3-channel 32x32 images.

    Two conv + ReLU + 2x2 max-pool stages (32x32 -> 16x16 -> 8x8 spatially)
    followed by two fully connected layers producing ``n_class`` logits.
    """

    def __init__(self, n_class=10):
        """Create the layers.

        Args:
            n_class: Number of output classes.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        # 64 channels x 8 x 8 spatial positions after the two pooling stages.
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, n_class)

    def forward(self, x):
        """Return class logits of shape (batch, n_class) for a (batch, 3, 32, 32) input."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample and keep the batch axis. view(-1, 64 * 8 * 8)
        # would silently fold extra features into the batch dimension when the
        # input is not 32x32; this makes fc1 raise a shape error instead.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [None]:
### cifar_resnet.py の内容（簡易版）
class ResNetBasicBlock(nn.Module):
    """Simplified stand-in model: one convolution, one pooling step, one linear layer.

    The classifier expects 64 x 16 x 16 pooled features, i.e. 32x32 inputs.
    ``depth`` is accepted but not used by this simplified version.
    """

    def __init__(self, depth=20, n_class=10):
        """Create the layers.

        Args:
            depth: Unused in this simplified version.
            n_class: Number of output classes.
        """
        super().__init__()
        pooled_features = 64 * 16 * 16
        # Minimal example: a single convolution, pooling and a linear classifier.
        self.conv = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc = nn.Linear(pooled_features, n_class)

    def forward(self, x):
        """Return class logits of shape (batch, n_class)."""
        activated = F.relu(self.conv(x))
        pooled = self.pool(activated)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)


In [None]:
### main.py の内容
def main():
    """Train the model, validate after every epoch and evaluate on the test split.

    Reads the dataset from ``./cifar10``, creates ``./fig`` for the sample
    image, trains for ``n_epoch`` epochs with SGD and prints the metrics.

    Returns:
        dict: ``{"history": [[train_loss, train_acc, val_loss, val_acc], ...],
        "test_loss": float, "test_acc": float}`` so the per-epoch results can
        be inspected or plotted by the caller.
    """
    print('start main')

    seed = 1
    n_epoch = 5  # shortened for Colab
    lr = 0.01
    dataset_path = './cifar10'
    save_dir = './fig'
    os.makedirs(save_dir, exist_ok=True)

    # Device and random seeds.
    device = setup_device()
    fixed_r_seed(seed)

    # Data loaders.
    train_loader, val_loader, test_loader = get_dataloader(dataset_path=dataset_path, img_size=32, batch_size=128)

    # Check the augmentation result (save the images of the first batch).
    show_img(save_path=os.path.join(save_dir, 'ex_img.png'), dataloader=train_loader)

    # Model definition.
    # model = MyCNN(n_class=10)
    model = ResNetBasicBlock(depth=20, n_class=10)  # this one is used
    model.to(device)

    # Optimizer.
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, weight_decay=1e-5, momentum=0.9)

    # Loss function.
    criterion = nn.CrossEntropyLoss()

    start_time = time.time()
    all_training_result = []
    for epoch in range(1, n_epoch + 1):
        # Elapsed time is reported at the start of each epoch.
        elapsed = time.time() - start_time
        interval = get_time(elapsed)
        print(f"Lr: {optimizer.param_groups[0]['lr']} , Time: {interval['time']}")

        train_loss, train_acc = train(model, device, train_loader, optimizer, criterion)
        val_loss, val_acc = val(model, device, val_loader, criterion)
        all_training_result.append([train_loss, train_acc, val_loss, val_acc])
        print(
            f"Epoch: [{epoch}/{n_epoch}] \t"
            + f"Train Loss: {train_loss:.6f} \t"
            + f"Train Acc: {train_acc*100:.2f}% \t"
            + f"Val Loss: {val_loss:.6f} \t"
            + f"Val Acc: {val_acc*100:.2f}% \t",
            flush=True,
        )

    test_loss, test_acc = test(model, device, test_loader, criterion)
    print(f"Test Loss: {test_loss:.6f}  Test Acc: {test_acc*100:.2f}%")

    # Hand the collected metrics back instead of discarding them.
    return {
        "history": all_training_result,
        "test_loss": test_loss,
        "test_acc": test_acc,
    }

# Run the whole pipeline when this file (or notebook cell) is executed directly.
if __name__ == '__main__':
    main()
