# 1. Import module

In [2]:
import numpy as np
# import easydict
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from PIL import Image
from sklearn.metrics import roc_auc_score
import time
import torch.optim as optim
from torch.utils.data.dataloader import DataLoader
from torch.utils.data import DataLoader, Subset, Dataset
from torchvision import transforms
from torchvision.datasets import MNIST as MyMNIST
from torchvision.datasets import CIFAR10 as MyCIFAR10
from abc import ABC, abstractmethod

import warnings
warnings.filterwarnings("ignore")

# 2. Dataset Load

### 2-1) dataset load시 필요한 함수 정의

In [3]:
# dataset load 할 때 전처리
def global_constrast_normalization(x: torch.tensor, scale='11'):
    assert scale in ('l1', 'l2')
    n_features = int(np.prod(x.shape))
    mean = torch.mean(x)
    x -= mean

    if scale == 'l1':
        x_scale = torch.mean(torch.abs(x))

    if scale == 'l2':
        x_scale = torch.sqrt(torch.sum(x**2))/n_features

    x /= x_scale

    return x

def get_target_label_idx(labels, targets):

    return np.argwhere(np.isin(labels, targets)).flatten().tolist()

### 2-2) min-max-scaling한 결과를 리스트로 저장해두기

In [4]:
from torchvision.datasets import MNIST
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10

transform = transforms.Compose([transforms.ToTensor(),
            transforms.Lambda(lambda x: global_constrast_normalization(x, scale='l1'))])

mnist_dataset = MNIST(root='./data', train=True, download=True, transform=transform)
cifar10_dataset = CIFAR10(root='./data', train=True, download=True, transform=transform)

#mnist_dataset min-max-scaling
min_max_mnist = []
for class_label in range(10):
    class_samples = [x for x, y in mnist_dataset if y == class_label]
    class_samples = torch.cat(class_samples)

    min_value = torch.min(class_samples)
    max_value = torch.max(class_samples)
    min_max_mnist.append((min_value.item(), max_value.item()))

min_max_cifar10 = []
for class_label in range(10):
    # 현재 클래스에 대한 filter sample
    class_samples = [x for x, y in cifar10_dataset if y == class_label]
    class_samples = torch.cat(class_samples)

    # min, max 계산
    min_value = torch.min(class_samples)
    max_value = torch.max(class_samples)
    min_max_cifar10.append((min_value.item(), max_value.item()))

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:01<00:00, 5770153.66it/s]


Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 157530.01it/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:01<00:00, 1426388.14it/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 4066281.49it/s]


Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:15<00:00, 11334498.94it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data


In [5]:
min_max_mnist

[(-0.8826568126678467, 9.001545906066895),
 (-0.666146457195282, 20.108057022094727),
 (-0.7820455431938171, 11.665099143981934),
 (-0.764577329158783, 12.895051956176758),
 (-0.7253923416137695, 12.683235168457031),
 (-0.7698502540588379, 13.103279113769531),
 (-0.7784181237220764, 10.457836151123047),
 (-0.7129780650138855, 12.057777404785156),
 (-0.828040361404419, 10.581538200378418),
 (-0.7369959950447083, 10.697040557861328)]

In [6]:
min_max_cifar10

[(-28.94080924987793, 13.802960395812988),
 (-6.681769371032715, 9.158066749572754),
 (-34.92462158203125, 14.419297218322754),
 (-10.59916877746582, 11.093188285827637),
 (-11.945022583007812, 10.628044128417969),
 (-9.691973686218262, 8.94832706451416),
 (-9.174939155578613, 13.847018241882324),
 (-6.876684188842773, 12.28237247467041),
 (-15.603508949279785, 15.246490478515625),
 (-6.132884502410889, 8.046097755432129)]

### 2-3) dataset 불러오기

In [7]:
# MNIST
class MNIST_Dataset(Dataset):
    def __init__(self, root: str, normal_class):
        self.root = str(root)
        self.n_classes = 2  # 0: normal, 1: outlier
        self.normal_classes = tuple([normal_class])
        self.outlier_classes = list(range(0, 10))
        self.outlier_classes.remove(normal_class)

        # MNIST preprocessing: GCN (with L1 norm) and min-max feature scaling to [0,1]
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Lambda(lambda x: global_constrast_normalization(x, scale='l1')),
                                        transforms.Normalize([min_max_mnist[normal_class][0]],
                                                             [min_max_mnist[normal_class][1] - min_max_mnist[normal_class][0]])])

        target_transform = transforms.Lambda(lambda x: int(x in self.outlier_classes))

        train_set = MyMNIST(root=self.root, train=True, download=True,
                            transform=transform, target_transform=target_transform)
        # Subset train_set to normal class
        train_idx_normal = get_target_label_idx(train_set.targets, self.normal_classes)
        self.train_set = Subset(train_set, train_idx_normal)

        self.test_set = MyMNIST(root=self.root, train=False, download=True,
                                transform=transform, target_transform=target_transform)

    def loaders(self, batch_size: int, shuffle_train=True, shuffle_test=False, num_workers: int = 0) -> (
            DataLoader, DataLoader):
        train_loader = DataLoader(dataset=self.train_set, batch_size=batch_size, shuffle=shuffle_train,
                                  num_workers=num_workers)
        test_loader = DataLoader(dataset=self.test_set, batch_size=batch_size, shuffle=shuffle_test,
                                 num_workers=num_workers)
        return train_loader, test_loader


class MyMNIST(MNIST):

    def __init__(self, *args, **kwargs):
        super(MyMNIST, self).__init__(*args, **kwargs)

    def __getitem__(self, index):

        if self.train:
            img, target = self.train_data[index], self.train_labels[index]
        else:
            img, target = self.test_data[index], self.test_labels[index]

        # doing this so that it is consistent with all other datasets()
        # to return a PIL Image
        img = Image.fromarray(img.numpy(), mode='L')

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target, index  # only line changed

###########################################################################################################

# CIFAR
class CIFAR10_Dataset(Dataset):
    def __init__(self, root: str, normal_class):
        self.root = str(root)
        self.n_classes = 2  # 0: normal, 1: outlier
        self.normal_classes = tuple([normal_class])
        self.outlier_classes = list(range(0, 10))
        self.outlier_classes.remove(normal_class)

        # CIFAR10 preprocessing: GCN (with L1 norm) and min-max feature scaling to [0,1]
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Lambda(lambda x: global_constrast_normalization(x, scale='l1')),
                                        transforms.Normalize([min_max_cifar10[normal_class][0]] * 3,
                                                             [min_max_cifar10[normal_class][1] - min_max_cifar10[normal_class][0]] * 3)])

        target_transform = transforms.Lambda(lambda x: int(x in self.outlier_classes))

        train_set = MyCIFAR10(root=self.root, train=True, download=True,
                            transform=transform, target_transform=target_transform)
        # Subset train_set to normal class
        train_idx_normal = get_target_label_idx(train_set.targets, self.normal_classes)
        self.train_set = Subset(train_set, train_idx_normal)

        self.test_set = MyCIFAR10(root=self.root, train=False, download=True,
                                transform=transform, target_transform=target_transform)

    def loaders(self, batch_size: int, shuffle_train=True, shuffle_test=False, num_workers: int = 0) -> (
            DataLoader, DataLoader):
        train_loader = DataLoader(dataset=self.train_set, batch_size=batch_size, shuffle=shuffle_train,
                                  num_workers=num_workers)
        test_loader = DataLoader(dataset=self.test_set, batch_size=batch_size, shuffle=shuffle_test,
                                 num_workers=num_workers)
        return train_loader, test_loader


class MyCIFAR10(CIFAR10):
    """Torchvision CIFAR10 class with patch of __getitem__ method to also return the index of a data sample."""

    def __init__(self, *args, **kwargs):
        super(MyCIFAR10, self).__init__(*args, **kwargs)
        self.train_data=self.data
        self.train_labels=self.targets
        self.test_data=self.data
        self.test_labels=self.targets
    def __getitem__(self, index):
        """Override the original method of the CIFAR10 class.
        Args:
            index (int): Index
        Returns:
            triple: (image, target, index) where target is index of the target class.
        """
        if self.train:
            img, target = self.train_data[index], self.train_labels[index]
        else:
            img, target = self.test_data[index], self.test_labels[index]

        # doing this so that it is consistent with all other datasets
        # to return a PIL Image
        img = Image.fromarray(img)

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target, index  # only line changed


def load_dataset(dataset_name, data_path, normal_class):

    implemented_datasets = ('mnist', 'cifar10')
    assert dataset_name in implemented_datasets

    dataset = None

    if dataset_name == 'mnist':
        dataset = MNIST_Dataset(root=data_path, normal_class=normal_class)

    if dataset_name == 'cifar10':
        dataset = CIFAR10_Dataset(root=data_path, normal_class=normal_class)

    return dataset

# 3. Model setting

### 3-1) dataset 별 model 구조 정의

In [8]:
import torch.nn as nn
import numpy as np

class BaseNet(nn.Module):

    def __init__(self):
        super().__init__()
        print(f"Class name: {self.__class__.__name__}")
        self.rep_dim = None 

    def forward(self, *input):
        raise NotImplementedError

    def summary(self):

        net_parameters = filter(lambda p: p.requires_grad, self.parameters())
        params = sum([np.prod(p.size()) for p in net_parameters])

        print('Trainable parameters:', params)
        print(self)

# MNIST
class MNIST_LeNet(BaseNet):

    def __init__(self):
        super().__init__()

        self.rep_dim = 32
        self.fc1 = nn.Linear(4*7*7, self.rep_dim, bias = False)

        self.layer1=nn.Sequential(
            nn.Conv2d(1,8,5,bias=False,padding=2),
            nn.BatchNorm2d(8, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )

        self.layer2=nn.Sequential(
            nn.Conv2d(8,4,5,bias=False,padding=2),
            nn.BatchNorm2d(4, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )

    def forward(self,x):
        x=self.layer1(x)
        x=self.layer2(x)
        x=x.view(x.size(0),-1)
        return self.fc1(x)

  
class MNIST_LeNet_Autoencoder(BaseNet):

    def __init__(self):
        super().__init__()

        self.rep_dim = 32
        self.fc1 = nn.Linear(4*7*7, self.rep_dim, bias = False)

        self.layer1=nn.Sequential(
            nn.Conv2d(1,8,5,bias=False,padding=2),
            nn.BatchNorm2d(8, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )

        self.layer2=nn.Sequential(
            nn.Conv2d(8,4,5,bias=False,padding=2),
            nn.BatchNorm2d(4, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )

        self.layer3=nn.Sequential(
            nn.LeakyReLU(inplace=True),
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(2,4,5,bias=False,padding=2)
        )

        self.layer4=nn.Sequential(
            nn.BatchNorm2d(4,eps=1e-04,affine=False),
            nn.LeakyReLU(inplace=True),
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(4,8,5,bias=False,padding=3)
        )

        self.layer5=nn.Sequential(
            nn.BatchNorm2d(8,eps=1e-04,affine=False),
            nn.LeakyReLU(inplace=True),
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(8,1,5,bias=False,padding=2)
        )

    def forward(self, x):
        x=self.layer1(x)
        x=self.layer2(x)
        x=x.view(x.size(0),-1)
        x=self.fc1(x)
        x=x.view(x.size(0),int(self.rep_dim/16),4,4)
        x=self.layer3(x)
        x=self.layer4(x)
        x=self.layer5(x)
        x=torch.sigmoid(x)

        return x
    
###########################################################################################################

# CIFAR
class CIFAR10_LeNet(BaseNet):

    def __init__(self):
        super().__init__()

        self.rep_dim=128
        self.fc1=nn.Linear(128 * 4 * 4, self.rep_dim, bias=False)

        self.layer1=nn.Sequential(
            nn.Conv2d(3,32,5,bias=False,padding=2),
            nn.BatchNorm2d(32, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )
        self.layer2=nn.Sequential(
            nn.Conv2d(32,64,5,bias=False,padding=2),
            nn.BatchNorm2d(64, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )
        self.layer3=nn.Sequential(
            nn.Conv2d(64,128,5,bias=False,padding=2),
            nn.BatchNorm2d(128, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )

    def forward(self,x):
        x=self.layer1(x)
        x=self.layer2(x)
        x=self.layer3(x)
        x=x.view(x.size(0),-1)
        x=self.fc1(x)
        return x


class CIFAR10_LeNet_Autoencoder(BaseNet):

    def __init__(self):
        super().__init__()

        self.rep_dim=128
        self.fc1=nn.Linear(128 * 4 * 4, self.rep_dim, bias=False)
        self.bn1d = nn.BatchNorm1d(self.rep_dim, eps=1e-04, affine=False)

        self.conv1 = nn.Conv2d(3, 32, 5, bias=False, padding=2)
        self.conv2 = nn.Conv2d(32, 64, 5, bias=False, padding=2)
        self.conv3 = nn.Conv2d(64, 128, 5, bias=False, padding=2)

        self.layer1=nn.Sequential(
            nn.Conv2d(3,32,5,bias=False,padding=2)
        )
        nn.init.xavier_uniform_(self.conv1.weight, gain=nn.init.calculate_gain('leaky_relu'))

        self.layer2=nn.Sequential(
            nn.BatchNorm2d(32, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 5, bias=False, padding=2)
        )
        nn.init.xavier_uniform_(self.conv2.weight, gain=nn.init.calculate_gain('leaky_relu'))

        self.layer3=nn.Sequential(
            nn.BatchNorm2d(64, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 5, bias=False, padding=2)
        )
        nn.init.xavier_uniform_(self.conv3.weight, gain=nn.init.calculate_gain('leaky_relu'))

        self.layer4=nn.Sequential(
            nn.BatchNorm2d(128, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(2)
        )

        self.deconv1 = nn.ConvTranspose2d(int(self.rep_dim / (4 * 4)), 128, 5, bias=False, padding=2)
        self.deconv2 = nn.ConvTranspose2d(128, 64, 5, bias=False, padding=2)
        self.deconv3 = nn.ConvTranspose2d(64, 32, 5, bias=False, padding=2)
        self.deconv4 = nn.ConvTranspose2d(32, 3, 5, bias=False, padding=2)

        self.layer5=nn.Sequential(
            nn.LeakyReLU(inplace=True),
            nn.ConvTranspose2d(int(self.rep_dim / (4 * 4)), 128, 5, bias=False, padding=2),
            #nn.init.xavier_uniform_(self.deconv1.weight, gain=nn.init.calculate_gain('leaky_relu'))
        )
        nn.init.xavier_uniform_(self.deconv1.weight, gain=nn.init.calculate_gain('leaky_relu'))

        self.layer6=nn.Sequential(
            nn.BatchNorm2d(128, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(128, 64, 5, bias=False, padding=2),
            #nn.init.xavier_uniform_(self.deconv2.weight, gain=nn.init.calculate_gain('leaky_relu'))
        )
        nn.init.xavier_uniform_(self.deconv2.weight, gain=nn.init.calculate_gain('leaky_relu'))

        self.layer7=nn.Sequential(
            nn.BatchNorm2d(64, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(64, 32, 5, bias=False, padding=2),
            #nn.init.xavier_uniform_(self.deconv3.weight, gain=nn.init.calculate_gain('leaky_relu'))
        )
        nn.init.xavier_uniform_(self.deconv3.weight, gain=nn.init.calculate_gain('leaky_relu'))

        self.layer8=nn.Sequential(
            nn.BatchNorm2d(32, eps=1e-04, affine=False),
            nn.LeakyReLU(inplace=True),
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose2d(32, 3, 5, bias=False, padding=2),
            #nn.init.xavier_uniform_(self.deconv4.weight, gain=nn.init.calculate_gain('leaky_relu'))
        )
        nn.init.xavier_uniform_(self.deconv4.weight, gain=nn.init.calculate_gain('leaky_relu'))

    
    def forward(self,x):
        x=self.layer1(x)
        x=self.layer2(x)
        x=self.layer3(x)
        x=self.layer4(x)
        x=x.view(x.size(0),-1)
        x=self.bn1d(self.fc1(x))
        x=x.view(x.size(0), int(self.rep_dim / (4 * 4)), 4, 4)
        x=self.layer5(x)
        x=self.layer6(x)
        x=self.layer7(x)
        x=self.layer8(x)
        x=torch.sigmoid(x)
        return x

### 3-2) dataset 별 model을 할당하는 함수 정의

In [11]:
#net 할당    
def build_network(net_name):

    implemented_networks = ('mnist_LeNet', 'cifar10_LeNet')

    assert net_name in implemented_networks

    net = None

    if net_name == 'mnist_LeNet':
        net = MNIST_LeNet()

    if net_name == 'cifar10_LeNet':
        net = CIFAR10_LeNet()

    return net


#ae_net 할당
def build_autoencoder(net_name):

    implemented_networks = ('mnist_LeNet', 'cifar10_LeNet')
    assert net_name in implemented_networks

    ae_net = None

    if net_name == 'mnist_LeNet':
        ae_net = MNIST_LeNet_Autoencoder()

    if net_name == 'cifar10_LeNet':
        ae_net = CIFAR10_LeNet_Autoencoder()

    return ae_net

# 4. Training

### 4-1) deepsvdd 모델 학습

In [12]:
from torch.utils.data.dataloader import DataLoader

dataset_model = 'cifar10'
if dataset_model == 'mnist':
    dataset_model = MNIST_Dataset
elif dataset_model == 'cifar10':
    dataset_model = CIFAR10_Dataset

class BaseTrainer(ABC):

    def __init__(self, optimizer_name: str, lr: float, n_epochs: int, lr_milestones: tuple,
                 batch_size: int, weight_decay: float, device: str, n_jobs_dataloader: int):
        super().__init__()

        self.optimizer_name = optimizer_name
        self.lr = lr
        self.n_epochs = n_epochs
        self.lr_milestones = lr_milestones
        self.batch_size = batch_size
        self.weight_decay = weight_decay
        self.device = device
        self.n_jobs_dataloader = n_jobs_dataloader


        #여기 dataset 부분을 아예 없애고 밑에 train_loader를 받을 때 if문 추가해서 데이터셋별로 받으면 될듯..?
        @abstractmethod
        def train(self, dataset: dataset_model, net: BaseNet) -> BaseNet:
            pass

        @abstractmethod
        def test(self, dataset: dataset_model, net: BaseNet):
            pass


class DeepSVDDTrainer(BaseTrainer):

    def __init__(self, objective, R, c, nu: float, optimizer_name: str = 'adam', lr: float = 0.001,
                 n_epochs: int = 150, lr_milestones: tuple = (), batch_size: int = 128, weight_decay: float = 1e-6,
                 device: str = 'cuda', n_jobs_dataloader: int = 0):
        super().__init__(optimizer_name, lr, n_epochs, lr_milestones, batch_size, weight_decay, device,
                         n_jobs_dataloader)

        assert objective in ('one-class', 'soft-boundary'), "Objective must be either 'one-class' or 'soft-boundary'."
        self.objective = objective

        self.R = torch.tensor(R, device=self.device)  
        self.c = torch.tensor(c, device=self.device) if c is not None else None
        self.nu = nu

        self.warm_up_n_epochs = 10 

        self.train_time = None
        self.test_auc = None
        self.test_time = None
        self.test_scores = None

    def train(self, dataset: dataset_model, net: BaseNet):
     
        net = net.to(self.device)

        train_loader, _ = dataset.loaders(batch_size=self.batch_size, num_workers=self.n_jobs_dataloader)

        optimizer = optim.Adam(net.parameters(), lr=self.lr, weight_decay=self.weight_decay,
                               amsgrad=self.optimizer_name == 'amsgrad')

        scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=self.lr_milestones, gamma=0.1)

        if self.c is None:
            print('Initializing center c...')
            self.c = self.init_center_c(train_loader, net)
            print('Center c initialized.')

        print('====================Starting training====================')
        start_time = time.time()
        net.train()
        for epoch in range(self.n_epochs):

            scheduler.step()
            if epoch in self.lr_milestones:
                print('  LR scheduler: new learning rate is %g' % float(scheduler.get_lr()[0]))

            loss_epoch = 0.0
            n_batches = 0
            epoch_start_time = time.time()
            for data in train_loader:
                inputs, _, _ = data
                inputs = inputs.to(self.device)

                optimizer.zero_grad()

                outputs = net(inputs)
                dist = torch.sum((outputs - self.c) ** 2, dim=1) 
                if self.objective == 'soft-boundary':
                    scores = dist - self.R ** 2
                    loss = self.R ** 2 + (1 / self.nu) * torch.mean(torch.max(torch.zeros_like(scores), scores))
                else:
                    loss = torch.mean(dist) 
                loss.backward()
                optimizer.step()

                if (self.objective == 'soft-boundary') and (epoch >= self.warm_up_n_epochs):
                    self.R.data = torch.tensor(get_radius(dist, self.nu), device=self.device)

                loss_epoch += loss.item()
                n_batches += 1

            epoch_train_time = time.time() - epoch_start_time

            print('  Epoch {}/{}\t Time: {:.3f}\t Loss: {:.8f}'
                        .format(epoch + 1, self.n_epochs, epoch_train_time, loss_epoch / n_batches))

        self.train_time = time.time() - start_time
        print('Training time: %.3f' % self.train_time)

        print('====================Finished training====================')

        return net

    def test(self, dataset: dataset_model, net: BaseNet):
     
        net = net.to(self.device)

        _, test_loader = dataset.loaders(batch_size=self.batch_size, num_workers=self.n_jobs_dataloader)

        print('====================Starting testing====================')
        start_time = time.time()
        idx_label_score = []
        net.eval()
        with torch.no_grad():
            for data in test_loader:
                inputs, labels, idx = data
                inputs = inputs.to(self.device)
                outputs = net(inputs)
                dist = torch.sum((outputs - self.c) ** 2, dim=1)
                if self.objective == 'soft-boundary':
                    scores = dist - self.R ** 2
                else:
                    scores = dist

                idx_label_score += list(zip(idx.cpu().data.numpy().tolist(),
                                            labels.cpu().data.numpy().tolist(),
                                            scores.cpu().data.numpy().tolist()))

        self.test_time = time.time() - start_time
        print('Testing time: %.3f' % self.test_time)

        self.test_scores = idx_label_score

        _, labels, scores = zip(*idx_label_score)
        labels = np.array(labels)
        scores = np.array(scores)

        self.test_auc = roc_auc_score(labels, scores)
        print('==================================================')
        print('Test set AUC: {:.2f}%'.format(100. * self.test_auc))
        print('==================================================')

        print('==========Finished testing==========')

    def init_center_c(self, train_loader: DataLoader, net: BaseNet, eps=0.1):
        """Initialize hypersphere center c as the mean from an initial forward pass on the data."""
        n_samples = 0
        c = torch.zeros(net.rep_dim, device=self.device)

        net.eval()
        with torch.no_grad():
            for data in train_loader:
                # get the inputs of the batch
                inputs, _, _ = data
                inputs = inputs.to(self.device)
                outputs = net(inputs)
                n_samples += outputs.shape[0]
                c += torch.sum(outputs, dim=0)

        c /= n_samples # c의 평균값 계산

        # c가 0에 너무 가까운 경우를 처리
        c[(abs(c) < eps) & (c < 0)] = -eps
        c[(abs(c) < eps) & (c > 0)] = eps

        return c

#R 게산
def get_radius(dist: torch.Tensor, nu: float):
    """Optimally solve for radius R via the (1-nu)-quantile of distances."""
    return np.quantile(np.sqrt(dist.clone().data.cpu().numpy()), 1 - nu)
# 모든 원소에 대해 거리를 구한 후 계산된 제곱근 거리에 대해 (1-nu) 분위수를 계산.
# 즉, 거리 분포에서 상위 (1-nu)%에 해당하는 거리를 찾음 -> 최종적으로 반환되는 반지름 R임

### 4-2) Pretrain(autoencoder) 모델 학습

In [13]:
class AETrainer(BaseTrainer):

    def __init__(self, optimizer_name: str = 'adam', lr: float = 0.001, n_epochs: int = 150, lr_milestones: tuple = (),
                 batch_size: int = 128, weight_decay: float = 1e-6, device: str = 'cuda', n_jobs_dataloader: int = 0):
        super().__init__(optimizer_name, lr, n_epochs, lr_milestones, batch_size, weight_decay, device,
                         n_jobs_dataloader)

    def train(self, dataset: dataset_model, ae_net: BaseNet):
        # logger = logging.getLogger()

        # Set device for network
        ae_net = ae_net.to(self.device)

        # Get train data loader
        train_loader, _ = dataset.loaders(batch_size=self.batch_size, num_workers=self.n_jobs_dataloader)

        # Set optimizer (Adam optimizer for now)
        optimizer = optim.Adam(ae_net.parameters(), lr=self.lr, weight_decay=self.weight_decay,
                               amsgrad=self.optimizer_name == 'amsgrad')

        # Set learning rate scheduler
        scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=self.lr_milestones, gamma=0.1)

        # Training
        # logger.info('Starting pretraining...')
        print('==========Starting pretraining==========')
        start_time = time.time()
        ae_net.train() #autoencoder 학습시킴
        for epoch in range(self.n_epochs):

            scheduler.step()
            if epoch in self.lr_milestones:
                # logger.info('  LR scheduler: new learning rate is %g' % float(scheduler.get_lr()[0]))
                print('  LR scheduler: new learning rate is %g' % float(scheduler.get_lr()[0]))

            loss_epoch = 0.0
            n_batches = 0
            epoch_start_time = time.time()
            for data in train_loader:
                inputs, _, _ = data
                inputs = inputs.to(self.device)

                # Zero the network parameter gradients
                optimizer.zero_grad()

                # Update network parameters via backpropagation: forward + backward + optimize
                outputs = ae_net(inputs)
                scores = torch.sum((outputs - inputs) ** 2, dim=tuple(range(1, outputs.dim())))
                loss = torch.mean(scores)
                loss.backward()
                optimizer.step()

                loss_epoch += loss.item()
                n_batches += 1

            # log epoch statistics
            epoch_train_time = time.time() - epoch_start_time
            # logger.info('  Epoch {}/{}\t Time: {:.3f}\t Loss: {:.8f}'
            #             .format(epoch + 1, self.n_epochs, epoch_train_time, loss_epoch / n_batches))
            print('  Epoch {}/{}\t Time: {:.3f}\t Loss: {:.8f}'
                        .format(epoch + 1, self.n_epochs, epoch_train_time, loss_epoch / n_batches))

        pretrain_time = time.time() - start_time
        # logger.info('Pretraining time: %.3f' % pretrain_time)
        # logger.info('Finished pretraining.')

        print('Pretraining time: %.3f' % pretrain_time)
        print('==========Finished pretraining==========')

        return ae_net

    def test(self, dataset: dataset_model, ae_net: BaseNet):
        # logger = logging.getLogger()

        # Set device for network
        ae_net = ae_net.to(self.device)

        # Get test data loader
        _, test_loader = dataset.loaders(batch_size=self.batch_size, num_workers=self.n_jobs_dataloader)

        # Testing
        # logger.info('Testing autoencoder...')
        print('Testing autoencoder...')
        loss_epoch = 0.0
        n_batches = 0
        start_time = time.time()
        idx_label_score = []
        ae_net.eval() #evaluate mode
        with torch.no_grad():
            for data in test_loader:
                inputs, labels, idx = data
                inputs = inputs.to(self.device)
                outputs = ae_net(inputs)
                scores = torch.sum((outputs - inputs) ** 2, dim=tuple(range(1, outputs.dim())))
                loss = torch.mean(scores)

                # Save triple of (idx, label, score) in a list
                idx_label_score += list(zip(idx.cpu().data.numpy().tolist(),
                                            labels.cpu().data.numpy().tolist(),
                                            scores.cpu().data.numpy().tolist()))

                loss_epoch += loss.item()
                n_batches += 1

        # logger.info('Test set Loss: {:.8f}'.format(loss_epoch / n_batches))
        print('Test set Loss: {:.8f}'.format(loss_epoch / n_batches))

        _, labels, scores = zip(*idx_label_score)
        labels = np.array(labels)
        scores = np.array(scores)

        auc = roc_auc_score(labels, scores)
        # logger.info('Test set AUC: {:.2f}%'.format(100. * auc))
        
        print('Test set AUC: {:.2f}%'.format(100. * auc))

        test_time = time.time() - start_time
        # logger.info('Autoencoder testing time: %.3f' % test_time)
        # logger.info('Finished testing autoencoder.')

        print('Autoencoder testing time: %.3f' % test_time)
        print('Finished testing autoencoder.')
    

### 4-3) pretrain model과 deepsvdd를 합친 DeepSVDD class 정의

In [14]:
import json

class DeepSVDD(object):

    def __init__(self, objective: str = 'one-class', nu: float = 0.1):
        """Inits DeepSVDD with one of the two objectives and hyperparameter nu."""

        assert objective in ('one-class', 'soft-boundary'), "Objective must be either 'one-class' or 'soft-boundary'."
        self.objective = objective
        assert (0 < nu) & (nu <= 1), "For hyperparameter nu, it must hold: 0 < nu <= 1."
        self.nu = nu
        self.R = 0.0  # hypersphere radius R
        self.c = None  # hypersphere center c

        self.net_name = None
        self.net = None  # neural network \phi

        self.trainer = None
        self.optimizer_name = None

        self.ae_net = None  # autoencoder network for pretraining
        self.ae_trainer = None
        self.ae_optimizer_name = None

        self.results = {
            'train_time': None,
            'test_auc': None,
            'test_time': None,
            'test_scores': None,
        }

    def set_network(self, net_name):
        """Builds the neural network \phi."""
        self.net_name = net_name
        self.net = build_network(net_name)

    def train(self, dataset: dataset_model, optimizer_name: str = 'adam', lr: float = 0.001, n_epochs: int = 100,
              lr_milestones: tuple = (), batch_size: int = 128, weight_decay: float = 1e-6, device: str = 'cuda',
              n_jobs_dataloader: int = 0):
        """Trains the Deep SVDD model on the training data."""

        self.optimizer_name = optimizer_name
        self.trainer = DeepSVDDTrainer(self.objective, self.R, self.c, self.nu, optimizer_name, lr=lr,
                                       n_epochs=n_epochs, lr_milestones=lr_milestones, batch_size=batch_size,
                                       weight_decay=weight_decay, device=device, n_jobs_dataloader=n_jobs_dataloader)
        # Get the model
        self.net = self.trainer.train(dataset, self.net)
        self.R = float(self.trainer.R.cpu().data.numpy())  # get float
        self.c = self.trainer.c.cpu().data.numpy().tolist()  # get list
        self.results['train_time'] = self.trainer.train_time

    def test(self, dataset: dataset_model, device: str = 'cuda', n_jobs_dataloader: int = 0):
        """Tests the Deep SVDD model on the test data."""

        if self.trainer is None:
            self.trainer = DeepSVDDTrainer(self.objective, self.R, self.c, self.nu,
                                           device=device, n_jobs_dataloader=n_jobs_dataloader)

        self.trainer.test(dataset, self.net)
        # Get results
        self.results['test_auc'] = self.trainer.test_auc
        self.results['test_time'] = self.trainer.test_time
        self.results['test_scores'] = self.trainer.test_scores

    def pretrain(self, dataset: dataset_model, optimizer_name: str = 'adam', lr: float = 0.001, n_epochs: int = 150,
                 lr_milestones: tuple = (), batch_size: int = 128, weight_decay: float = 1e-6, device: str = 'cuda',
                 n_jobs_dataloader: int = 0):
        """Pretrains the weights for the Deep SVDD network \phi via autoencoder."""

        self.ae_net = build_autoencoder(self.net_name)
        self.ae_optimizer_name = optimizer_name
        self.ae_trainer = AETrainer(optimizer_name, lr=lr, n_epochs=n_epochs, lr_milestones=lr_milestones,
                                    batch_size=batch_size, weight_decay=weight_decay, device=device,
                                    n_jobs_dataloader=n_jobs_dataloader)
        self.ae_net = self.ae_trainer.train(dataset, self.ae_net)
        self.ae_trainer.test(dataset, self.ae_net)
        self.init_network_weights_from_pretraining()

    def init_network_weights_from_pretraining(self):
        """Initialize the Deep SVDD network weights from the encoder weights of the pretraining autoencoder."""

        net_dict = self.net.state_dict()
        ae_net_dict = self.ae_net.state_dict()

        # Filter out decoder network keys
        ae_net_dict = {k: v for k, v in ae_net_dict.items() if k in net_dict}
        # Overwrite values in the existing state_dict
        net_dict.update(ae_net_dict)
        # Load the new state_dict
        self.net.load_state_dict(net_dict)

    def save_model(self, export_model, save_ae=True):
        """Save Deep SVDD model to export_model."""

        net_dict = self.net.state_dict()
        ae_net_dict = self.ae_net.state_dict() if save_ae else None

        torch.save({'R': self.R,
                    'c': self.c,
                    'net_dict': net_dict,
                    'ae_net_dict': ae_net_dict}, export_model)

    def load_model(self, model_path, load_ae=False):
        """Load Deep SVDD model from model_path."""

        model_dict = torch.load(model_path)

        self.R = model_dict['R']
        self.c = model_dict['c']
        self.net.load_state_dict(model_dict['net_dict'])
        if load_ae:
            if self.ae_net is None:
                self.ae_net = build_autoencoder(self.net_name)
            self.ae_net.load_state_dict(model_dict['ae_net_dict'])

    def save_results(self, export_json):
        """Save results dict to a JSON-file."""
        with open(export_json, 'w') as fp:
            json.dump(self.results, fp)

### 4-4) code 실행

In [15]:
import json
import random

def load_config(import_json, settings):
    """Load settings dict from import_json (path/filename.json) JSON-file."""
    with open(import_json, 'r') as fp:
        loaded_settings = json.load(fp)
    settings.update(loaded_settings)

def save_config(export_json, settings):
    """Save settings dict to export_json (path/filename.json) JSON-file."""
    with open(export_json, 'w') as fp:
        json.dump(settings, fp, indent=0)

def main(dataset_name, net_name, xp_path, data_path, load_config, load_model, objective, nu, device, seed,
         optimizer_name, lr, n_epochs, lr_milestone, batch_size, weight_decay, pretrain, ae_optimizer_name, ae_lr,
         ae_n_epochs, ae_lr_milestone, ae_batch_size, ae_weight_decay, n_jobs_dataloader, normal_class):

    settings = {
        'dataset_name': dataset_name,
        'net_name': net_name,
        'xp_path': xp_path,
        'data_path': data_path,
        'load_config': load_config,
        'load_model': load_model,
        'objective': objective,
        'nu': nu,
        'device': device,
        'seed': seed,
        'optimizer_name': optimizer_name,
        'lr': lr,
        'n_epochs': n_epochs,
        'lr_milestone': lr_milestone,
        'batch_size': batch_size,
        'weight_decay': weight_decay,
        'pretrain': pretrain,
        'ae_optimizer_name': ae_optimizer_name,
        'ae_lr': ae_lr,
        'ae_n_epochs': ae_n_epochs,
        'ae_lr_milestone': ae_lr_milestone,
        'ae_batch_size': ae_batch_size,
        'ae_weight_decay': ae_weight_decay,
        'n_jobs_dataloader': n_jobs_dataloader,
        'normal_class': normal_class
    }

    print('Data path is %s.' % data_path)
    print('Export path is %s.' % xp_path)

    print('Dataset: %s' % dataset_name)
    print('Normal class: %d' % normal_class)
    print('Network: %s' % net_name)

    if load_config:
        load_config(import_json=load_config, settings=settings)
        print('Loaded configuration from %s.' % load_config)

    print('Deep SVDD objective: %s' % settings['objective'])
    print('Nu-paramerter: %.2f' % settings['nu'])

    if settings['seed'] != -1:
        random.seed(settings['seed'])
        np.random.seed(settings['seed'])
        torch.manual_seed(settings['seed'])
        print('Set seed to %d.' % settings['seed'])

    if not torch.cuda.is_available():
        device = 'cpu'
    print('Computation device: %s' % device)
    print('Number of dataloader workers: %d' % n_jobs_dataloader)

    # Load data
    dataset = load_dataset(settings['dataset_name'], settings['data_path'], settings['normal_class'])

    # Initialize DeepSVDD model and set neural network \phi
    deep_SVDD = DeepSVDD(settings['objective'], settings['nu'])
    deep_SVDD.set_network(settings['net_name'])

    if settings['load_model']:
        deep_SVDD.load_model(model_path=settings['load_model'], load_ae=True)
        print('Loading model from %s.' % settings['load_model'])

    print('Pretraining: %s' % settings['pretrain']) #True일때 실행됨
    if settings['pretrain']:
        print('Pretraining optimizer: %s' % settings['ae_optimizer_name'])
        print('Pretraining learning rate: %g' % settings['ae_lr'])
        print('Pretraining epochs: %d' % settings['ae_n_epochs'])
        print('Pretraining learning rate scheduler milestones: %s' % (settings['ae_lr_milestone'],))
        print('Pretraining batch size: %d' % settings['ae_batch_size'])
        print('Pretraining weight decay: %g' % settings['ae_weight_decay'])

        deep_SVDD.pretrain(dataset,
                           optimizer_name=settings['ae_optimizer_name'],
                           lr=settings['ae_lr'],
                           n_epochs=settings['ae_n_epochs'],
                           lr_milestones=settings['ae_lr_milestone'],
                           batch_size=settings['ae_batch_size'],
                           weight_decay=settings['ae_weight_decay'],
                           device=device,
                           n_jobs_dataloader=n_jobs_dataloader)
        

    print('Training optimizer: %s' % settings['optimizer_name'])
    print('Training learning rate: %g' % settings['lr'])
    print('Training epochs: %d' % settings['n_epochs'])
    print('Training learning rate scheduler milestones: %s' % (settings['lr_milestone'],))
    print('Training batch size: %d' % settings['batch_size'])
    print('Training weight decay: %g' % settings['weight_decay'])

    deep_SVDD.train(dataset,
                    optimizer_name=settings['optimizer_name'],
                    lr=settings['lr'],
                    n_epochs=settings['n_epochs'],
                    lr_milestones=settings['lr_milestone'],
                    batch_size=settings['batch_size'],
                    weight_decay=settings['weight_decay'],
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader)

    deep_SVDD.test(dataset, device=device, n_jobs_dataloader=n_jobs_dataloader)

    indices, labels, scores = zip(*deep_SVDD.results['test_scores'])
    indices, labels, scores = np.array(indices), np.array(labels), np.array(scores)
    idx_sorted = indices[labels == 0][np.argsort(scores[labels == 0])]


In [16]:
if __name__ == "__main__":
    main(dataset_name='cifar10', net_name='cifar10_LeNet', xp_path='/home/yulim/practice/July_two/deep_path', data_path=datasets, load_config=None, load_model=None, 
         objective='one-class', nu=0.1, device='cuda', seed=-1,optimizer_name='adam', lr=1e-3, n_epochs=100, lr_milestone=[50], batch_size=200, weight_decay=1e-6, 
         pretrain=True, ae_optimizer_name='adam', ae_lr=1e-3,ae_n_epochs=150, ae_lr_milestone=[50], ae_batch_size=200, ae_weight_decay=1e-6, n_jobs_dataloader=0, normal_class=5)


Data path is <module 'torchvision.datasets' from '/home/yulim/anaconda3/envs/conda_yl/lib/python3.12/site-packages/torchvision/datasets/__init__.py'>.
Export path is /home/yulim/practice/July_two/deep_path.
Dataset: cifar10
Normal class: 5
Network: cifar10_LeNet
Deep SVDD objective: one-class
Nu-paramerter: 0.10
Computation device: cuda
Number of dataloader workers: 0
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to <module 'torchvision.datasets' from '/home/yulim/anaconda3/envs/conda_yl/lib/python3.12/site-packages/torchvision/datasets/__init__.py'>/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:16<00:00, 10652000.47it/s]


Extracting <module 'torchvision.datasets' from '/home/yulim/anaconda3/envs/conda_yl/lib/python3.12/site-packages/torchvision/datasets/__init__.py'>/cifar-10-python.tar.gz to <module 'torchvision.datasets' from '/home/yulim/anaconda3/envs/conda_yl/lib/python3.12/site-packages/torchvision/datasets/__init__.py'>
Files already downloaded and verified
Class name: CIFAR10_LeNet
Pretraining: True
Pretraining optimizer: adam
Pretraining learning rate: 0.001
Pretraining epochs: 150
Pretraining learning rate scheduler milestones: [50]
Pretraining batch size: 200
Pretraining weight decay: 1e-06
Class name: CIFAR10_LeNet_Autoencoder
  Epoch 1/150	 Time: 2.168	 Loss: 32.92889698
  Epoch 2/150	 Time: 1.272	 Loss: 13.52946056
  Epoch 3/150	 Time: 1.272	 Loss: 11.40113205
  Epoch 4/150	 Time: 1.272	 Loss: 9.30803047
  Epoch 5/150	 Time: 1.272	 Loss: 8.13268585
  Epoch 6/150	 Time: 1.271	 Loss: 7.36031593
  Epoch 7/150	 Time: 1.273	 Loss: 6.84963139
  Epoch 8/150	 Time: 1.272	 Loss: 6.48252628
  Epoch 