<a href="https://colab.research.google.com/github/kjjwwo/cv_study/blob/main/c2_t1_a1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import warnings
warnings.filterwarnings("ignore")

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, models, transforms

from torchsummary import summary

import os
import copy
import time
import numpy as np
import matplotlib.pyplot as plt

# Report the library versions this notebook is running against.
for version_label, library in (("PyTorch Version: ", torch),
                               ("Torchvision Version: ", torchvision)):
    print(version_label, library.__version__)

## Prepare Dataset

In [None]:
!git clone https://github.com/hbcbh1999/recaptcha-dataset.git

fatal: destination path 'recaptcha-dataset' already exists and is not an empty directory.


In [None]:
!rm -rf ./recaptcha-dataset/Large/Mountain/
!rm -rf ./recaptcha-dataset/Large/Other/
!rm -rf ./recaptcha-dataset/Large/readme.txt

In [None]:
# --- Dataset location and loader settings ---
data_dir = "./recaptcha-dataset/Large"
class_names = [
    'Bicycle', 'Bridge', 'Bus', 'Car', 'Chimney',
    'Crosswalk', 'Hydrant', 'Motorcycle', 'Palm', 'Traffic Light',
]

input_size = 224  # network input is 224x224 with 3 channels
batch_size = 32

# Channel-wise statistics shared by both pipelines below.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Training pipeline: random crop/flip augmentation followed by normalization.
# Disabled experiments kept for reference:
#   transforms.RandomRotation(degrees=30)                      # random rotation within 30 degrees (added)
#   transforms.RandomAffine(degrees=0, translate=(0.1, 0.1))   # random translation (added)
train_steps = [
    transforms.ToTensor(),
    transforms.RandomResizedCrop(input_size),
    transforms.RandomHorizontalFlip(),
    transforms.Normalize(norm_mean, norm_std),
]
data_transforms = transforms.Compose(train_steps)

# Minimal pipeline intended for the validation set: normalization only.
# NOTE: it is defined here but NOT applied anywhere yet.
val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

print("Initializing Datasets and Dataloaders...")

# Whole image dataset (every sample goes through `data_transforms`) plus a
# shuffled index array covering all of its samples.
image_datasets = datasets.ImageFolder(data_dir, data_transforms)
num_data = len(image_datasets)
indices = np.arange(num_data)
np.random.shuffle(indices)

Initializing Datasets and Dataloaders...


In [None]:
# Used once training is finished, when extracting feature vectors (DB image index);
# initialize_model also calls it to freeze a freshly loaded backbone.
def set_parameter_requires_grad(model, feature_extracting):
    """Freeze all parameters of `model` when `feature_extracting` is truthy.

    Args:
        model: object exposing `parameters()` (e.g. an `nn.Module`).
        feature_extracting: when truthy, `requires_grad` is set to False on
            every parameter; when falsy the model is left untouched.

    Returns:
        None. The model is modified in place.
    """
    if not feature_extracting:
        return
    for weight in model.parameters():
        weight.requires_grad = False  # parameter freezing

In [None]:
# Used to load a network (ResNet in this notebook) from torchvision.models,
# normally with pretrained weights.
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    """Build a torchvision model whose classification head outputs `num_classes` scores.

    Args:
        model_name: one of "resnet", "alexnet", "vgg" or "squeezenet".
        num_classes: number of outputs of the replaced classification head.
        feature_extract: forwarded to `set_parameter_requires_grad`; when truthy
            the loaded backbone is frozen *before* the head is replaced, so only
            the newly created head layers keep `requires_grad=True`.
        use_pretrained: forwarded to the torchvision constructor as `pretrained`.

    Returns:
        Tuple `(model_ft, input_size)`; `input_size` is 224 for every supported model.

    Raises:
        ValueError: if `model_name` is not one of the supported names. (The
            previous implementation called `exit()`, which terminates the
            kernel instead of reporting a recoverable error.)
    """
    # NOTE(review): recent torchvision releases deprecate `pretrained=` in favour
    # of `weights=`; the old keyword is kept for compatibility - confirm against
    # the installed version.
    if model_name == "resnet":
        # ResNet-18
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        # Replacement head (instead of a single nn.Linear(num_ftrs, num_classes)):
        # num_ftrs -> 256 -> 128 -> num_classes with ReLU and dropout.
        model_ft.fc = nn.Sequential(
            nn.Linear(num_ftrs, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )
        input_size = 224

    elif model_name == "alexnet":
        # AlexNet: swap the last classifier layer.
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "vgg":
        # VGG11 with batch norm: swap the last classifier layer.
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "squeezenet":
        # SqueezeNet 1.0: the classifier is a 1x1 convolution over 512 channels.
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1, 1), stride=(1, 1))
        model_ft.num_classes = num_classes
        input_size = 224

    else:
        raise ValueError(
            f"Invalid model name {model_name!r}; expected one of "
            "'resnet', 'alexnet', 'vgg', 'squeezenet'"
        )

    return model_ft, input_size

In [None]:
# Feature-extraction flag: True -> only the reshaped (new) layers are updated,
# False -> the whole model is fine-tuned.
feature_extract = True

# Architecture key handed to initialize_model; the branches implemented there
# are "resnet", "alexnet", "vgg" and "squeezenet".
model_name = "resnet"

num_epochs = 50   # upper bound on the number of training epochs
num_classes = 10  # number of outputs of the classification head

## Train model

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, patience=10, device=None):
    """Train `model` with one train and one val phase per epoch and early stopping.

    Args:
        model: the `nn.Module` to train; it is moved to `device`.
        dataloaders: mapping with the keys 'train' and 'val'; each value is
            iterated for `(inputs, labels)` batches and must expose `.dataset`
            with a length (used to average loss and accuracy).
        criterion: loss callable invoked as `criterion(outputs, labels)`.
        optimizer: optimizer stepped after every training batch.
        num_epochs: maximum number of epochs.
        patience: number of *consecutive* epochs without a new best validation
            accuracy after which training stops early.
        device: device to train on. When None, "cuda:0" is used if CUDA is
            available and "cpu" otherwise.

    Returns:
        Tuple `(model, val_acc_history)`: the model with the weights of the
        best validation epoch loaded, and the list of per-epoch validation
        accuracies (0-dim tensors, one per completed epoch).
    """
    if device is None:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    epoch_no_improve = 0  # consecutive epochs without a new best val accuracy

    model = model.to(device)
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; gradients are only tracked in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics (loss is weighted by the batch size so that the
                # epoch value is a per-sample average)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val':
                val_acc_history.append(epoch_acc)
                if epoch_acc > best_acc:
                    # New best: snapshot the weights and restart the patience
                    # counter so that only consecutive stale epochs count.
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                    epoch_no_improve = 0
                else:
                    epoch_no_improve += 1

        if epoch_no_improve >= patience:
            print(f"\nEarly stopping at epoch {epoch+1}")
            break

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [None]:
# Use CUDA: check whether a CUDA device is available
torch.cuda.is_available()

True

In [None]:
# Prefer the first CUDA device and fall back to the CPU when CUDA is unavailable.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Show which device was selected above.
print(device)

cuda:0


## K-Fold Cross Validation (4 splits, first 2 folds trained)

In [None]:
# K-fold cross validation
from sklearn.model_selection import KFold

num_splits = 4  # the data is partitioned into 4 folds ...
max_folds = 2   # ... but only the first 2 folds are actually trained
kf = KFold(n_splits=num_splits, shuffle=True, random_state=42)

all_indices = np.arange(num_data)
results = []          # best validation accuracy of every trained fold
model_filenames = []  # checkpoint written for every trained fold

for fold, (train_idx, val_idx) in enumerate(kf.split(all_indices)):
  if fold >= max_folds:
    break
  print(f"\nFold {fold+1}/{num_splits}")

  # NOTE(review): both subsets wrap `image_datasets`, so validation batches go
  # through the same random augmentation as training batches; `val_transform`
  # is defined earlier but never applied.
  train_set = torch.utils.data.Subset(image_datasets, train_idx)
  val_set = torch.utils.data.Subset(image_datasets, val_idx)

  print('Number of training data:', len(train_set))
  print('Number of validation data:', len(val_set))

  # Only the training loader needs shuffling; evaluation order does not
  # influence the validation metrics.
  dataloaders = {'train': torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=4),
                 'val': torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=4)}

  # Start every fold from a freshly initialized model
  model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)
  model_ft = model_ft.to(device)

  # Build the optimizer from this fold's model; when feature extracting, only
  # the parameters that still require gradients are optimized.
  if feature_extract:
    params_to_update = [param for param in model_ft.parameters() if param.requires_grad]
  else:
    params_to_update = model_ft.parameters()

  optimizer_ft = optim.SGD(params_to_update, lr=0.00069, momentum=0.9)
  criterion = nn.CrossEntropyLoss()

  # train_model returns the model with its best weights restored plus the
  # per-epoch validation accuracy history.
  best_model, val_acc_history = train_model(model_ft, dataloaders, criterion, optimizer_ft, num_epochs=num_epochs)
  # Record the BEST validation accuracy (the one matching the saved weights),
  # not the accuracy of whichever epoch happened to run last.
  results.append(float(max(val_acc_history)))

  # Save the state_dict of the best model of this fold
  torch.save(best_model.state_dict(), f"model_fold{fold+1}.pth")
  model_filenames.append(f"model_fold{fold+1}.pth")

# Select the best fold
best_fold = np.argmax(results)
print(f"Best fold: {best_fold+1}, Val acc: {results[best_fold]:.4f}")

print("\n=== K-Fold Validation Results ===")
print(results)
print(f"Mean Accuracy: {np.mean(results):.4f}")
print(f"Standard Deviation: {np.std(results):.4f}")


Fold 1/4
Number of training data: 7719
Number of validation data: 2573
Epoch 0/49
----------
train Loss: 1.9933 Acc: 0.3362


KeyboardInterrupt: 

In [None]:
# Load best model
# use_pretrained=False: every weight is overwritten by the checkpoint below
# (load_state_dict is strict by default), so fetching the ImageNet weights
# first would be wasted work.
best_model_ft, _ = initialize_model(model_name, num_classes, feature_extract, use_pretrained=False)
# NOTE(review): the fold-1 checkpoint is hard-coded; the cross-validation cell
# also computes `best_fold` / `model_filenames` - confirm which one is intended.
# map_location lets a checkpoint saved on GPU be restored on a CPU-only runtime.
best_model_ft.load_state_dict(torch.load("model_fold1.pth", map_location=device))
# Inference mode: dropout disabled, BatchNorm uses its stored running statistics.
best_model_ft = best_model_ft.to(device).eval()
set_parameter_requires_grad(best_model_ft, True)

In [None]:
# Feature extractor: every child module of the fine-tuned network except the
# last one (the fc head).
modules = list(best_model_ft.children())[:-1]
resnet18_feat = nn.Sequential(*modules)
# Put the extractor (and the shared child modules) into eval mode so that
# BatchNorm uses its running statistics instead of per-batch statistics and
# does not keep updating them while features are extracted.
resnet18_feat.eval()
for p in resnet18_feat.parameters():
    p.requires_grad = False

In [None]:
from tqdm import tqdm


def _collect_features(feature_model, loader):
    """Run `feature_model` over every batch of `loader`.

    Args:
        feature_model: callable module mapping an input batch to a feature tensor.
        loader: iterable yielding `(inputs, labels)` batches of tensors.

    Returns:
        Tuple `(features, labels)` of two lists with one NumPy array per batch:
        the features flattened to `(batch, -1)` and the matching labels.
    """
    batch_features = []
    batch_labels = []
    with torch.no_grad():  # inference only; no autograd bookkeeping needed
        for inputs, targets in tqdm(loader):
            h = feature_model(inputs.to(device))
            # Eliminate unnecessary dimensions: keep the batch axis, flatten the rest
            h = torch.flatten(h, start_dim=1)
            # Move to 'cpu' & change to 'numpy array'
            batch_features.append(h.cpu().numpy())
            batch_labels.append(targets.cpu().numpy())
    return batch_features, batch_labels


train_features, train_labels = _collect_features(resnet18_feat, dataloaders['train'])
val_features, val_labels = _collect_features(resnet18_feat, dataloaders['val'])

100%|██████████| 242/242 [00:25<00:00,  9.63it/s]
100%|██████████| 81/81 [00:07<00:00, 11.43it/s]


In [None]:
# Merge the per-batch arrays into one array per split.
# np.concatenate is used because the `np.concat` alias only exists in NumPy >= 2.0.
train_features = np.concatenate(train_features, axis=0)
train_labels = np.concatenate(train_labels, axis=0)
val_features = np.concatenate(val_features, axis=0)
val_labels = np.concatenate(val_labels, axis=0)

print(f"Train Features: {train_features.shape}")
print(f"Train Labels: {train_labels.shape}")
print(f"Validation Features: {val_features.shape}")
print(f"Validation Labels: {val_labels.shape}")

Train Features: (7719, 512)
Train Labels: (7719,)
Validation Features: (2573, 512)
Validation Labels: (2573,)


In [None]:
from PIL import Image
import os

# Query image directory
query_dir = "/content/query"
# Keep only image files, in sorted order. Extensions are compared
# case-insensitively so that files such as "IMG.JPG" are not silently dropped.
query_image_list = sorted(
    name for name in os.listdir(query_dir)
    if name.lower().endswith(('.jpg', '.png', '.jpeg'))
)

# Transform applied to images before they are fed to the model.
# NOTE: this rebinds `data_transforms`, the name previously used for the
# training augmentation pipeline.
data_transforms = transforms.Compose([
    transforms.Resize((input_size, input_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],  # ImageNet mean
                         [0.229, 0.224, 0.225])   # ImageNet std
])

FileNotFoundError: [Errno 2] No such file or directory: '/content/query'

In [None]:
def extract_deep_feature(model, image_path):
    """Return the flattened deep feature vector of a single image.

    The image is opened, converted to RGB, passed through the module-level
    `data_transforms` and moved to the module-level `device`. If `model` has a
    `forward_features` method it is used; otherwise the ResNet stem and stages
    are applied explicitly up to `avgpool`.

    Args:
        model: network to extract features with. It is switched to eval mode
            for the forward pass and its previous train/eval mode is restored.
        image_path: path of the image file to read.

    Returns:
        1-D NumPy array with the features of the image.
    """
    # Close the file handle as soon as the pixel data has been converted.
    with Image.open(image_path) as raw_img:
        img = raw_img.convert("RGB")
    img_tensor = data_transforms(img).unsqueeze(0).to(device)

    was_training = model.training
    model.eval()  # BatchNorm must use running statistics for a single image
    try:
        with torch.no_grad():
            if hasattr(model, 'forward_features'):
                features = model.forward_features(img_tensor)
            else:
                x = model.relu(model.bn1(model.conv1(img_tensor)))
                # The stem max-pool was previously skipped, which made these
                # features differ from those of the `children()[:-1]` extractor.
                x = model.maxpool(x)
                x = model.layer4(model.layer3(model.layer2(model.layer1(x))))
                features = model.avgpool(x)
            features = features.view(features.size(0), -1).cpu().numpy()
    finally:
        model.train(was_training)

    return features[0]

In [None]:
# Extract one feature vector per query image with the same network
# (`best_model_ft`) that the feature extractor for the train/val features was
# built from.
query_features = []
processed_query_images = []
for image_name in query_image_list:
    image_path = os.path.join(query_dir, image_name)
    try:
        feat = extract_deep_feature(best_model_ft, image_path)
    except Exception as e:
        # Best effort: report the failure and continue with the other images.
        print(f"Error processing {image_name}: {e}")
        continue
    query_features.append(feat)
    processed_query_images.append(image_name)

# Drop the images that failed so that `query_image_list` stays aligned
# one-to-one with `query_features`.
query_image_list = processed_query_images

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report

# Dataset root, and the class names used to turn predicted class indices into
# readable names.
recaptcha = './recaptcha-dataset/Large'
labels = [
    'Bicycle',
    'Bridge',
    'Bus',
    'Car',
    'Chimney',
    'Crosswalk',
    'Hydrant',
    'Motorcycle',
    'Palm',
    'Traffic Light',
]

In [None]:
# 3-nearest-neighbour classifier fitted on the extracted training features.
classifier = KNeighborsClassifier(n_neighbors=3)
classifier.fit(X=train_features, y=train_labels)

In [None]:
# Classify the validation features, keep the readable class names for the
# export step, and print the per-class metrics.
predict_labels = classifier.predict(val_features)
predict_label_names = list(map(labels.__getitem__, predict_labels))
report = classification_report(val_labels, predict_labels, target_names=labels)
print(report)

               precision    recall  f1-score   support

      Bicycle       0.74      0.80      0.77       225
       Bridge       0.63      0.76      0.69       130
          Bus       0.84      0.81      0.82       308
          Car       0.80      0.82      0.81       910
      Chimney       0.64      0.50      0.56        18
    Crosswalk       0.85      0.79      0.82       291
      Hydrant       0.96      0.95      0.96       253
   Motorcycle       0.52      0.52      0.52        25
         Palm       0.83      0.77      0.80       234
Traffic Light       0.75      0.67      0.71       179

     accuracy                           0.80      2573
    macro avg       0.76      0.74      0.75      2573
 weighted avg       0.81      0.80      0.81      2573



In [None]:
import os

from sklearn.neighbors import KNeighborsClassifier
import numpy as np

# Load the stored train features when both files exist; otherwise fall back to
# the features computed earlier in this notebook (nothing in this notebook
# writes these .npy files, so the fallback keeps the cell runnable).
knn_features_path = "/content/knn_train_features.npy"
knn_labels_path = "/content/knn_train_labels.npy"
if os.path.exists(knn_features_path) and os.path.exists(knn_labels_path):
    X_train = np.load(knn_features_path)
    y_train = np.load(knn_labels_path)
else:
    X_train, y_train = train_features, train_labels

# A separate name is used on purpose: `classifier` (fitted on `train_features`)
# is used afterwards to index `train_labels` by neighbour position and must not
# be replaced by a model fitted on different data.
query_classifier = KNeighborsClassifier(n_neighbors=1)
query_classifier.fit(X_train, y_train)

# Predict the query images
query_features_np = np.array(query_features)
if len(query_features_np) == 0:
    predicted_labels = np.array([], dtype=int)
    print("No query features available; nothing to predict.")
else:
    predicted_labels = query_classifier.predict(query_features_np)

# Print the results
for fname, pred in zip(query_image_list, predicted_labels):
    print(f"{fname} -> {pred}")

FileNotFoundError: [Errno 2] No such file or directory: '/content/knn_train_features.npy'

In [None]:
# Top-10 results: indices of the ten nearest training samples for every
# validation feature, then the labels of those samples.
neigh_ind = classifier.kneighbors(X=val_features, n_neighbors=10, return_distance=False)
neigh_labels = np.asarray(train_labels)[neigh_ind]
print(neigh_labels[:2])

# Convert the numeric labels to class names
neigh_label_names = [list(map(labels.__getitem__, topk)) for topk in neigh_labels]
print(neigh_label_names[:2])

[[2 2 2 2 2 2 2 2 2 2]
 [3 9 3 9 9 3 9 3 5 3]]
[['Bus', 'Bus', 'Bus', 'Bus', 'Bus', 'Bus', 'Bus', 'Bus', 'Bus', 'Bus'], ['Car', 'Traffic Light', 'Car', 'Traffic Light', 'Traffic Light', 'Car', 'Traffic Light', 'Car', 'Crosswalk', 'Car']]


In [None]:
import csv

# Files handed to csv.writer are opened with newline='' as the csv module
# documentation requires; otherwise extra blank rows can appear on platforms
# that translate line endings.

# Task 1: one predicted class name per row.
with open('c2_t1_a1.csv', 'w', newline='') as file:
  writer = csv.writer(file)
  for i, predict_label_name in enumerate(predict_label_names):
    writer.writerow([f'query{i+1:03}.png', predict_label_name])

# Task 2: the top-10 neighbour class names per row.
# NOTE(review): `neigh_label_name` is a list, so it is written as ONE cell
# holding the list's string form; confirm whether the consumer expects that or
# ten separate columns.
with open('c2_t2_a1.csv', 'w', newline='') as file:
  writer = csv.writer(file)
  for i, neigh_label_name in enumerate(neigh_label_names):
    writer.writerow([f'query{i+1:03}.png', neigh_label_name])