# 1: 라이브러리 임포트

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
!pip install torchsummary
from torchsummary import summary

Collecting torchsummary
  Downloading torchsummary-1.5.1-py3-none-any.whl.metadata (296 bytes)
Downloading torchsummary-1.5.1-py3-none-any.whl (2.8 kB)
Installing collected packages: torchsummary
Successfully installed torchsummary-1.5.1


# 2: 커스텀 Dataset 정의

In [2]:

# - 이미지와 레이블을 전달받아 학습 및 검증을 위한 데이터를 제공
class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label


# 3: Squeeze-and-Excitation 모듈 정의

In [3]:
# SE Block 정의
class SqueezeExcitation(nn.Module):
    def __init__(self, in_channels, reduction=4):
        super(SqueezeExcitation, self).__init__()
        self.fc1 = nn.Conv2d(in_channels, in_channels // reduction, kernel_size=1)
        self.fc2 = nn.Conv2d(in_channels // reduction, in_channels, kernel_size=1)

    def forward(self, x):
        out = F.adaptive_avg_pool2d(x, 1)
        out = F.relu(self.fc1(out))
        out = torch.sigmoid(self.fc2(out))
        return x * out

# 4: Conv-BN-ReLU 모듈 정의

In [4]:

# - 일반적인 Convolution-BatchNorm-활성화 함수 블록
class ConvBNReLU(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super(ConvBNReLU, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        return self.relu(self.bn(self.conv(x)))


# 5: Inverted Residual Block 정의

In [5]:

class InvertedResidual(nn.Module):
    def __init__(self, in_channels, out_channels, stride, expand_ratio=4):
        super(InvertedResidual, self).__init__()
        self.stride = stride
        hidden_dim = in_channels * expand_ratio
        self.use_res_connect = stride == 1 and in_channels == out_channels

        self.conv1 = nn.Conv2d(in_channels, hidden_dim, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(hidden_dim)
        self.conv2 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=stride, padding=1, groups=hidden_dim, bias=False)
        self.bn2 = nn.BatchNorm2d(hidden_dim)
        self.se = SqueezeExcitation(hidden_dim)  # SE Block 추가
        self.conv3 = nn.Conv2d(hidden_dim, out_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)), inplace=True)
        out = F.relu(self.bn2(self.conv2(out)), inplace=True)
        out = self.se(out)  # SE Block 적용
        out = self.bn3(self.conv3(out))
        if self.use_res_connect:
            return x + out
        else:
            return out

# Squeeze-and-Excitation block

In [6]:
class SqueezeExcitation(nn.Module):
    def __init__(self, in_channels, reduction_ratio=32):  # reduction_ratio 증가
        super(SqueezeExcitation, self).__init__()
        reduced_channels = in_channels // reduction_ratio
        self.fc1 = nn.Linear(in_channels, reduced_channels)
        self.fc2 = nn.Linear(reduced_channels, in_channels)

    def forward(self, x):
        batch, channels, _, _ = x.size()
        y = F.adaptive_avg_pool2d(x, 1).view(batch, channels)
        y = F.relu(self.fc1(y))
        y = torch.sigmoid(self.fc2(y)).view(batch, channels, 1, 1)
        return x * y

# 6: NOLA 모델 정의

In [7]:
class NOLA_MobileNetV3(nn.Module):
    def __init__(self, num_classes=100):
        super(NOLA_MobileNetV3, self).__init__()

        # 기본 Conv 레이어
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        
        # Inverted Residual Blocks와 SE Block 사용
        self.block1 = InvertedResidual(16, 24, stride=2)
        self.block2 = InvertedResidual(24, 32, stride=2)
        self.block3 = InvertedResidual(32, 64, stride=2)
        self.block4 = InvertedResidual(64, 128, stride=2)

        # GAP 적용
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)), inplace=True)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.gap(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# NOLA LinearLayer


In [8]:
class GenerateParams(torch.autograd.Function):
    # Generate parameters on the fly with random basis
    
    @staticmethod    
    def forward(ctx, coefficients, out_dim, in_dim, seed): 
        num_basis = coefficients.shape[0]
        Out = torch.zeros(out_dim, in_dim).to(coefficients.device)
        rand_seed = torch.randint(int(1e10), (1,))
        torch.manual_seed(seed)
        
        W = torch.zeros(num_basis, out_dim, in_dim, 
                        device=coefficients.device, dtype=coefficients.dtype)
        nn.init.uniform_(W, a=-1.0, b=1.0)
        Out = torch.einsum('b,boi->oi', coefficients, W)
        
        params = torch.autograd.Variable(torch.tensor([out_dim, in_dim, seed]))
        ctx.save_for_backward(coefficients, params)
        torch.manual_seed(rand_seed)
        return Out 
    
    @staticmethod
    def backward(ctx, grad_output):
        coefficients, params = ctx.saved_tensors
        num_basis = coefficients.shape[0]

        out_dim, in_dim, seed = params
        rand_seed = torch.randint(int(1e10), (1,))
        torch.manual_seed(seed)
        grad_coefficients = torch.empty(0).to(grad_output.device)

        W = torch.zeros(num_basis, out_dim, in_dim, 
                        device=coefficients.device, dtype=coefficients.dtype)
        nn.init.uniform_(W, a=-1.0, b=1.0) 
        W = W.permute(1, 2, 0).reshape(-1, num_basis)
        grad_coefficients = torch.einsum('d,dl->l', grad_output.flatten(), W)

        torch.manual_seed(rand_seed)    
        return grad_coefficients, None, None, None


# NOLA Linear Layer 정의 수정
class NOLALinear(nn.Linear): 
    def __init__(self, in_features: int, out_features: int, coefficients: torch.Tensor, seed: int, num_basis=128, **kwargs):
        super(NOLALinear, self).__init__(in_features, out_features, **kwargs)
        self.num_basis = num_basis 
        self.generate_params = GenerateParams.apply
        self.coefficients = nn.Parameter(coefficients, requires_grad=True)
        self.seed = nn.Parameter(torch.tensor(seed), requires_grad=False)
        self.weight.requires_grad = False
        
    def extra_repr(self) -> str:
        return 'in_features={}, out_features={}, num_basis={}'.format(
            self.in_features, self.out_features, self.num_basis)  

    def forward(self, x: torch.Tensor):
        W = self.generate_params(self.coefficients,
                          self.out_features,
                          self.in_features,
                          self.seed) + self.weight
        return x @ W.t()


# 7: 데이터 전처리

In [9]:

# - 여러 Augmentation 기법 적용 (좌우반전, 회전, 크기 조절 등)
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.Resize((224, 224)),
    transforms.RandomCrop(224, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 이미지 정규화
])


# 8: 데이터 로딩

In [10]:

# - 학습 및 검증 데이터 로드 (Kaggle에서 데이터 불러오기)
train = np.load('/kaggle/input/2024-ai-challenge/trainset.npy')
label = np.load('/kaggle/input/2024-ai-challenge/trainlabel.npy')

# 학습 및 검증 데이터 분할
X_train, X_val, y_train, y_val = train_test_split(train, label, test_size=0.2, random_state=42)
train_dataset = CustomDataset(X_train, y_train, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = CustomDataset(X_val, y_val, transform=transform)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


# 9: 모델 초기화

In [11]:

# - MobileNetV3 모델을 초기화하고 구조를 요약
model = NOLA_MobileNetV3(num_classes=100).cuda()  # 클래스 수 설정
summary(model, (3, 224, 224))  # 모델 구조 요약


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 16, 112, 112]             432
       BatchNorm2d-2         [-1, 16, 112, 112]              32
            Conv2d-3         [-1, 64, 112, 112]           1,024
       BatchNorm2d-4         [-1, 64, 112, 112]             128
            Conv2d-5           [-1, 64, 56, 56]             576
       BatchNorm2d-6           [-1, 64, 56, 56]             128
            Linear-7                    [-1, 2]             130
            Linear-8                   [-1, 64]             192
 SqueezeExcitation-9           [-1, 64, 56, 56]               0
           Conv2d-10           [-1, 24, 56, 56]           1,536
      BatchNorm2d-11           [-1, 24, 56, 56]              48
 InvertedResidual-12           [-1, 24, 56, 56]               0
           Conv2d-13           [-1, 96, 56, 56]           2,304
      BatchNorm2d-14           [-1, 96,

# 10: 학습 준비

In [12]:

# - 학습 및 검증을 위한 설정을 준비
num_epochs = 1000  # 최대 Epoch 수
learning_rate = 0.001  # 초기 학습률
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True)

# Early Stopping 기준
early_stopping_patience = 10
best_val_accuracy = 0
early_stopping_counter = 0




# 11: 학습 및 검증 루프

In [13]:

# - 모델 학습 및 검증을 수행하는 메인 루프
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct = 0

    # 학습 루프
    for batch_features, batch_labels in train_loader:
        batch_features, batch_labels = batch_features.cuda(), batch_labels.cuda()
        optimizer.zero_grad()
        outputs = model(batch_features)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == batch_labels).sum().item()

    # 학습 결과 출력
    average_loss = total_loss / len(train_loader)
    train_accuracy = correct / len(train_dataset)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {average_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # 검증 루프
    model.eval()
    val_correct = 0
    with torch.no_grad():
        for val_features, val_labels in val_loader:
            val_features, val_labels = val_features.cuda(), val_labels.cuda()
            val_outputs = model(val_features)
            _, val_predicted = torch.max(val_outputs, 1)
            val_correct += (val_predicted == val_labels).sum().item()

    # 검증 정확도 출력
    val_accuracy = val_correct / len(val_dataset)
    print(f'Validation Accuracy: {val_accuracy:.4f}')

    # 스케줄러를 통해 학습률 조정
    scheduler.step(val_accuracy)

    # Early Stopping 체크
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        early_stopping_counter = 0
        # 모델 저장
        torch.save(model.state_dict(), 'best_model.pth')
        print(f"New best model saved at epoch {epoch + 1} with val accuracy: {val_accuracy:.4f}")
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break


Epoch [1/1000], Loss: 3.8090, Train Accuracy: 0.1128
Validation Accuracy: 0.1809
New best model saved at epoch 1 with val accuracy: 0.1809
Epoch [2/1000], Loss: 3.2143, Train Accuracy: 0.2104
Validation Accuracy: 0.2512
New best model saved at epoch 2 with val accuracy: 0.2512
Epoch [3/1000], Loss: 2.9252, Train Accuracy: 0.2676
Validation Accuracy: 0.2913
New best model saved at epoch 3 with val accuracy: 0.2913
Epoch [4/1000], Loss: 2.7421, Train Accuracy: 0.3028
Validation Accuracy: 0.3196
New best model saved at epoch 4 with val accuracy: 0.3196
Epoch [5/1000], Loss: 2.6099, Train Accuracy: 0.3310
Validation Accuracy: 0.3413
New best model saved at epoch 5 with val accuracy: 0.3413
Epoch [6/1000], Loss: 2.5080, Train Accuracy: 0.3527
Validation Accuracy: 0.3567
New best model saved at epoch 6 with val accuracy: 0.3567
Epoch [7/1000], Loss: 2.4181, Train Accuracy: 0.3718
Validation Accuracy: 0.3737
New best model saved at epoch 7 with val accuracy: 0.3737
Epoch [8/1000], Loss: 2.347

# 12: 테스트 데이터 예측 수행

In [14]:

# - 가장 성능이 좋은 모델을 로드하고 테스트 데이터에 대한 예측을 수행
model.load_state_dict(torch.load('best_model.pth'))  # 가장 성능이 좋은 모델 로드
model.eval()
predictions = []

with torch.no_grad():
    test = np.load('/kaggle/input/2024-ai-challenge/testset.npy')  # 테스트 데이터 불러오기
    test_dataset = CustomDataset(test, np.zeros((test.shape[0], 100)), transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    for test_features, _ in test_loader:
        test_features = test_features.cuda()
        outputs = model(test_features)
        predictions.append(outputs.cpu().numpy())

# 예측 결과 합치기
predictions = np.concatenate(predictions, axis=0)

# 예측된 클래스의 인덱스를 얻음
predicted_classes = np.argmax(predictions, axis=1)

# 결과를 DataFrame으로 변환
submission_df = pd.DataFrame({
    'id_idx': np.arange(len(predicted_classes)),
    'label': predicted_classes
})

# CSV 파일로 저장
submission_filename = 'submission.csv'
submission_df.to_csv(submission_filename, index=False)

print(f"Submission file has been saved as '{submission_filename}'")


  model.load_state_dict(torch.load('best_model.pth'))  # 가장 성능이 좋은 모델 로드


Submission file has been saved as 'submission.csv'
