In [None]:
# import necessary libraries
import os
import sys
import json
import requests
from tqdm import tqdm
import time
import datetime
import logging
import logging.handlers
import torch
import pandas as pd
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time
import os
from datasets import load_dataset
import torch.nn.parallel
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Subset

In [None]:
import wandb
import random
# Start the Weights & Biases run that every later wandb.log call reports to.
# NOTE(review): these values are only recorded as run metadata; the active
# training cell at the end of the notebook uses ResNet_SE for 50 epochs, so
# confirm which settings this run is meant to describe.
wandb.init(
    project="DL_Lab2",
    config=dict(
        learning_rate=5e-4,
        architecture="CNN",
        dataset="CIFAR-10",
        epochs=20,
    ),
)

In [None]:
# implement a CNN
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
# Human-readable names for the ten labels, indexed by integer class id.
classes = tuple('plane car bird cat deer dog frog horse ship truck'.split())
# define the CNN architecture
class Net(nn.Module):
    """Plain four-stage CNN classifier producing 10 logits.

    Each stage is a 3x3 convolution, a ReLU and a 2x2 max-pool, so the spatial
    size is halved four times. ``fc1`` expects ``256 * 2 * 2`` features, which
    corresponds to inputs of shape ``(N, 3, 32, 32)``.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(256 * 2 * 2, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        """Return raw class logits of shape ``(N, 10)`` for a batch of images."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))
        # Flatten per sample. Unlike view(-1, 1024) this keeps the batch
        # dimension fixed, so a wrong input size fails loudly in fc1 instead of
        # silently producing a different batch size.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    def evaluate(self, test_loader, criterion, use_cuda):
        """Measure loss and accuracy of this model on ``test_loader``.

        Per-class accuracies (``acc_<name>``) and the overall accuracy (``acc``)
        are sent to wandb in a single ``wandb.log`` call, and the overall
        accuracy is printed.

        Args:
            test_loader: iterable yielding ``(data, target)`` batches.
            criterion: loss returning the batch mean, e.g. ``nn.CrossEntropyLoss()``.
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``(mean_loss, accuracy)`` over all samples seen, or ``(nan, nan)``
            when the loader yields no samples.
        """
        # Operate on this instance, not on whatever global `model` happens to exist.
        self.eval()
        num_classes = len(classes)
        class_correct = [0.0] * num_classes
        class_total = [0.0] * num_classes
        loss_sum = 0.0
        with torch.no_grad():  # no autograd graph needed for evaluation
            for data, target in tqdm(test_loader):
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                output = self(data)
                # criterion returns a batch mean; weight by batch size to get a sum.
                loss_sum += criterion(output, target).item() * data.size(0)
                _, pred = torch.max(output, 1)
                hits = pred.eq(target.view_as(pred))
                # tolist() always yields flat lists, so a batch of size 1 is handled too.
                for label, hit in zip(target.tolist(), hits.tolist()):
                    class_correct[label] += float(hit)
                    class_total[label] += 1

        n_seen = sum(class_total)
        if n_seen == 0:
            print('Test Accuracy (Overall): N/A (no examples)')
            return float('nan'), float('nan')

        test_loss = loss_sum / n_seen
        metrics = {}
        for i in range(num_classes):
            if class_total[i] > 0:
                metrics["acc_{}".format(classes[i])] = class_correct[i] / class_total[i]
            else:
                print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))
        n_correct = sum(class_correct)
        accuracy = n_correct / n_seen
        metrics["acc"] = accuracy
        # One call keeps all accuracies of this evaluation on the same wandb step.
        wandb.log(metrics)
        print('Test Accuracy (Overall): %2d%% (%2d/%2d)' % (100. * n_correct / n_seen, n_correct, n_seen))
        return test_loss, accuracy

    def train_model(model, train_loader, valid_loader, epochs, optimizer, criterion, use_cuda, save_path):
        """Train ``model`` and checkpoint the weights with the lowest validation loss.

        The first parameter is named ``model`` rather than ``self``; the method is
        meant to be called as ``Net.train_model(model, ...)``.

        Args:
            model: network to train; must provide ``evaluate``.
            train_loader: iterable of ``(data, target)`` training batches.
            valid_loader: iterable of ``(data, target)`` validation batches.
            epochs: number of passes over ``train_loader``.
            optimizer: optimizer already bound to ``model``'s parameters.
            criterion: loss returning the batch mean.
            use_cuda: move each batch to the GPU before the forward pass.
            save_path: file that receives ``model.state_dict()`` whenever the
                validation loss does not get worse.
        """
        valid_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
        for epoch in tqdm(range(1, epochs + 1)):
            model.train()
            loss_sum = 0.0
            n_seen = 0
            for data, target in train_loader:
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                loss_sum += loss.item() * data.size(0)
                n_seen += data.size(0)
            # Normalise by the samples actually processed.
            train_loss = loss_sum / max(n_seen, 1)
            # evaluate() makes a single gradient-free pass over the validation
            # data and provides both the loss and the accuracy logging.
            valid_loss, _ = model.evaluate(valid_loader, criterion, use_cuda)
            wandb.log({"training_loss": train_loss, "val_loss": valid_loss})
            print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(epoch, train_loss, valid_loss))
            if valid_loss <= valid_loss_min:
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min, valid_loss))
                torch.save(model.state_dict(), save_path)
                valid_loss_min = valid_loss

    def predict(model, test_loader, use_cuda):
        """Return the predicted class index for every sample in ``test_loader``.

        Args:
            model: network used for inference (called as ``Net.predict(model, ...)``).
            test_loader: iterable of batches; a batch may be a bare tensor or a
                list/tuple whose first element is the input tensor (labels, if
                present, are ignored).
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``LongTensor`` of shape ``(num_samples, 1)`` on the CPU, in loader
            order; an empty ``LongTensor`` when the loader yields nothing.
        """
        model.eval()
        batches = []
        with torch.no_grad():
            for batch in tqdm(test_loader):
                # Loaders in this notebook yield (images, labels); feed the images.
                data = batch[0] if isinstance(batch, (list, tuple)) else batch
                if use_cuda:
                    data = data.cuda()
                output = model(data)
                batches.append(output.max(1, keepdim=True)[1].cpu())
        if not batches:
            return torch.LongTensor()
        return torch.cat(batches, dim=0)


In [None]:
# Preprocessing applied to both splits: resize to 32x32, convert to a tensor,
# then normalize each RGB channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # normalize
])
batch_size = 128
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
# Evaluation data is not shuffled: metrics do not depend on order, and a fixed
# order keeps the output of predict() aligned with test_dataset indices.
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# print the train and test dataset sizes (number of samples)
print('Train: ', len(train_loader.dataset))
print('Test: ', len(test_loader.dataset))
# len(DataLoader) is the number of batches per epoch, not the batch size
print('Train Batches: ', len(train_loader))
print('Test Batches: ', len(test_loader))
# the batch size itself is an attribute of the loader
print('Batch Size: ', train_loader.batch_size)

In [None]:
# Report how many training samples carry each of the ten labels.
train_targets = np.array(train_dataset.targets)
for i, class_name in enumerate(classes):
    print('Number of %5s: %5d' % (class_name, np.count_nonzero(train_targets == i)))

In [None]:
# Show the second training image and its label. `classes` is the tuple defined
# next to the Net model above, so it is not redefined here.
image, label = train_dataset[1]
# The dataset transform normalized pixels with mean 0.5 / std 0.5, so values
# lie in [-1, 1]. imshow expects floats in [0, 1] and would clip the rest,
# so undo the normalization before plotting.
image = (image * 0.5 + 0.5).clamp(0, 1)
# imshow wants channels last: (C, H, W) -> (H, W, C)
image = image.permute(1, 2, 0)
plt.imshow(image)
plt.show()
print(classes[label])

In [None]:
def load_label_names():
    """Return the ten class names as a new list, ordered by label index."""
    label_names = (
        'airplane',
        'automobile',
        'bird',
        'cat',
        'deer',
        'dog',
        'frog',
        'horse',
        'ship',
        'truck',
    )
    return list(label_names)

In [None]:
# Baseline experiment: create an instance of Net and train it on CIFAR-10.
# Currently disabled: the code below is wrapped in a string literal, so none of
# it is executed when this cell runs.
'''
model = Net()
use_cuda = torch.cuda.is_available()
if use_cuda:
    model = model.cuda()
# train model
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001,  weight_decay=1e-4)
epochs = 50
Net.train_model(model, train_loader, test_loader, epochs, optimizer, criterion, use_cuda, 'model.pt')
'''

In [None]:
# test model
# model.evaluate(test_loader, criterion, use_cuda)

In [None]:
# implement a CNN with residual blocks
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers whose output is added to a shortcut.

    The shortcut is the identity unless the block changes the resolution
    (``stride != 1``) or the channel count, in which case it is a strided
    1x1 convolution followed by batch-norm.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # An empty Sequential returns its input unchanged (identity shortcut).
        shape_preserved = stride == 1 and in_channels == out_channels
        if shape_preserved:
            projection = []
        else:
            projection = [
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            ]
        self.shortcut = nn.Sequential(*projection)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(hidden))
        return F.relu(main + self.shortcut(x))
    
class ResNet(nn.Module):
    """Residual network: a 3x3 stem, four stages of ``block`` modules, then
    4x4 average pooling, dropout and a linear classifier.

    Args:
        block: class used for each residual unit; it is called as
            ``block(in_channels, out_channels, stride)``.
        num_blocks: sequence of four ints, the number of units per stage.
        num_classes: size of the output layer.
        dropout_rate: dropout probability applied before the classifier.
    """

    def __init__(self, block, num_blocks, num_classes=10, dropout_rate=0.7):
        super(ResNet, self).__init__()
        self.in_channels = 32
        self.conv = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(32)
        self.layer1 = self.make_layer(block, 32, num_blocks[0], stride=1)
        self.layer2 = self.make_layer(block, 64, num_blocks[1], stride=2)
        self.layer3 = self.make_layer(block, 128, num_blocks[2], stride=2)
        self.layer4 = self.make_layer(block, 256, num_blocks[3], stride=2)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(256, num_classes)

    def make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage: the first unit uses ``stride``, the rest use stride 1.

        Updates ``self.in_channels`` so the next stage is wired correctly.
        """
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for unit_stride in strides:
            layers.append(block(self.in_channels, out_channels, unit_stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return raw class logits of shape ``(N, num_classes)``."""
        out = F.relu(self.bn(self.conv(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Three stride-2 stages reduce a 32x32 input to 4x4; pool it to 1x1.
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.dropout(out)
        out = self.fc(out)
        return out

    def evaluate(self, test_loader, criterion, use_cuda):
        """Measure loss and accuracy of this model on ``test_loader``.

        Per-class accuracies (``acc_<name>``) and the overall accuracy (``acc``)
        are sent to wandb in a single ``wandb.log`` call, and the overall
        accuracy is printed.

        Args:
            test_loader: iterable yielding ``(data, target)`` batches.
            criterion: loss returning the batch mean, e.g. ``nn.CrossEntropyLoss()``.
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``(mean_loss, accuracy)`` over all samples seen, or ``(nan, nan)``
            when the loader yields no samples.
        """
        # Operate on this instance, not on whatever global `model` happens to exist.
        self.eval()
        num_classes = len(classes)
        class_correct = [0.0] * num_classes
        class_total = [0.0] * num_classes
        loss_sum = 0.0
        with torch.no_grad():  # no autograd graph needed for evaluation
            for data, target in tqdm(test_loader):
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                output = self(data)
                # criterion returns a batch mean; weight by batch size to get a sum.
                loss_sum += criterion(output, target).item() * data.size(0)
                _, pred = torch.max(output, 1)
                hits = pred.eq(target.view_as(pred))
                # tolist() always yields flat lists, so a batch of size 1 is handled too.
                for label, hit in zip(target.tolist(), hits.tolist()):
                    class_correct[label] += float(hit)
                    class_total[label] += 1

        n_seen = sum(class_total)
        if n_seen == 0:
            print('Test Accuracy (Overall): N/A (no examples)')
            return float('nan'), float('nan')

        test_loss = loss_sum / n_seen
        metrics = {}
        for i in range(num_classes):
            if class_total[i] > 0:
                metrics["acc_{}".format(classes[i])] = class_correct[i] / class_total[i]
            else:
                print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))
        n_correct = sum(class_correct)
        accuracy = n_correct / n_seen
        metrics["acc"] = accuracy
        # One call keeps all accuracies of this evaluation on the same wandb step.
        wandb.log(metrics)
        print('Test Accuracy (Overall): %2d%% (%2d/%2d)' % (100. * n_correct / n_seen, n_correct, n_seen))
        return test_loss, accuracy

    def train_model(model, train_loader, valid_loader, epochs, optimizer, criterion, use_cuda, save_path):
        """Train ``model`` and checkpoint the weights with the lowest validation loss.

        The first parameter is named ``model`` rather than ``self``; the method is
        meant to be called as ``ResNet.train_model(model, ...)``.

        Args:
            model: network to train; must provide ``evaluate``.
            train_loader: iterable of ``(data, target)`` training batches.
            valid_loader: iterable of ``(data, target)`` validation batches.
            epochs: number of passes over ``train_loader``.
            optimizer: optimizer already bound to ``model``'s parameters.
            criterion: loss returning the batch mean.
            use_cuda: move each batch to the GPU before the forward pass.
            save_path: file that receives ``model.state_dict()`` whenever the
                validation loss does not get worse.
        """
        valid_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
        for epoch in tqdm(range(1, epochs + 1)):
            model.train()
            loss_sum = 0.0
            n_seen = 0
            for data, target in train_loader:
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                loss_sum += loss.item() * data.size(0)
                n_seen += data.size(0)
            # Normalise by the samples actually processed.
            train_loss = loss_sum / max(n_seen, 1)
            # evaluate() makes a single gradient-free pass over the validation
            # data and provides both the loss and the accuracy logging.
            valid_loss, _ = model.evaluate(valid_loader, criterion, use_cuda)
            wandb.log({"training_loss": train_loss, "val_loss": valid_loss})
            print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(epoch, train_loss, valid_loss))
            if valid_loss <= valid_loss_min:
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min, valid_loss))
                torch.save(model.state_dict(), save_path)
                valid_loss_min = valid_loss

    def predict(model, test_loader, use_cuda):
        """Return the predicted class index for every sample in ``test_loader``.

        Args:
            model: network used for inference (called as ``ResNet.predict(model, ...)``).
            test_loader: iterable of batches; a batch may be a bare tensor or a
                list/tuple whose first element is the input tensor (labels, if
                present, are ignored).
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``LongTensor`` of shape ``(num_samples, 1)`` on the CPU, in loader
            order; an empty ``LongTensor`` when the loader yields nothing.
        """
        model.eval()
        batches = []
        with torch.no_grad():
            for batch in tqdm(test_loader):
                # Loaders in this notebook yield (images, labels); feed the images.
                data = batch[0] if isinstance(batch, (list, tuple)) else batch
                if use_cuda:
                    data = data.cuda()
                output = model(data)
                batches.append(output.max(1, keepdim=True)[1].cpu())
        if not batches:
            return torch.LongTensor()
        return torch.cat(batches, dim=0)


In [None]:
# ResNet experiment: build the network from ResidualBlock units and train it.
# Currently disabled: the code below is wrapped in a string literal, so none of
# it is executed when this cell runs.
'''
os.environ["CUDA_VISIBLE_DEVICES"] = "7"
model = ResNet(ResidualBlock, [3, 4, 6, 4])
use_cuda = torch.cuda.is_available()
if use_cuda:
    model = model.cuda()
# train model
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=5e-4, weight_decay=5e-4)
epochs = 50
ResNet.train_model(model, train_loader, test_loader, epochs, optimizer, criterion, use_cuda, 'ResNet_model.pt')
'''

In [None]:
#test model
# model.evaluate(test_loader, criterion, use_cuda)


In [None]:
# print the network architecture
# print(model)

In [None]:
# implementation of DenseNet
class Bottleneck(nn.Module):
    """Dense layer: BN-ReLU-1x1 conv, BN-ReLU-3x3 conv, dropout, then concat.

    The 1x1 convolution widens to ``4 * growth_rate`` channels and the 3x3
    convolution emits ``growth_rate`` new channels, which are concatenated in
    front of the input, so the output has ``in_channels + growth_rate`` channels.
    """

    def __init__(self, in_channels, growth_rate, dropout_rate=0.75):
        super().__init__()
        inner_channels = 4 * growth_rate
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.conv1 = nn.Conv2d(in_channels, inner_channels, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(inner_channels)
        self.conv2 = nn.Conv2d(inner_channels, growth_rate, kernel_size=3, padding=1, bias=False)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Return the new feature maps concatenated with ``x`` along channels."""
        squeezed = self.conv1(F.relu(self.bn1(x)))
        new_features = self.conv2(F.relu(self.bn2(squeezed)))
        new_features = self.dropout(new_features)
        return torch.cat([new_features, x], 1)

class Transition(nn.Module):
    """Layer between dense stages: BN-ReLU-1x1 conv to ``out_channels``
    followed by 2x2 average pooling."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.bn = nn.BatchNorm2d(in_channels)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)

    def forward(self, x):
        """Change the channel count, then halve the spatial resolution."""
        activated = F.relu(self.bn(x))
        projected = self.conv(activated)
        return F.avg_pool2d(projected, 2)
class DenseNet(nn.Module):
    """DenseNet classifier: a 3x3 stem, four dense stages separated by three
    transition layers, then BN-ReLU, 4x4 average pooling and a linear layer.

    Args:
        num_blocks: sequence of four ints, the number of Bottleneck layers per
            dense stage.
        growth_rate: channels added by every Bottleneck layer.
        reduction: factor applied to the channel count in each transition.
        num_classes: size of the output layer.
    """

    def __init__(self, num_blocks, growth_rate=64, reduction=0.8, num_classes=10):
        super(DenseNet, self).__init__()
        num_channels = 2 * growth_rate
        # NOTE(review): padding=2 with a 3x3 kernel enlarges the feature map by
        # two pixels per dimension (32 -> 34); padding=1 would keep the size.
        # Left unchanged because it alters the network's numerics - confirm intent.
        self.conv1 = nn.Conv2d(3, num_channels, kernel_size=3, stride=1, padding=2, bias=False)
        self.dense1 = self.make_dense_layers(Bottleneck, num_channels, growth_rate, num_blocks[0])
        num_channels += num_blocks[0] * growth_rate
        out_channels = int(reduction * num_channels)
        self.trans1 = Transition(num_channels, out_channels)

        num_channels = out_channels
        self.dense2 = self.make_dense_layers(Bottleneck, num_channels, growth_rate, num_blocks[1])
        num_channels += num_blocks[1] * growth_rate
        out_channels = int(reduction * num_channels)
        self.trans2 = Transition(num_channels, out_channels)

        num_channels = out_channels
        self.dense3 = self.make_dense_layers(Bottleneck, num_channels, growth_rate, num_blocks[2])
        num_channels += num_blocks[2] * growth_rate
        out_channels = int(reduction * num_channels)
        self.trans3 = Transition(num_channels, out_channels)

        num_channels = out_channels
        self.dense4 = self.make_dense_layers(Bottleneck, num_channels, growth_rate, num_blocks[3])
        num_channels += num_blocks[3] * growth_rate

        self.bn = nn.BatchNorm2d(num_channels)
        self.fc = nn.Linear(num_channels, num_classes)

    def make_dense_layers(self, block, in_channels, growth_rate, num_blocks):
        """Stack ``num_blocks`` layers; layer ``i`` receives
        ``in_channels + i * growth_rate`` channels because every previous layer
        concatenated ``growth_rate`` new ones."""
        layers = []
        for i in range(num_blocks):
            layers.append(block(in_channels + i * growth_rate, growth_rate))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return raw class logits of shape ``(N, num_classes)``."""
        out = self.conv1(x)
        out = self.dense1(out)
        out = self.trans1(out)
        out = self.dense2(out)
        out = self.trans2(out)
        out = self.dense3(out)
        out = self.trans3(out)
        out = self.dense4(out)
        out = F.relu(self.bn(out))
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

    def evaluate(self, test_loader, criterion, use_cuda):
        """Measure loss and accuracy of this model on ``test_loader``.

        Per-class accuracies (``acc_<name>``) and the overall accuracy (``acc``)
        are sent to wandb in a single ``wandb.log`` call, and the overall
        accuracy is printed.

        Args:
            test_loader: iterable yielding ``(data, target)`` batches.
            criterion: loss returning the batch mean, e.g. ``nn.CrossEntropyLoss()``.
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``(mean_loss, accuracy)`` over all samples seen, or ``(nan, nan)``
            when the loader yields no samples.
        """
        # Operate on this instance, not on whatever global `model` happens to exist.
        self.eval()
        num_classes = len(classes)
        class_correct = [0.0] * num_classes
        class_total = [0.0] * num_classes
        loss_sum = 0.0
        with torch.no_grad():  # no autograd graph needed for evaluation
            for data, target in tqdm(test_loader):
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                output = self(data)
                # criterion returns a batch mean; weight by batch size to get a sum.
                loss_sum += criterion(output, target).item() * data.size(0)
                _, pred = torch.max(output, 1)
                hits = pred.eq(target.view_as(pred))
                # tolist() always yields flat lists, so a batch of size 1 is handled too.
                for label, hit in zip(target.tolist(), hits.tolist()):
                    class_correct[label] += float(hit)
                    class_total[label] += 1

        n_seen = sum(class_total)
        if n_seen == 0:
            print('Test Accuracy (Overall): N/A (no examples)')
            return float('nan'), float('nan')

        test_loss = loss_sum / n_seen
        metrics = {}
        for i in range(num_classes):
            if class_total[i] > 0:
                metrics["acc_{}".format(classes[i])] = class_correct[i] / class_total[i]
            else:
                print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))
        n_correct = sum(class_correct)
        accuracy = n_correct / n_seen
        metrics["acc"] = accuracy
        # One call keeps all accuracies of this evaluation on the same wandb step.
        wandb.log(metrics)
        print('Test Accuracy (Overall): %2d%% (%2d/%2d)' % (100. * n_correct / n_seen, n_correct, n_seen))
        return test_loss, accuracy

    def train_model(model, train_loader, valid_loader, epochs, optimizer, criterion, use_cuda, save_path):
        """Train ``model`` and checkpoint the weights with the lowest validation loss.

        The first parameter is named ``model`` rather than ``self``; the method is
        meant to be called as ``DenseNet.train_model(model, ...)``.

        Args:
            model: network to train; must provide ``evaluate``.
            train_loader: iterable of ``(data, target)`` training batches.
            valid_loader: iterable of ``(data, target)`` validation batches.
            epochs: number of passes over ``train_loader``.
            optimizer: optimizer already bound to ``model``'s parameters.
            criterion: loss returning the batch mean.
            use_cuda: move each batch to the GPU before the forward pass.
            save_path: file that receives ``model.state_dict()`` whenever the
                validation loss does not get worse.
        """
        valid_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
        for epoch in tqdm(range(1, epochs + 1)):
            model.train()
            loss_sum = 0.0
            n_seen = 0
            for data, target in train_loader:
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                loss_sum += loss.item() * data.size(0)
                n_seen += data.size(0)
            # Normalise by the samples actually processed.
            train_loss = loss_sum / max(n_seen, 1)
            # evaluate() makes a single gradient-free pass over the validation
            # data and provides both the loss and the accuracy logging.
            valid_loss, _ = model.evaluate(valid_loader, criterion, use_cuda)
            wandb.log({"training_loss": train_loss, "val_loss": valid_loss})
            print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(epoch, train_loss, valid_loss))
            if valid_loss <= valid_loss_min:
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min, valid_loss))
                torch.save(model.state_dict(), save_path)
                valid_loss_min = valid_loss

    def predict(model, test_loader, use_cuda):
        """Return the predicted class index for every sample in ``test_loader``.

        Args:
            model: network used for inference (called as ``DenseNet.predict(model, ...)``).
            test_loader: iterable of batches; a batch may be a bare tensor or a
                list/tuple whose first element is the input tensor (labels, if
                present, are ignored).
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``LongTensor`` of shape ``(num_samples, 1)`` on the CPU, in loader
            order; an empty ``LongTensor`` when the loader yields nothing.
        """
        model.eval()
        batches = []
        with torch.no_grad():
            for batch in tqdm(test_loader):
                # Loaders in this notebook yield (images, labels); feed the images.
                data = batch[0] if isinstance(batch, (list, tuple)) else batch
                if use_cuda:
                    data = data.cuda()
                output = model(data)
                batches.append(output.max(1, keepdim=True)[1].cpu())
        if not batches:
            return torch.LongTensor()
        return torch.cat(batches, dim=0)

In [None]:
# DenseNet experiment: build the network and train it.
# Currently disabled: the code below is wrapped in a string literal, so none of
# it is executed when this cell runs.
'''
os.environ["CUDA_VISIBLE_DEVICES"] = "7"
model = DenseNet([6, 12, 24, 48])
use_cuda = torch.cuda.is_available()
if use_cuda:
    model = model.cuda()
# train model
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=5e-3)
epochs = 50
DenseNet.train_model(model, train_loader, test_loader, epochs, optimizer, criterion, use_cuda, 'DenseNet_model.pt')
'''

In [None]:
# test model
# model.evaluate(test_loader, criterion, use_cuda)

In [None]:
# implement a ResNet whose residual blocks include a squeeze-and-excitation (SE) block
class SEBlock(nn.Module):
    """Squeeze-and-excitation gate: rescales every channel of the input by a
    weight in (0, 1) computed from that input's global average.

    Args:
        channel: number of channels of the input feature map.
        reduction: the hidden layer of the gate has ``channel // reduction`` units.
    """

    def __init__(self, channel, reduction=16):
        super().__init__()
        hidden = channel // reduction
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channel, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` multiplied channel-wise by the learned gate."""
        batch, channels = x.size(0), x.size(1)
        # Squeeze: one average per (sample, channel).
        pooled = self.avg_pool(x).view(batch, channels)
        # Excite: per-channel weight, shaped for broadcasting over H and W.
        gate = self.fc(pooled).view(batch, channels, 1, 1)
        return x * gate

class ResidualBlockWithSE(nn.Module):
    """Residual block whose main path is re-weighted by an SE gate before the
    shortcut is added.

    The shortcut is the identity unless the block changes the resolution
    (``stride != 1``) or the channel count, in which case it is a strided
    1x1 convolution followed by batch-norm.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main path: two 3x3 conv + batch-norm layers.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # An empty Sequential returns its input unchanged (identity shortcut).
        shape_preserved = stride == 1 and in_channels == out_channels
        if shape_preserved:
            projection = []
        else:
            projection = [
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            ]
        self.shortcut = nn.Sequential(*projection)

        # Channel gate applied to the main path.
        self.se = SEBlock(out_channels)

    def forward(self, x):
        """Return ``relu(se(main_path(x)) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(hidden))
        gated = self.se(main)
        return F.relu(gated + self.shortcut(x))

    
class ResNet_SE(nn.Module):
    """Residual network: a 3x3 stem, four stages of ``block`` modules, then
    4x4 average pooling, dropout and a linear classifier.

    Args:
        block: class used for each residual unit; it is called as
            ``block(in_channels, out_channels, stride)``.
        num_blocks: sequence of four ints, the number of units per stage.
        num_classes: size of the output layer.
        dropout_rate: dropout probability applied before the classifier.
    """

    def __init__(self, block, num_blocks, num_classes=10, dropout_rate=0.7):
        super(ResNet_SE, self).__init__()
        self.in_channels = 32
        self.conv = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(32)
        self.layer1 = self.make_layer(block, 32, num_blocks[0], stride=1)
        self.layer2 = self.make_layer(block, 64, num_blocks[1], stride=2)
        self.layer3 = self.make_layer(block, 128, num_blocks[2], stride=2)
        self.layer4 = self.make_layer(block, 256, num_blocks[3], stride=2)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(256, num_classes)

    def make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage: the first unit uses ``stride``, the rest use stride 1.

        Updates ``self.in_channels`` so the next stage is wired correctly.
        """
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for unit_stride in strides:
            layers.append(block(self.in_channels, out_channels, unit_stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return raw class logits of shape ``(N, num_classes)``."""
        out = F.relu(self.bn(self.conv(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Three stride-2 stages reduce a 32x32 input to 4x4; pool it to 1x1.
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.dropout(out)
        out = self.fc(out)
        return out

    def evaluate(self, test_loader, criterion, use_cuda):
        """Measure loss and accuracy of this model on ``test_loader``.

        Per-class accuracies (``acc_<name>``) and the overall accuracy (``acc``)
        are sent to wandb in a single ``wandb.log`` call, and the overall
        accuracy is printed.

        Args:
            test_loader: iterable yielding ``(data, target)`` batches.
            criterion: loss returning the batch mean, e.g. ``nn.CrossEntropyLoss()``.
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``(mean_loss, accuracy)`` over all samples seen, or ``(nan, nan)``
            when the loader yields no samples.
        """
        # Operate on this instance, not on whatever global `model` happens to exist.
        self.eval()
        num_classes = len(classes)
        class_correct = [0.0] * num_classes
        class_total = [0.0] * num_classes
        loss_sum = 0.0
        with torch.no_grad():  # no autograd graph needed for evaluation
            for data, target in tqdm(test_loader):
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                output = self(data)
                # criterion returns a batch mean; weight by batch size to get a sum.
                loss_sum += criterion(output, target).item() * data.size(0)
                _, pred = torch.max(output, 1)
                hits = pred.eq(target.view_as(pred))
                # tolist() always yields flat lists, so a batch of size 1 is handled too.
                for label, hit in zip(target.tolist(), hits.tolist()):
                    class_correct[label] += float(hit)
                    class_total[label] += 1

        n_seen = sum(class_total)
        if n_seen == 0:
            print('Test Accuracy (Overall): N/A (no examples)')
            return float('nan'), float('nan')

        test_loss = loss_sum / n_seen
        metrics = {}
        for i in range(num_classes):
            if class_total[i] > 0:
                metrics["acc_{}".format(classes[i])] = class_correct[i] / class_total[i]
            else:
                print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))
        n_correct = sum(class_correct)
        accuracy = n_correct / n_seen
        metrics["acc"] = accuracy
        # One call keeps all accuracies of this evaluation on the same wandb step.
        wandb.log(metrics)
        print('Test Accuracy (Overall): %2d%% (%2d/%2d)' % (100. * n_correct / n_seen, n_correct, n_seen))
        return test_loss, accuracy

    def train_model(model, train_loader, valid_loader, epochs, optimizer, criterion, use_cuda, save_path):
        """Train ``model`` and checkpoint the weights with the lowest validation loss.

        The first parameter is named ``model`` rather than ``self``; the method is
        meant to be called as ``ResNet_SE.train_model(model, ...)``.

        Args:
            model: network to train; must provide ``evaluate``.
            train_loader: iterable of ``(data, target)`` training batches.
            valid_loader: iterable of ``(data, target)`` validation batches.
            epochs: number of passes over ``train_loader``.
            optimizer: optimizer already bound to ``model``'s parameters.
            criterion: loss returning the batch mean.
            use_cuda: move each batch to the GPU before the forward pass.
            save_path: file that receives ``model.state_dict()`` whenever the
                validation loss does not get worse.
        """
        valid_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
        for epoch in tqdm(range(1, epochs + 1)):
            model.train()
            loss_sum = 0.0
            n_seen = 0
            for data, target in train_loader:
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                loss_sum += loss.item() * data.size(0)
                n_seen += data.size(0)
            # Normalise by the samples actually processed.
            train_loss = loss_sum / max(n_seen, 1)
            # evaluate() makes a single gradient-free pass over the validation
            # data and provides both the loss and the accuracy logging.
            valid_loss, _ = model.evaluate(valid_loader, criterion, use_cuda)
            wandb.log({"training_loss": train_loss, "val_loss": valid_loss})
            print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(epoch, train_loss, valid_loss))
            if valid_loss <= valid_loss_min:
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min, valid_loss))
                torch.save(model.state_dict(), save_path)
                valid_loss_min = valid_loss

    def predict(model, test_loader, use_cuda):
        """Return the predicted class index for every sample in ``test_loader``.

        Args:
            model: network used for inference (called as ``ResNet_SE.predict(model, ...)``).
            test_loader: iterable of batches; a batch may be a bare tensor or a
                list/tuple whose first element is the input tensor (labels, if
                present, are ignored).
            use_cuda: move each batch to the GPU before the forward pass.

        Returns:
            ``LongTensor`` of shape ``(num_samples, 1)`` on the CPU, in loader
            order; an empty ``LongTensor`` when the loader yields nothing.
        """
        model.eval()
        batches = []
        with torch.no_grad():
            for batch in tqdm(test_loader):
                # Loaders in this notebook yield (images, labels); feed the images.
                data = batch[0] if isinstance(batch, (list, tuple)) else batch
                if use_cuda:
                    data = data.cuda()
                output = model(data)
                batches.append(output.max(1, keepdim=True)[1].cpu())
        if not batches:
            return torch.LongTensor()
        return torch.cat(batches, dim=0)


In [None]:
# define the ResNet_SE architecture
model = ResNet_SE(ResidualBlockWithSE, [6, 8, 12, 8])
use_cuda = torch.cuda.is_available()
if use_cuda:
    model = model.cuda()

# Hold out part of the training set for validation. train_model picks the
# checkpoint with the lowest validation loss, so validating on the test set
# would make the final test accuracy optimistically biased.
valid_fraction = 0.1
num_valid = int(len(train_dataset) * valid_fraction)
num_fit = len(train_dataset) - num_valid
fit_subset, valid_subset = torch.utils.data.random_split(
    train_dataset,
    [num_fit, num_valid],
    generator=torch.Generator().manual_seed(0),  # fixed seed: same split on every run
)
fit_loader = DataLoader(dataset=fit_subset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(dataset=valid_subset, batch_size=batch_size, shuffle=False)

# train model
criterion = nn.CrossEntropyLoss()
learning_rate = 5e-4
weight_decay = 5e-4
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
epochs = 50
# Record the hyperparameters that are actually used; the values passed to
# wandb.init earlier in the notebook describe a different configuration.
wandb.config.update(
    {
        "learning_rate": learning_rate,
        "weight_decay": weight_decay,
        "architecture": "ResNet_SE",
        "epochs": epochs,
    },
    allow_val_change=True,
)
ResNet_SE.train_model(model, fit_loader, valid_loader, epochs, optimizer, criterion, use_cuda, 'ResNet_SE_model.pt')


In [None]:
# test model
# train_model leaves the last-epoch weights in memory but saves the weights
# with the lowest validation loss to disk; report test metrics for the latter.
best_checkpoint = 'ResNet_SE_model.pt'
if os.path.exists(best_checkpoint):
    model.load_state_dict(torch.load(best_checkpoint, map_location='cuda' if use_cuda else 'cpu'))
model.evaluate(test_loader, criterion, use_cuda)