<a href="https://colab.research.google.com/github/mku813/ActiveLearning/blob/main/AL_default_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Packages

In [None]:
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import random
from torch.utils.data import ConcatDataset
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet18

### Data
- 5000개의 train dataset으로 학습하고, 8000개의 test dataset을 10개의 class에서 100개씩 샘플링해서 총 1000개의 데이터를 Active Learning 할 수 있도록 세팅

In [None]:
# STL10 'train' split read from Google Drive (download disabled), with every
# image converted to a tensor.
train_dataset = datasets.STL10(
    root='/content/drive/MyDrive/CAU/2023_1/default_AL/train',
    split='train',
    transform=transforms.ToTensor(),
    download=False,
)

# Number of samples in the initial training subset.
n_init_samples = 1000

# Shuffle all training indices in place; the first n_init_samples of the
# shuffled order form the initial subset.
n_train_samples = len(train_dataset)
train_indices = [*range(n_train_samples)]
random.shuffle(train_indices)
init_indices = train_indices[:n_init_samples]
init_subset = Subset(train_dataset, init_indices)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

num_classes = 10  # number of classes in the STL10 dataset

# ResNet-18 created with pretrained=True; its final fully connected layer is
# replaced by a new linear layer with num_classes outputs.
model = resnet18(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, num_classes)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001)

# Mini-batches of 32 drawn in a new random order each epoch.
train_loader = DataLoader(init_subset, batch_size=32, shuffle=True)

# Ten epochs over the initial subset.
for epoch in range(10):
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

In [None]:
# Accuracy history; evaluation results are appended to this list.
acc = []

# STL10 'test' split read from Google Drive (download disabled), served in a
# fixed order in batches of 32.
test_dataset = datasets.STL10(
    root='/content/drive/MyDrive/CAU/2023_1/default_AL/test',
    split='test',
    transform=transforms.ToTensor(),
    download=False,
)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

def test(model, loader):
    """Return the top-1 accuracy of ``model`` over ``loader`` as a percentage.

    The model is switched to evaluation mode and is left in that mode when
    the function returns. Batches are moved to the module-level ``device``
    and gradients are disabled during the forward passes.

    Args:
        model: Network that maps a batch of inputs to per-class scores.
        loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``100.0 * correct / total`` as a float.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_inputs)
            # The predicted class is the index of the largest score per row.
            _, predicted = logits.data.max(dim=1)
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()
    return 100.0 * n_correct / n_seen

# Evaluate the current model on the test loader, append the result to the
# accuracy history and print it.
init_acc = test(model, test_loader)
acc.append(init_acc)
print('Initial accuracy:', init_acc)

In [None]:
def uncertainty_sampling(model, loader, device=None):
    """Score every sample served by ``loader`` with its predictive entropy.

    The entropy (in bits) of the softmax distribution is computed from
    ``log_softmax`` rather than from ``log2(softmax)``: when a probability
    underflows to exactly 0, ``0 * log2(0)`` evaluates to NaN and would
    corrupt any ranking built on the scores, whereas the log-probabilities
    stay finite for finite model outputs.

    The model is switched to evaluation mode and is left in that mode.

    Args:
        model: Network that maps a batch of inputs to per-class scores.
        loader: Iterable yielding ``(inputs, labels)`` batches; the labels
            are ignored.
        device: Device the inputs are moved to. When ``None`` (default) the
            device of the model's first parameter is used, or the CPU when
            the model has no parameters.

    Returns:
        A list with one entropy value per sample, in the order the samples
        were served by ``loader``. Higher means more uncertain.
    """
    import math  # needed only for the nats -> bits conversion below

    if device is None:
        # Follow the model's own placement instead of a notebook global.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    bits_per_nat = 1.0 / math.log(2.0)
    scores = []
    with torch.no_grad():
        for inputs, _ in loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            log_probs = torch.nn.functional.log_softmax(outputs, dim=1)
            # H = -sum(p * ln p), converted from nats to bits.
            entropy = -(log_probs.exp() * log_probs).sum(dim=1) * bits_per_nat
            scores.extend(entropy.cpu().numpy())
    return scores

In [None]:
n_iterations = 10  # maximum number of active-learning rounds
n_new_samples = 1000  # samples moved from the pool to the labeled set per round

# Indices into train_dataset: those used for training so far, and the
# remaining pool that can still be selected. Tracking both across rounds
# keeps a sample from being selected twice and lets the training set grow.
labeled_indices = list(init_indices)
labeled_lookup = set(labeled_indices)
pool_indices = [idx for idx in train_indices if idx not in labeled_lookup]

for iteration in range(n_iterations):
    if not pool_indices:
        # Nothing left to select; further rounds would only repeat training.
        print('unlabeled pool exhausted after', iteration, 'rounds')
        break

    # 4-5. Compute the uncertainty of every sample still in the pool.
    # shuffle=False is required so that scores[i] belongs to pool_indices[i].
    pool_subset = Subset(train_dataset, pool_indices)
    pool_loader = DataLoader(pool_subset, batch_size=32, shuffle=False)
    scores = uncertainty_sampling(model, pool_loader)

    # 6. Pick the most uncertain pool samples (at most n_new_samples).
    ranked_positions = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
    selected_indices = [pool_indices[pos] for pos in ranked_positions[:n_new_samples]]
    selected_subset = Subset(train_dataset, selected_indices)

    # 7. Labeling step: nothing is labeled by hand here; the training loop
    # below reads the labels that train_dataset already provides.

    # 8. Move the selected samples from the pool into the labeled set.
    labeled_indices.extend(selected_indices)
    newly_labeled = set(selected_indices)
    pool_indices = [idx for idx in pool_indices if idx not in newly_labeled]
    train_subset = Subset(train_dataset, labeled_indices)

    # 9. Retrain the model. test() and uncertainty_sampling() leave the model
    # in eval mode, so training mode must be restored first.
    model.train()
    train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
    for epoch in range(10):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    # Evaluate after this round and record the accuracy.
    nth_acc = test(model, test_loader)
    acc.append(nth_acc)
    print('accuracy:', nth_acc)

In [None]:
import matplotlib.pyplot as plt
# Hard-coded accuracy values; this assignment replaces whatever `acc` list
# earlier cells built up.
# NOTE(review): presumably copied from the printed output of a previous run — confirm.
acc = [19.0625, 74.1625, 80.6125, 82.9, 84.275, 85.2, 85.875, 85.9875, 86.0, 86.1875, 86.35] 
# Plot the values against their list position, with a circle marker per entry.
plt.plot(acc, marker = 'o')
plt.show()

In [None]:
import os

# Create the results directory together with its `models` subdirectory.
# exist_ok=True keeps this cell re-runnable: os.mkdir raises FileExistsError
# as soon as the directories already exist.
os.makedirs('/content/drive/MyDrive/CAU/2023_1/res/models', exist_ok=True)

In [None]:
# Serialize the whole model object (not just its state_dict) to Google Drive.
torch.save(model, '/content/drive/MyDrive/CAU/2023_1/res/models/00_entropy_noDiversity.model')

In [None]:
# Load the saved model back from Google Drive.
# - map_location=device places the tensors on the currently selected device,
#   so a model saved from a GPU session also loads on a CPU-only runtime.
# - weights_only=False is needed because the file holds a whole pickled model
#   object and recent PyTorch releases default to weights_only=True, which
#   rejects it (the keyword requires PyTorch >= 1.13).
# SECURITY: unpickling can execute arbitrary code; only load files written by
# this notebook, never files from an untrusted source.
loaded_model = torch.load(
    '/content/drive/MyDrive/CAU/2023_1/res/models/00_entropy_noDiversity.model',
    map_location=device,
    weights_only=False,
)

In [None]:
# 두 모델이 같은지 확인하는 코드
import collections as co

# State dicts of the in-memory model and of the copy loaded from disk.
params1 = model.state_dict()
params2 = loaded_model.state_dict()

# The models are identical only when both state dicts have exactly the same
# keys and every pair of entries is element-wise equal. The entries are
# already tensors, so they are compared directly; wrapping them in
# torch.tensor(...) would only copy them and emit a UserWarning.
is_equal = params1.keys() == params2.keys() and all(
    torch.equal(tensor, params2[key]) for key, tensor in params1.items()
)

if is_equal:
    print("두 모델은 동일합니다.")
else:
    print("두 모델은 동일하지 않습니다.")