# 2024 데이터 크리에이터 캠프

문제: 인공지능은 사람의 마음을 이해할수 있을까?

### 구글 드라이브 연결

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Mission1. 패션 스타일 이미지 분류

### Mission 1-1.  
주어진 이미지 데이터의 파일 명은 아래와 같은 형식이다.  
“{W/T}_{이미지ID}_{시대별}_{스타일별}_{성별}.jpg”이에 기반하여 “이미지ID” 수 기준으로  
“성별 & 스타일”통계치를아래표형식으로기입한다.  

In [2]:
import os
import pandas as pd

In [3]:
def count_images_by_gender_and_style(file_names):
    rows = []  # 결과를 저장할 리스트

    # 파일명 분석 및 카운트
    for filename in file_names:
        # 파일 확장자 체크
        if not filename.endswith('.jpg'):
            continue

        # .jpg 제거
        no_jpg_filename = filename.split('.')[0]
        parts = no_jpg_filename.split('_')

        # 성별, 스타일, 이미지 ID 추출
        gender = '여성' if parts[-1] == 'W' else '남성'
        style = parts[3]
        image_id = parts[1]

        # 유효한 스타일과 이미지 ID일 때만 추가
        if style is not None and image_id is not None:
            rows.append({'성별': gender, '스타일': style, '이미지 ID': image_id})

    # DataFrame 생성
    df = pd.DataFrame(rows)

    # 중복된 행 제거
    df = df.drop_duplicates(subset=['성별', '스타일', '이미지 ID'])

    # 성별과 스타일별로 이미지 수 집계
    result = df.groupby(['성별', '스타일']).size().reset_index(name='이미지 수')
    result = result.sort_values(by='성별')

    return result

#### train

In [4]:
# traininig_image 폴더
train_folder_path = '/content/drive/MyDrive/kict/dataset/training_image'

# 폴더 내의 파일 목록 가져오기
train_file_list = os.listdir(train_folder_path)
train_file_names = [filename for filename in train_file_list]

In [5]:
# traininig_image 폴더
result_train = count_images_by_gender_and_style(train_file_names)
result_train.to_csv('mission_1-1_train.csv', index=False)

In [6]:
result_train

Unnamed: 0,성별,스타일,이미지 수
0,남성,bold,268
1,남성,hiphop,274
2,남성,hippie,260
3,남성,ivy,237
4,남성,metrosexual,278
5,남성,mods,269
6,남성,normcore,364
7,남성,sportivecasual,298
21,여성,lounge,45
22,여성,military,33


#### validation

In [7]:
# validation_image 폴더
val_folder_path = '/content/drive/MyDrive/kict/dataset/validation_image'

# 폴더 내의 파일 목록 가져오기
val_file_list = os.listdir(val_folder_path)
val_file_names = [filename for filename in val_file_list]

In [8]:
# validation_image 폴더
result_val = count_images_by_gender_and_style(val_file_names)
result_val.to_csv('mission_1-1_val.csv', index=False)

In [9]:
result_val

Unnamed: 0,성별,스타일,이미지 수
0,남성,bold,57
1,남성,hiphop,66
2,남성,hippie,82
3,남성,ivy,79
4,남성,metrosexual,58
5,남성,mods,80
6,남성,normcore,51
7,남성,sportivecasual,52
21,여성,lounge,8
22,여성,military,9


### Mission 1-2.  
ResNet-18를 활용하여 “성별 & 스타일” 단위로 클래스 분류를 수행하고 Validation 데이터에 대한 정확도를 제시한다.   
 - ResNet-18의 parameters는 무작위로 초기화하여 사용한다. (즉, pretrained weights는 사용할 수 없음)  
 - 성능을 높이기 위해 object detection, image cropping 등의 다양한 데이터 전처리 기법을 활용해도 무방하다.    
 (데이터 전처리 단계에 한해서는 외부 라이브러리 활용 가능)   

### 라이브러리 불러오기

In [10]:
import os
import torch
from torch import Tensor
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
from sklearn.metrics import accuracy_score
from typing import Type

### Resnet 구현

In [11]:
class BasicBlock(nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        stride: int = 1,
        expansion: int = 1,
        downsample: nn.Module = None
    ) -> None:
        super(BasicBlock, self).__init__()
        self.expansion = expansion
        self.downsample = downsample
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels,
            out_channels*self.expansion,
            kernel_size=3,
            padding=1,
            bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels*self.expansion)

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return  out

In [12]:
class ResNet(nn.Module):
    def __init__(
        self,
        img_channels: int,
        num_layers: int,
        block: Type[BasicBlock],
        num_classes: int  = 1000
    ) -> None:
        super(ResNet, self).__init__()
        if num_layers == 18: # ResNet18 만을 본 대회에서 사용함으로 18층만 구현
            layers = [2, 2, 2, 2]
            self.expansion = 1

        self.in_channels = 64
        self.conv1 = nn.Conv2d(
            in_channels=img_channels,
            out_channels=self.in_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512*self.expansion, num_classes)

    def _make_layer(
        self,
        block: Type[BasicBlock],
        out_channels: int,
        blocks: int,
        stride: int = 1
    ) -> nn.Sequential:
        downsample = None
        if stride != 1:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.in_channels,
                    out_channels*self.expansion,
                    kernel_size=1,
                    stride=stride,
                    bias=False
                ),
                nn.BatchNorm2d(out_channels * self.expansion),
            )
        layers = []
        layers.append(
            block(
                self.in_channels, out_channels, stride, self.expansion, downsample
            )
        )
        self.in_channels = out_channels * self.expansion

        for i in range(1, blocks):
            layers.append(block(
                self.in_channels,
                out_channels,
                expansion=self.expansion
            ))
        return nn.Sequential(*layers)

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # print('Dimensions of the last convolutional feature map: ', x.shape)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

### 데이터 전처리

In [15]:
import numpy as np
import random

def seed_everything(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True
seed_everything()

# 이미지 파일이 있는 디렉토리 경로
train_image_directory = '/content/drive/MyDrive/kict/dataset/training_image'   # <--- training_image 폴더 경로를 입력
valid_image_directory = '/content/drive/MyDrive/kict/dataset/validation_image' # <--- validation_image 폴더 경로를 입력

# CustomDataset 클래스 정의
class CustomDataset(Dataset):
    def __init__(self, image_directory, transform=None):
        self.image_directory = image_directory
        self.image_files = [f for f in os.listdir(image_directory) if f.endswith('.jpg')]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        file_name = self.image_files[idx]
        image_path = os.path.join(self.image_directory, file_name)
        image = Image.open(image_path).convert('RGB')

        # 이미지 파일명에서 스타일과 성별 정보 추출
        parts = file_name.split('_')
        style_gender = parts[-2] + '_' + parts[-1].split('.')[0]  # 스타일과 성별 정보 추출

        # 스타일과 성별 정보를 레이블로 변환
        label = style_gender
        label_idx = label_to_index[label]

        if self.transform:
            image = self.transform(image)
        return image, label_idx

# 레이블을 숫자로 매핑하기 위한 딕셔너리 생성
label_set = set()
for file_name in os.listdir(train_image_directory) + os.listdir(valid_image_directory):
    parts = file_name.split('_')
    style_gender = parts[-2] + '_' + parts[-1].split('.')[0]
    label_set.add(style_gender)

label_list = sorted(list(label_set))
label_to_index = {label: idx for idx, label in enumerate(label_list)}
index_to_label = {idx: label for label, idx in label_to_index.items()}

In [16]:
index_to_label

{0: 'athleisure_W',
 1: 'bodyconscious_W',
 2: 'bold_M',
 3: 'cityglam_W',
 4: 'classic_W',
 5: 'disco_W',
 6: 'ecology_W',
 7: 'feminine_W',
 8: 'genderless_W',
 9: 'grunge_W',
 10: 'hiphop_M',
 11: 'hiphop_W',
 12: 'hippie_M',
 13: 'hippie_W',
 14: 'ivy_M',
 15: 'kitsch_W',
 16: 'lingerie_W',
 17: 'lounge_W',
 18: 'metrosexual_M',
 19: 'military_W',
 20: 'minimal_W',
 21: 'mods_M',
 22: 'normcore_M',
 23: 'normcore_W',
 24: 'oriental_W',
 25: 'popart_W',
 26: 'powersuit_W',
 27: 'punk_W',
 28: 'space_W',
 29: 'sportivecasual_M',
 30: 'sportivecasual_W'}

클래스 총 31개의 범주

In [17]:
len(index_to_label)

31

이미지 데이터 정규화를 위한 평균과 표준편차 계산 수행

In [None]:
# # 데이터셋의 평균과 표준편차 계산 함수
# def calculate_mean_std(loader):
#     mean = 0.0
#     std = 0.0
#     total_images_count = 0
#     for images, _ in loader:
#         batch_samples = images.size(0)  # 배치 크기 (이때 마지막 배치는 더 작을 수 있음)
#         images = images.view(batch_samples, images.size(1), -1)
#         mean += images.mean(2).sum(0)
#         std += images.std(2).sum(0)
#         total_images_count += batch_samples

#     mean /= total_images_count
#     std /= total_images_count
#     return mean, std

# # 임시로 ToTensor 변환만 적용하여 데이터 로더 생성
# temp_transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
# ])

# temp_train_dataset = CustomDataset(train_image_directory, transform=temp_transform)
# temp_train_loader = DataLoader(temp_train_dataset, batch_size=32, shuffle=True)

# # 평균과 표준편차 계산
# mean, std = calculate_mean_std(temp_train_loader)
# print(f"Calculated mean: {mean}")
# print(f"Calculated std: {std}")

위의 코드를 통해 이미지 특성 벡터의 평균과 표준편차를 구함  
> Calculated mean: tensor([0.5498, 0.5226, 0.5052])    
> Calculated std: tensor([0.2600, 0.2582, 0.2620])    

데이터 전처리 및 로드

In [18]:
# 데이터 전처리 및 로드
transform = transforms.Compose([
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomRotation(degrees=15),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2,
                           contrast=0.2,
                           saturation=0.2,
                           hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5498, 0.5226, 0.5052], std=[0.2600, 0.2582, 0.2620]),
])

train_dataset = CustomDataset(train_image_directory, transform=transform)
val_dataset = CustomDataset(valid_image_directory, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

### 모델 구축

모델 학습

In [19]:
# ResNet-18 모델 생성 함수
def resnet18(img_channels: int, num_classes: int) -> ResNet:
    return ResNet(img_channels, 18, BasicBlock, num_classes) # 18은 ResNet-18을 지정

# 모델 인스턴스 생성
num_classes = len(label_list)  # 레이블의 총 개수
model = resnet18(img_channels=3, num_classes=num_classes) # RGB 이미지: 3개의 채널

# 손실 함수 및 옵티마이저 정의
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 모델을 GPU로 이동
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 모델 학습
num_epochs = 100
patience = 3  # Early stopping patience
best_val_loss = float('inf')  # Initialize best validation loss as infinity
counter = 0  # Counter to track patience

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        optimizer.zero_grad()
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # 정확도 계산
        _, preds = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (preds == labels).sum().item()

    train_loss = running_loss / len(train_loader)
    train_accuracy = correct / total * 100

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%')

    # Validation phase
    model.eval()
    all_preds = []
    all_labels = []
    val_running_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_running_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (preds == labels).sum().item()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Calculate average validation loss and accuracy
    avg_val_loss = val_running_loss / len(val_loader)
    val_accuracy = val_correct / val_total * 100

    print(f'Epoch [{epoch+1}/{num_epochs}], Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

    # Early stopping logic
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0  # Reset counter if validation loss improves
    else:
        counter += 1
        if counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break


Epoch [1/100], Train Loss: 3.3354, Train Accuracy: 7.54%
Epoch [1/100], Validation Loss: 3.3822, Validation Accuracy: 6.10%
Epoch [2/100], Train Loss: 3.2247, Train Accuracy: 8.65%
Epoch [2/100], Validation Loss: 3.2670, Validation Accuracy: 5.57%


KeyboardInterrupt: 

모델 검증

In [None]:
# 정확도 계산
accuracy = accuracy_score(all_labels, all_preds)
print(f'Validation Accuracy: {accuracy * 100:.2f}%')

In [None]:
# 모델 가중치 저장
torch.save(model.state_dict(), '/content/drive/MyDrive/kict/model/model_weights.pth')

In [None]:
# 모델 가중치 불러오기 및 모델 구축
def load_model(model_class, model_weights_path, *args, **kwargs):
    model = model_class(*args, **kwargs)  # 모델 클래스 인스턴스화
    model.load_state_dict(torch.load(model_weights_path))  # 가중치 불러오기
    return model

# 모델 가중치 불러오기
model = load_model(
    ResNet
    , '/content/drive/MyDrive/kict/model/model_weights.pth'
    , img_channels=3
    , num_layers=18
    , block=BasicBlock
    , num_classes=31
)

### 예측 수행

실제 이미지가 입력으로 들어온다고 가정하에 예측과정 수행

In [None]:
# 예측 결과를 {스타일}_{성별} 형식으로 변환하여 출력
def predict_image(image_path, model, transform):
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)  # 배치 차원 추가
    model.eval()
    with torch.no_grad():
        output = model(image)
        _, pred = torch.max(output, 1)
    return index_to_label[pred.item()]

# 이미지 예측
example_image_path = '/content/drive/MyDrive/kict/dataset/example.jpg'  # <--- 예시 이미지 경로를 입력
predicted_label = predict_image(example_image_path, model, transform)
print(f'Predicted: {predicted_label}')

.