In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Subset, random_split
import numpy as np

# CIFAR-10 데이터셋 불러오기와 전처리
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ResNet-50에 맞는 크기로 리사이즈
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

cifar10_train = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
cifar10_test = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# 레이블이 있는 데이터와 레이블이 없는 데이터를 분리
num_labeled = 5000  # 레이블이 있는 데이터의 수 (조정 가능)
num_unlabeled = len(cifar10_train) - num_labeled

labeled_data, unlabeled_data = random_split(cifar10_train, [num_labeled, num_unlabeled])

labeled_loader = DataLoader(labeled_data, batch_size=64, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_data, batch_size=64, shuffle=True)
test_loader = DataLoader(cifar10_test, batch_size=64, shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:13<00:00, 13037681.02it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


In [None]:
from torchvision.models import resnet50

# ResNet-50 모델 정의
class ResNet50(nn.Module):
    def __init__(self, num_classes):
        super(ResNet50, self).__init__()
        self.resnet = resnet50(pretrained=True)
        num_ftrs = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_ftrs, num_classes)

    def forward(self, x):
        return self.resnet(x)

# 모델 생성
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_classes = 10  # CIFAR-10은 10개의 클래스를 가짐
model = ResNet50(num_classes).to(device)

# 손실 함수와 최적화 알고리즘 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 149MB/s]


In [None]:
def test(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')



In [None]:
def train_semisupervised(model, labeled_loader, unlabeled_loader, criterion, optimizer, device, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        labeled_iter = iter(labeled_loader)
        unlabeled_iter = iter(unlabeled_loader)

        for i, (labeled_images, labeled_labels) in enumerate(labeled_iter):
            try:
                unlabeled_images, _ = next(unlabeled_iter)
            except StopIteration:
                unlabeled_iter = iter(unlabeled_loader)
                unlabeled_images, _ = next(unlabeled_iter)

            labeled_images, labeled_labels = labeled_images.to(device), labeled_labels.to(device)
            unlabeled_images = unlabeled_images.to(device)

            optimizer.zero_grad()

            # 레이블이 있는 데이터로 손실 계산
            labeled_outputs = model(labeled_images)
            labeled_loss = criterion(labeled_outputs, labeled_labels)

            # 레이블이 없는 데이터로 손실 계산 (랜덤으로 레이블 예측)
            unlabeled_outputs = model(unlabeled_images)
            pseudo_labels = unlabeled_outputs.detach().argmax(dim=1)
            unlabeled_loss = criterion(unlabeled_outputs, pseudo_labels)

            loss = 0.1*labeled_loss + 0.9*unlabeled_loss
            loss.backward()
            optimizer.step()



            if i % 20 == 0:
              if i%len(labeled_loader)==0:
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(labeled_loader)}], Loss: {loss.item():.4f}')
              else:
                print(f'Step [{i + 1}/{len(labeled_loader)}], Loss: {loss.item():.4f}')
        # 테스트 실행
        test(model, test_loader, device)

In [None]:
num_epochs = 30  # 예시로 10 에폭으로 설정 (조정 가능)
train_semisupervised(model, labeled_loader, unlabeled_loader, criterion, optimizer, device, num_epochs=num_epochs)

Epoch [1/30], Step [1/79], Loss: 2.0377
Step [21/79], Loss: 0.5389
Step [41/79], Loss: 0.5554
Step [61/79], Loss: 0.4791
Test Accuracy: 10.00%
Epoch [2/30], Step [1/79], Loss: 0.4657
Step [21/79], Loss: 0.5089
Step [41/79], Loss: 0.4311
Step [61/79], Loss: 0.5159
Test Accuracy: 9.15%
Epoch [3/30], Step [1/79], Loss: 0.4897
Step [21/79], Loss: 0.5567
Step [41/79], Loss: 0.4758
Step [61/79], Loss: 0.5321
Test Accuracy: 10.00%
Epoch [4/30], Step [1/79], Loss: 0.4342
Step [21/79], Loss: 0.4089
Step [41/79], Loss: 0.4114
Step [61/79], Loss: 0.5219
Test Accuracy: 10.00%
Epoch [5/30], Step [1/79], Loss: 0.4671
Step [21/79], Loss: 0.4804
Step [41/79], Loss: 0.4964
Step [61/79], Loss: 0.4742
Test Accuracy: 10.00%
Epoch [6/30], Step [1/79], Loss: 0.4822
Step [21/79], Loss: 0.4737
Step [41/79], Loss: 0.4747
Step [61/79], Loss: 0.4465
Test Accuracy: 10.00%
Epoch [7/30], Step [1/79], Loss: 0.4473
Step [21/79], Loss: 0.4887
Step [41/79], Loss: 0.4535
Step [61/79], Loss: 0.4219
Test Accuracy: 10.00%
