In [1]:
import torch
import torchvision
import torchvision.transforms as transforms

In [2]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
trainset = torchvision.datasets.CIFAR10('/home/hefb/data/', train=True, transform=transform_train)
valset = torchvision.datasets.CIFAR10('/home/hefb/data', train=False, transform=transform_test)

In [3]:
image, label = trainset[0]
print(image.shape, label)

torch.Size([3, 32, 32]) 6


In [4]:
import torch.nn as nn
import torch.nn.functional as F

class ConvBlock2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm2d(out_channels)
    
    def forward(self, x):
        return F.relu(self.bn(self.conv(x)))


class ResBlock2d(nn.Module):
    def __init__(self, in_channels, out_channels=None, stride=1):
        super().__init__()
        out_channels = in_channels if out_channels is None else out_channels
        self.downsample = None
        if out_channels != in_channels or stride != 1:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride), 
                nn.BatchNorm2d(out_channels)
            )

        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x, inplace=True)
        x = self.conv2(x)
        x = self.bn2(x)

        if self.downsample is not None:
            identity = self.downsample(identity)
        
        x += identity
        out = F.relu(x)
        return out


class Bottleneck2d(nn.Module):
    def __init__(self, in_channels, out_channels=None, stride=1, expansion=4):
        super().__init__()
        mid_channels = in_channels // expansion
        out_channels = in_channels if out_channels is None else out_channels

        self.downsample = None
        if out_channels != in_channels or stride != 1:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride), 
                nn.BatchNorm2d(out_channels)
            )

        self.conv1 = nn.Conv2d(in_channels, mid_channels, 1)
        self.bn1 = nn.BatchNorm2d(mid_channels)
        self.conv2 = nn.Conv2d(mid_channels, mid_channels, 3, stride, 1)
        self.bn2 = nn.BatchNorm2d(mid_channels)
        self.conv3 = nn.Conv2d(mid_channels, out_channels, 1)

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x, inplace=True)

        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x, inplace=True)

        x = self.conv3(x)
        x = self.bn3(x)

        if self.downsample is not None:
            identity = self.downsample(identity)
        
        x += identity
        out = F.relu(x)
        return out
        
class ResNetBackbone2d(nn.Module):
    def __init__(self, in_channels=3, layers=[2,2,2,2], channels=[64,128,256,512], dropout=0, block=ResBlock2d):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, channels[0], 3, 1, 1)
        self.bn1 = nn.BatchNorm2d(channels[0])

        self.layers = []
        pre_chns = channels[0]
        strides = [1, 2, 2, 2]
        for num_blocks, num_channels, stride in zip(layers, channels, strides):
            self.layers.append(ResNetBackbone2d._make_layer(block, num_blocks, pre_chns, num_channels, stride))
            pre_chns = num_channels
        self.layers = nn.Sequential(*self.layers)
        
        self.dropout = nn.Dropout(dropout)
        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        bs = x.shape[0]
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)

        x = self.layers(x)
        x = self.dropout(x)
        x = self.gap(x)

        return x.reshape(bs, -1)

    @staticmethod
    def _make_layer(block, num_blocks, in_channels, out_channels, stride=1):
        blocks = []
        blocks.append(block(in_channels, out_channels, stride))
        for i in range(1, num_blocks):
            blocks.append(block(out_channels, out_channels, 1))
        return nn.Sequential(*blocks)


class ResNet2d(nn.Module):
    def __init__(self, num_classes=10, layers=[2,2,2,2], channels=[64,128,256,512], dropout=0, block=ResBlock2d):
        super().__init__()
        self.backbone = ResNetBackbone2d(3, layers, channels, dropout, block)
        self.classifier = nn.Linear(channels[-1], num_classes)
    
    def forward(self, x):
        feat = self.backbone(x)
        out = self.classifier(feat)
        return out

In [5]:
from torch.utils.data import DataLoader
from torch.optim import Adam

train_loader = DataLoader(trainset, 32, num_workers=4, shuffle=True, drop_last=True)
val_loader = DataLoader(valset, 32, num_workers=4)

loss_fn = nn.CrossEntropyLoss()
model = ResNet2d(10, [2,2,2,2], [64,128,256,512]).to('cuda:3')
optimizer = Adam(model.parameters(), lr=0.01)

In [6]:
epoch = 100
for i in range(epoch):
    # train epoch
    acc, losses, total = 0, 0, 0
    model.train()
    for image, label in train_loader:
        image, label = image.to('cuda:3'), label.to('cuda:3')
        output = model(image)
        optimizer.zero_grad()
        loss = loss_fn(output, label)
        loss.backward()
        optimizer.step()

        bs = image.shape[0]
        total += bs
        losses += loss.item() * bs
        acc += torch.sum(label == (output.argmax(dim=1)))
    acc = acc / total 
    losses = losses / total
    print(f"Train Epoch {i} | acc:{acc}, loss:{losses}")

    # val epoch
    with torch.no_grad():
        model.eval()
        acc, losses, total = 0, 0, 0
        for image, label in val_loader:
            image, label = image.to('cuda:3'), label.to('cuda:3')
            output = model(image)
            loss = loss_fn(output, label)

            bs = image.shape[0]
            total += bs
            losses += loss.item() * bs
            acc += torch.sum(label == (output.argmax(dim=1)))
        acc = acc / total 
        losses = losses / total
        print(f"Test Epoch {i} | acc:{acc}, loss:{losses}")

Train Epoch 0 | acc:0.3597351312637329, loss:1.7293163936361002
Test Epoch 0 | acc:0.47849997878074646, loss:1.4034170670509338
Train Epoch 1 | acc:0.5630801916122437, loss:1.2084662023259185
Test Epoch 1 | acc:0.6014999747276306, loss:1.1270315172195435
Train Epoch 2 | acc:0.6777768731117249, loss:0.9124075979299643
Test Epoch 2 | acc:0.7379999756813049, loss:0.7551484267234803
Train Epoch 3 | acc:0.7483394742012024, loss:0.7268371815947046
Test Epoch 3 | acc:0.7759999632835388, loss:0.6545242427825928
Train Epoch 4 | acc:0.7887123823165894, loss:0.6087451917566502
Test Epoch 4 | acc:0.8141999840736389, loss:0.5315461729526519
Train Epoch 5 | acc:0.8150408267974854, loss:0.5354676114278398
Test Epoch 5 | acc:0.8156999945640564, loss:0.5348099000930786
Train Epoch 6 | acc:0.834326982498169, loss:0.47981268288174145
Test Epoch 6 | acc:0.8345999717712402, loss:0.48754143226146696
Train Epoch 7 | acc:0.8470910787582397, loss:0.4405014265314336
Test Epoch 7 | acc:0.8585999608039856, loss:0

KeyboardInterrupt: 

: 