In [1]:
import math
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torchvision.datasets import CIFAR10, CIFAR100, MNIST
from torchvision.transforms import ToTensor
from torch.autograd import Variable
import numpy as np

In [2]:
class IndexedDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        data, label = self.dataset[idx]
        return data, label, idx  # 데이터, 라벨, 인덱스 반환

In [3]:
# 데이터 로더 함수
def get_dataloaders(dataset_name, noise_type, noise_rate, batch_size=128):
    if dataset_name == "cifar10":
        train_dataset = CIFAR10(
            root="./data",
            train=True,
            download=True,
            transform=ToTensor(),
        )
        test_dataset = CIFAR10(
            root="./data",
            train=False,
            download=True,
            transform=ToTensor(),
        )
        input_channel = 3
        num_classes = 10

    elif dataset_name == "cifar100":
        train_dataset = CIFAR100(
            root="./data",
            train=True,
            download=True,
            transform=ToTensor(),
        )
        test_dataset = CIFAR100(
            root="./data",
            train=False,
            download=True,
            transform=ToTensor(),
        )
        input_channel = 3
        num_classes = 100

    elif dataset_name == "mnist":
        train_dataset = MNIST(
            root="./data",
            train=True,
            download=True,
            transform=ToTensor(),
        )
        test_dataset = MNIST(
            root="./data",
            train=False,
            download=True,
            transform=ToTensor(),
        )
        input_channel = 1
        num_classes = 10

    else:
        raise ValueError("Unsupported dataset. Choose from: cifar10, cifar100, mnist.")

    train_dataset = IndexedDataset(train_dataset)

    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )

    test_loader = torch.utils.data.DataLoader(
        dataset=test_dataset,
        batch_size=batch_size,
        shuffle=False,
    )
    return train_loader, test_loader, input_channel, num_classes

In [4]:
# 데이터셋 선택
dataset_name = "cifar10"  # 'cifar10', 'cifar100', 'mnist' 중 선택
noise_type = "symmetric"  # 노이즈 유형: 'symmetric', 'pairflip'
noise_rate = 0.2          # 노이즈 비율
batch_size = 128

# 데이터 로더 가져오기
train_loader, test_loader, input_channel, num_classes = get_dataloaders(
    dataset_name, noise_type, noise_rate, batch_size
)

# 데이터셋 정보 출력
print(f"Dataset: {dataset_name}")
print(f"Input Channel: {input_channel}, Number of Classes: {num_classes}")

Files already downloaded and verified
Files already downloaded and verified
Dataset: cifar10
Input Channel: 3, Number of Classes: 10


In [5]:
def loss_coteaching(y_1, y_2, t, forget_rate, ind, noise_or_not):
    loss_1 = F.cross_entropy(y_1, t, reduction='none')  # 'reduce' 대신 'reduction' 사용
    loss_1_numpy = loss_1.detach().cpu().numpy()  # GPU 텐서를 NumPy 배열로 변환
    ind_1_sorted = np.argsort(loss_1_numpy)  # NumPy의 argsort 사용
    loss_1_sorted = loss_1[ind_1_sorted]

    loss_2 = F.cross_entropy(y_2, t, reduction='none')
    loss_2_numpy = loss_2.detach().cpu().numpy()
    ind_2_sorted = np.argsort(loss_2_numpy)
    loss_2_sorted = loss_2[ind_2_sorted]

    remember_rate = 1 - forget_rate
    num_remember = int(remember_rate * len(loss_1_sorted))

    pure_ratio_1 = torch.sum(noise_or_not[ind[ind_1_sorted[:num_remember]]].float()).item() / float(num_remember)
    pure_ratio_2 = torch.sum(noise_or_not[ind[ind_2_sorted[:num_remember]]].float()).item() / float(num_remember)

    ind_1_update = ind_1_sorted[:num_remember]
    ind_2_update = ind_2_sorted[:num_remember]

    # 교환
    loss_1_update = F.cross_entropy(y_1[ind_2_update], t[ind_2_update])
    loss_2_update = F.cross_entropy(y_2[ind_1_update], t[ind_1_update])

    return (
        torch.sum(loss_1_update) / num_remember,
        torch.sum(loss_2_update) / num_remember,
        pure_ratio_1,
        pure_ratio_2,
    )

In [6]:
def call_bn(bn, x):
    return bn(x)

class CNN(nn.Module):
    def __init__(self, input_channel=3, n_outputs=10, dropout_rate=0.25, top_bn=False):
        self.dropout_rate = dropout_rate
        self.top_bn = top_bn
        super(CNN, self).__init__()
        self.c1 = nn.Conv2d(input_channel, 128, kernel_size=3, stride=1, padding=1)
        self.c2 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
        self.c3 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
        self.c4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.c5 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
        self.c6 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
        self.c7 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=0)
        self.c8 = nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=0)
        self.c9 = nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=0)
        self.l_c1 = nn.Linear(128,n_outputs)
        self.bn1 = nn.BatchNorm2d(128)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)
        self.bn5 = nn.BatchNorm2d(256)
        self.bn6 = nn.BatchNorm2d(256)
        self.bn7 = nn.BatchNorm2d(512)
        self.bn8 = nn.BatchNorm2d(256)
        self.bn9 = nn.BatchNorm2d(128)
        
    def forward(self, x):
        h = x
        h = self.c1(h)
        h = F.leaky_relu(call_bn(self.bn1, h), negative_slope=0.01)
        h = self.c2(h)
        h = F.leaky_relu(call_bn(self.bn2, h), negative_slope=0.01)
        h = self.c3(h)
        h = F.leaky_relu(call_bn(self.bn3, h), negative_slope=0.01)
        h = F.max_pool2d(h, kernel_size=2, stride=2)
        h = F.dropout2d(h, p=self.dropout_rate)
        
        h = self.c4(h)
        h = F.leaky_relu(call_bn(self.bn4, h), negative_slope=0.01)
        h = self.c5(h)
        h = F.leaky_relu(call_bn(self.bn5, h), negative_slope=0.01)
        h = self.c6(h)
        h = F.leaky_relu(call_bn(self.bn6, h), negative_slope=0.01)
        h = F.max_pool2d(h, kernel_size=2, stride=2)
        h = F.dropout2d(h, p=self.dropout_rate)
        
        h = self.c7(h)
        h = F.leaky_relu(call_bn(self.bn7, h), negative_slope=0.01)
        h = self.c8(h)
        h = F.leaky_relu(call_bn(self.bn8, h), negative_slope=0.01)
        h = self.c9(h)
        h = F.leaky_relu(call_bn(self.bn9, h), negative_slope=0.01)
        h = F.avg_pool2d(h, kernel_size=h.data.shape[2])
        
        h = h.view(h.size(0), h.size(1))
        logit = self.l_c1(h)
        
        if self.top_bn:
            logit = call_bn(self.bn_c1, logit)
            
        return logit

In [7]:
# 학습 및 평가 루프
def train(train_loader, epoch, model1, optimizer1, model2, optimizer2, rate_schedule, noise_or_not):
    model1.train()
    model2.train()
    total_loss1, total_loss2 = 0, 0
    for i, (images, labels, indexes) in enumerate(train_loader):
        images = Variable(images).cuda()
        labels = Variable(labels).cuda()

        logits1 = model1(images)
        logits2 = model2(images)

        loss_1, loss_2, _, _ = loss_coteaching(
            logits1, logits2, labels, rate_schedule[epoch], indexes, noise_or_not
        )

        optimizer1.zero_grad()
        loss_1.backward()
        optimizer1.step()

        optimizer2.zero_grad()
        loss_2.backward()
        optimizer2.step()

        total_loss1 += loss_1.item()
        total_loss2 += loss_2.item()

    return total_loss1 / len(train_loader), total_loss2 / len(train_loader)

# 평가 함수
def evaluate(test_loader, model):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = Variable(images).cuda()
            logits = model(images)
            _, predicted = torch.max(logits, 1)
            total += labels.size(0)
            correct += (predicted == labels.cuda()).sum().item()
    return 100 * correct / total


In [8]:
# 학습 및 평가
def run_training(
    train_loader, test_loader, input_channel, num_classes, n_epochs, forget_rate, rate_schedule
):
    # 모델 및 옵티마이저 정의
    model1 = CNN(input_channel=input_channel, n_outputs=num_classes)
    model2 = CNN(input_channel=input_channel, n_outputs=num_classes)
    model1.cuda()
    model2.cuda()
    optimizer1 = torch.optim.Adam(model1.parameters(), lr=0.001)
    optimizer2 = torch.optim.Adam(model2.parameters(), lr=0.001)

    # 노이즈 확인용 (옵션)
    noise_or_not = torch.ones(len(train_loader.dataset), dtype=torch.bool)  # 예제 노이즈 여부

    # 학습 루프
    train_losses1, train_losses2 = [], []
    test_accuracies1, test_accuracies2 = [], []

    for epoch in range(n_epochs):
        print(f"Epoch {epoch + 1}/{n_epochs}")
        # 학습
        train_loss1, train_loss2 = train(
            train_loader,
            epoch,
            model1,
            optimizer1,
            model2,
            optimizer2,
            rate_schedule,
            noise_or_not,
        )
        train_losses1.append(train_loss1)
        train_losses2.append(train_loss2)

        # 평가
        test_acc1 = evaluate(test_loader, model1)
        test_acc2 = evaluate(test_loader, model2)
        test_accuracies1.append(test_acc1)
        test_accuracies2.append(test_acc2)

        print(
            f"Train Loss Model1: {train_loss1:.4f}, Model2: {train_loss2:.4f}, "
            f"Test Accuracy Model1: {test_acc1:.2f}%, Model2: {test_acc2:.2f}%"
        )

    return train_losses1, train_losses2, test_accuracies1, test_accuracies2

In [9]:
import matplotlib.pyplot as plt

def plot_results(train_losses1, train_losses2, test_accuracies1, test_accuracies2):
    epochs = len(train_losses1)

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(epochs), train_losses1, label="Model 1 Loss")
    plt.plot(range(epochs), train_losses2, label="Model 2 Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title("Training Loss")
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(range(epochs), test_accuracies1, label="Model 1 Accuracy")
    plt.plot(range(epochs), test_accuracies2, label="Model 2 Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy (%)")
    plt.title("Test Accuracy")
    plt.legend()

    plt.show()

In [None]:
# 설정
n_epochs = 20  # 학습 epoch 수
forget_rate = 0.2  # Forget Rate
rate_schedule = np.ones(n_epochs) * forget_rate
rate_schedule[:10] = np.linspace(0, forget_rate ** 1, 10)

# 학습 실행
train_losses1, train_losses2, test_accuracies1, test_accuracies2 = run_training(
    train_loader, test_loader, input_channel, num_classes, n_epochs, forget_rate, rate_schedule
)

# 결과 시각화
plot_results(train_losses1, train_losses2, test_accuracies1, test_accuracies2)

Epoch 1/20
Train Loss Model1: 0.0105, Model2: 0.0105, Test Accuracy Model1: 58.30%, Model2: 61.12%
Epoch 2/20
Train Loss Model1: 0.0066, Model2: 0.0065, Test Accuracy Model1: 71.39%, Model2: 71.87%
Epoch 3/20
Train Loss Model1: 0.0047, Model2: 0.0048, Test Accuracy Model1: 75.76%, Model2: 75.66%
Epoch 4/20
Train Loss Model1: 0.0036, Model2: 0.0036, Test Accuracy Model1: 77.26%, Model2: 77.52%
Epoch 5/20
Train Loss Model1: 0.0028, Model2: 0.0028, Test Accuracy Model1: 77.35%, Model2: 79.33%
Epoch 6/20
Train Loss Model1: 0.0021, Model2: 0.0021, Test Accuracy Model1: 80.55%, Model2: 80.59%
Epoch 7/20
Train Loss Model1: 0.0017, Model2: 0.0017, Test Accuracy Model1: 81.17%, Model2: 81.65%
Epoch 8/20
Train Loss Model1: 0.0014, Model2: 0.0014, Test Accuracy Model1: 81.01%, Model2: 81.53%
Epoch 9/20
Train Loss Model1: 0.0011, Model2: 0.0011, Test Accuracy Model1: 81.06%, Model2: 82.40%
Epoch 10/20
Train Loss Model1: 0.0009, Model2: 0.0009, Test Accuracy Model1: 82.48%, Model2: 83.15%
Epoch 11/