# Robustness on CIFAR

In [None]:
import os
import time
import json
import copy
from pathlib import Path
import datetime
import csv

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

import models
import ops.trains as trains
import ops.tests as tests
import ops.datasets as datasets
import ops.schedulers as schedulers

In [None]:
# Experiment configuration: uncomment exactly one of the paths below.
# config_path = "configs/cifar10_general.json"
config_path = "configs/cifar100_general.json"
# config_path = "configs/imagenet_general.json"

# Decode the JSON config as UTF-8 explicitly instead of relying on the
# platform's locale encoding, which is not UTF-8 on every system.
with open(config_path, encoding="utf-8") as f:
    args = json.load(f)
print("args: \n", args)

In [None]:
# Split the config into per-section copies so that a cell can modify its own
# section without touching `args`. Only the requested section is deep-copied
# (not the whole config once per section); a missing section still yields None.
dataset_args = copy.deepcopy(args.get("dataset"))
train_args = copy.deepcopy(args.get("train"))
val_args = copy.deepcopy(args.get("val"))
model_args = copy.deepcopy(args.get("model"))
optim_args = copy.deepcopy(args.get("optim"))
env_args = copy.deepcopy(args.get("env"))

In [None]:
# Build the train/test datasets from the "dataset" config section.
dataset_train, dataset_test = datasets.get_dataset(**dataset_args, download=True)
dataset_name = dataset_args["name"]
# Read the class count now: the dataset names are rebound to loaders below.
num_classes = len(dataset_train.classes)

# Wrap both splits in loaders; only the training loader shuffles.
dataset_train = DataLoader(
    dataset_train,
    batch_size=train_args.get("batch_size", 128),
    num_workers=train_args.get("num_workers", 4),
    shuffle=True,
)
dataset_test = DataLoader(
    dataset_test,
    batch_size=val_args.get("batch_size", 128),
    num_workers=val_args.get("num_workers", 4),
)

print("Train: %s, Test: %s, Classes: %s" % (
    len(dataset_train.dataset), len(dataset_test.dataset), num_classes))

## Model

In [None]:
# Architecture menu: leave exactly one `name = ...` line uncommented.

# AlexNet
# name = "alexnet_dnn"
# name = "alexnet_dnn_smoothing"
# name = "alexnet_mcdo"
# name = "alexnet_mcdo_smoothing"

# VGG
# name = "vgg_dnn_19"
# name = "vgg_dnn_smoothing_19"
# name = "vgg_mcdo_19"
# name = "vgg_mcdo_smoothing_19"

# ResNet
name = "resnet_dnn_18"
# name = "resnet_dnn_smoothing_18"
# name = "resnet_mcdo_18"
# name = "resnet_mcdo_smoothing_18"

# name = "resnet_dnn_50"
# name = "resnet_mcdo_50"
# name = "resnet_dnn_smoothing_50"
# name = "resnet_mcdo_smoothing_50"

# Preact ResNet
# name = "preresnet_dnn_50"
# name = "preresnet_mcdo_50"
# name = "preresnet_dnn_smoothing_50"
# name = "preresnet_mcdo_smoothing_50"

# ResNeXt
# name = "resnext_dnn_50"
# name = "resnext_mcdo_50"
# name = "resnext_dnn_smoothing_50"
# name = "resnext_mcdo_smoothing_50"

# WideResNet
# name = "wideresnet_dnn_50"
# name = "wideresnet_mcdo_50"
# name = "wideresnet_dnn_smoothing_50"
# name = "wideresnet_mcdo_smoothing_50"


# Fill in the UID of the trained model to evaluate before running this cell.
uid = ""  # Model UID required

# Build the selected architecture, then load its weights via models.load
# using the dataset name and UID.
model = models.get_model(
    name,
    num_classes=num_classes,
    stem=model_args.get("stem", False),
)
models.load(model, dataset_name, uid)

Parallelize the given `model` by splitting the input:

In [None]:
# Carry the model's name across the wrap: it is read from the bare model and
# re-attached to the nn.DataParallel wrapper, because later cells read
# `model.name` when building their output paths.
name = model.name
model = nn.DataParallel(model)
model.name = name

Test model performance on in-domain data:

In [None]:
# Evaluate on the GPU when CUDA is available, otherwise on the CPU.
gpu = torch.cuda.is_available()
if gpu:
    model = model.cuda()
else:
    model = model.cpu()

# One row per n_ff setting, laid out as [n_ff, *metrics].
# NOTE(review): n_ff is presumably the number of forward passes per input;
# confirm against ops.tests.
metrics_list = []
for n_ff in (1,):
    print("N: %s, " % n_ff, end="")
    # The last value returned by tests.test is kept apart from the metrics.
    *metrics, cal_diag = tests.test(
        model, n_ff, dataset_test, verbose=False, gpu=gpu
    )
    metrics_list.append([n_ff, *metrics])

## Corruption

In [None]:
n_ff = 1

gpu = torch.cuda.is_available()
# Fresh copies of the config sections used by this cell.
val_args = copy.deepcopy(args).get("val")
dataset_args = copy.deepcopy(args).get("dataset")

if gpu:
    model = model.cuda()
else:
    model = model.cpu()

# metrics_c[intensity][ctype] holds the metrics of one corrupted test set.
metrics_c = {level: {} for level in range(1, 6)}
for intensity in range(1, 6):
    for ctype in datasets.get_corruptions():
        dataset_c = datasets.get_dataset_c(
            **dataset_args, ctype=ctype, intensity=intensity, download=True
        )
        dataset_c = DataLoader(
            dataset_c,
            batch_size=val_args.get("batch_size", 128),
            num_workers=val_args.get("num_workers", 4),
        )
        print("Corruption type: %s, Intensity: %d, " % (ctype, intensity), end="")
        # The last value returned by tests.test is kept apart from the metrics.
        *metrics, cal_diag = tests.test(
            model, n_ff, dataset_c, verbose=False, gpu=gpu
        )
        metrics_c[intensity][ctype] = metrics

# Write one CSV row per (intensity, corruption type) pair.
leaderboard_path = os.path.join("leaderboard", "logs", dataset_name, model.name)
Path(leaderboard_path).mkdir(parents=True, exist_ok=True)
# Despite its name, metrics_dir is the path of the output CSV file.
metrics_dir = os.path.join(
    leaderboard_path,
    "%s_%s_%s_%s_corrupted.csv" % (dataset_name, model.name, uid, n_ff),
)
metrics_c_list = [
    [level, corruption, *values]
    for level, per_corruption in metrics_c.items()
    for corruption, values in per_corruption.items()
]
tests.save_metrics(metrics_dir, metrics_c_list)

## Perturbation

In [None]:
n_ff = 1

# NOTE(review): `gpu` and `dataset_args` are assigned here but not used in
# this cell; confirm whether tests.test_perturbation should receive `gpu`.
gpu = torch.cuda.is_available()
val_args = copy.deepcopy(args).get("val")
dataset_args = copy.deepcopy(args).get("dataset")

# metrics_p[ptype] holds the metrics of one perturbation type.
# NOTE(review): the CIFAR-10-P loader is used regardless of the configured
# dataset; verify this is intended when evaluating a non-CIFAR-10 model.
metrics_p = {}
for ptype in datasets.get_perturbations():
    dataset_p = datasets.get_cifar10p(
        ptype=ptype,
        root="../data",
        base_folder="cifar-10-p",
    )
    dataset_p = DataLoader(
        dataset_p,
        batch_size=val_args.get("batch_size", 128),
        num_workers=val_args.get("num_workers", 4),
    )
    metrics = tests.test_perturbation(dataset_p, model, n_ff=n_ff)
    # The first two metrics are reported as consistency and CEC.
    print("Perturbation type: %s, Consistency: %.5f, CEC: %.5f" % 
          (ptype, metrics[0], metrics[1]))
    metrics_p[ptype] = metrics

# Write one CSV row per perturbation type.
leaderboard_path = os.path.join("leaderboard", "logs", dataset_name, model.name)
Path(leaderboard_path).mkdir(parents=True, exist_ok=True)
# Despite its name, metrics_dir is the path of the output CSV file.
metrics_dir = os.path.join(
    leaderboard_path,
    "%s_%s_%s_%s_perturbated.csv" % (dataset_name, model.name, uid, n_ff),
)
metrics_p_list = [
    [perturbation, *values] for perturbation, values in metrics_p.items()
]
tests.save_lists(metrics_dir, metrics_p_list)