In [None]:
!pip install fvcore

In [None]:
import pandas as pd
import torch
from torch.utils.data import TensorDataset, DataLoader, random_split
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import matplotlib.pyplot as plt
from torch.optim.swa_utils import AveragedModel, SWALR, update_bn
import torch.nn.utils.prune as prune
from torch.quantization import quantize_dynamic
from fvcore.nn import FlopCountAnalysis
import time

In [None]:
# Load and preprocess the data
data_dir = '/kaggle/input/digit-recognizer/'
raw_df = pd.read_csv(data_dir + 'train.csv')
test_df = pd.read_csv(data_dir + 'test.csv')
sub_df = pd.read_csv(data_dir + 'sample_submission.csv')

# Every column other than the label is treated as an input (pixel) column.
target_col = 'label'
input_cols = [col for col in raw_df.columns if col != target_col]

# Rescale all input columns at once (presumably 0-255 intensities -> [0, 1]).
# A single vectorized assignment replaces one Python-level assignment per column.
raw_df[input_cols] = raw_df[input_cols] / 255

In [None]:
# Dataset class definition
class AugmentedDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing images with labels, with an optional per-item transform.

    Each item is converted to a PIL image before the transform runs, so the
    transform pipeline receives a PIL image. When no transform is given the
    PIL image itself is returned.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The dataset length follows the image container.
        return len(self.images)

    def __getitem__(self, idx):
        raw_image = self.images[idx]
        target = self.labels[idx]
        pil_image = T.ToPILImage()(raw_image)
        if not self.transform:
            return pil_image, target
        return self.transform(pil_image), target

In [None]:
# 데이터 변환
train_transforms = T.Compose([
    T.RandomHorizontalFlip(),     
    T.RandomRotation(15),         
    T.RandomCrop(32, padding=4),  
    T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1), 
    T.ToTensor(),               
])

val_transforms = T.Compose([
    T.ToTensor(),              
])

input_tensor = torch.tensor(raw_df[input_cols].values, dtype=torch.float32).reshape(-1, 1, 28, 28)
target_tensor = torch.tensor(raw_df[target_col].values)

# 데이터셋 생성
train_ds = AugmentedDataset(input_tensor, target_tensor, transform=train_transforms)
val_ds = AugmentedDataset(input_tensor, target_tensor, transform=val_transforms)

In [None]:
# 훈련 및 검증 데이터 분할
val_size = 8000
train_size = len(train_ds) - val_size
random_seed = 42
torch.manual_seed(random_seed)

train_ds, val_ds = random_split(train_ds, [train_size, val_size])

# DataLoader 생성
batch_size = 128
train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=4, pin_memory=True)
val_dl = DataLoader(val_ds, batch_size * 2, shuffle=False, num_workers=4, pin_memory=True)

In [None]:
# Device selection (GPU when available)
def get_default_device():
    """Return the CUDA device if CUDA is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device('cuda')
    return torch.device('cpu')

device = get_default_device()

def to_device(data, device):
    """Move `data` to `device`, recursing into lists and tuples.

    A list or tuple is returned as a new list whose elements have been moved;
    anything else is moved with `.to(device, non_blocking=True)`.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Wrap a data loader so that every batch it yields is moved to a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __iter__(self):
        # Lazily iterate the wrapped loader, moving each batch as it is produced.
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        # Number of batches, as reported by the wrapped loader.
        return len(self.dl)

# Rebind both loaders to device-moving wrappers so batches arrive on `device`.
train_dl, val_dl = DeviceDataLoader(train_dl, device), DeviceDataLoader(val_dl, device)

In [None]:
# Model definition
def conv_block(in_channels, out_channels, pool=False):
    """Build a 3x3 convolution -> batch norm -> ReLU block.

    With `pool=True` a 2x2 max-pool is appended, halving the spatial size.
    The convolution uses padding=1, so it preserves height and width.
    """
    optional_pool = [nn.MaxPool2d(2)] if pool else []
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
        *optional_pool
    )

class CNNModel(nn.Module):
    """ResNet-style CNN for single-channel images with 10 output classes.

    Three of the conv blocks halve the spatial size, so a 32x32 input reaches
    the classifier as a 512-channel 4x4 map. The classifier head uses global
    (adaptive) max pooling instead of a fixed `MaxPool2d(4)`: for 32x32 inputs
    the result is identical, and other input sizes (e.g. 28x28) no longer
    fail or produce a feature vector that mismatches the linear layer.
    The pooling layers hold no parameters, so state_dict keys are unchanged.
    """

    def __init__(self):
        super().__init__()
        self.prep = conv_block(1, 64)
        self.layer1 = conv_block(64, 128, pool=True)
        self.res1 = nn.Sequential(conv_block(128, 128), conv_block(128, 128))
        self.layer2 = conv_block(128, 256, pool=True)
        self.layer3 = conv_block(256, 512, pool=True)
        self.res2 = nn.Sequential(conv_block(512, 512), conv_block(512, 512))
        self.classifier = nn.Sequential(
            # Collapse whatever spatial extent remains to 1x1 -> 512 features.
            nn.AdaptiveMaxPool2d(1),
            nn.Flatten(),
            nn.Dropout(0.2),
            nn.Linear(512, 10)
        )

    def forward(self, xb):
        """Return class logits of shape (batch, 10) for a batch of images `xb`."""
        out = self.prep(xb)
        out = self.layer1(out)
        out = self.res1(out) + out  # residual connection
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.res2(out) + out  # residual connection
        out = self.classifier(out)
        return out

In [None]:
# Model initialization: the trainable network plus its SWA running average.
cnn_model = CNNModel()
cnn_model = to_device(cnn_model, device)
swa_model = AveragedModel(cnn_model)

# Optimizer and SWA learning-rate scheduler
base_optimizer = torch.optim.AdamW(
    cnn_model.parameters(),
    lr=0.01,
    weight_decay=1e-4,
)
swa_scheduler = SWALR(
    base_optimizer,
    anneal_strategy="cos",
    anneal_epochs=5,
    swa_lr=0.005,
)

In [None]:
def fit_swa_pruning(epochs, swa_start, model, swa_model, train_dl, val_dl, optimizer, scheduler, pruning_amount=0.5):
    """Train `model` with SWA, then prune its conv layers and refresh SWA BatchNorm stats.

    Args:
        epochs: Total number of training epochs.
        swa_start: First epoch (0-based) at which weights are averaged into `swa_model`.
        model: The network being optimized.
        swa_model: Averaged model that receives `update_parameters(model)`.
        train_dl: Iterable of (images, labels) training batches.
        val_dl: Iterable of (images, labels) validation batches.
        optimizer: Optimizer over `model`'s parameters.
        scheduler: SWA learning-rate scheduler (e.g. `SWALR`). It is stepped once
            per epoch and only from `swa_start` onwards, so the optimizer keeps
            its base learning rate before the SWA phase and `anneal_epochs` is
            measured in epochs.
        pruning_amount: Fraction of each Conv2d weight tensor to prune (L1, unstructured).

    Returns:
        (history, swa_model) where history is a list of (val_loss, val_acc) per epoch.
    """
    torch.cuda.empty_cache()
    history = []

    for epoch in range(epochs):
        model.train()
        train_losses = []
        for images, labels in train_dl:
            outputs = model(images)
            loss = F.cross_entropy(outputs, labels)
            train_losses.append(loss.item())

            # Backward pass with gradient-norm clipping
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            optimizer.zero_grad()

        # SWA phase: average the weights and advance the SWA LR schedule once
        # per epoch. Stepping the scheduler every batch from epoch 0 (as before)
        # finished the anneal within the first few batches of training.
        if epoch >= swa_start:
            swa_model.update_parameters(model)
            scheduler.step()

        # Validation
        val_loss, val_acc = evaluate_model(model, val_dl)
        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {sum(train_losses)/len(train_losses):.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        history.append((val_loss, val_acc))

    # Apply pruning to the trained model's convolution layers.
    # NOTE(review): only `model` is pruned; `swa_model` (the returned model) keeps
    # dense weights - confirm whether the averaged model should be pruned as well.
    print("Applying pruning after SWA training...")
    for module_name, module in model.named_modules():
        if isinstance(module, nn.Conv2d):
            prune.l1_unstructured(module, name="weight", amount=pruning_amount)
            print(f"Pruned layer {module_name}")

    # Recompute BatchNorm running statistics for the averaged model.
    print("Updating BatchNorm statistics after pruning...")
    update_bn(train_dl, swa_model)

    return history, swa_model

In [None]:
# Evaluation function
@torch.no_grad()
def evaluate_model(model, val_dl):
    """Return (mean cross-entropy loss, accuracy) of `model` over `val_dl`.

    Both metrics are averaged per sample rather than per batch, so a smaller
    final batch does not get disproportionate weight. The model is switched
    to eval mode.

    Raises:
        ValueError: if `val_dl` yields no samples.
    """
    model.eval()
    total_loss, total_correct, total_seen = 0.0, 0, 0
    for images, labels in val_dl:
        outputs = model(images)
        # Sum (not mean) so that batches contribute proportionally to their size.
        total_loss += F.cross_entropy(outputs, labels, reduction='sum').item()
        _, preds = torch.max(outputs, dim=1)
        total_correct += (preds == labels).sum().item()
        total_seen += labels.size(0)
    if total_seen == 0:
        raise ValueError("evaluate_model received an empty data loader")
    return total_loss / total_seen, total_correct / total_seen

In [None]:
# Run training
epochs = 200
swa_start = 150
history, swa_model = fit_swa_pruning(
    epochs=epochs,
    swa_start=swa_start,
    model=cnn_model,
    swa_model=swa_model,
    train_dl=train_dl,
    val_dl=val_dl,
    optimizer=base_optimizer,
    scheduler=swa_scheduler,
)

# Evaluate the averaged (SWA) model on the validation set
val_loss, val_acc = evaluate_model(swa_model, val_dl)
print(f"SWA Model - Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}")

In [None]:
# Check the pruning result: fraction of exactly-zero weights per Conv2d layer.
print("Verifying pruning sparsity...")
for module_name, module in cnn_model.named_modules():
    if not isinstance(module, nn.Conv2d):
        continue
    zero_fraction = torch.sum(module.weight == 0) / module.weight.numel()
    print(f"Layer {module_name} sparsity: {zero_fraction:.2%}")

In [None]:
# 3. Quantization
print("Applying dynamic quantization...")
# Take the network wrapped by the SWA model and move it to the CPU.
# Note that Module.to() moves the module in place, so swa_model's inner
# network lives on the CPU from here on.
cpu_model = swa_model.module.to('cpu')
# Quantize and later run inference in eval mode (Dropout off, BatchNorm using
# running stats) instead of relying on the mode left behind by an earlier cell.
cpu_model.eval()
# NOTE(review): dynamic quantization presumably only converts nn.Linear here;
# nn.Conv2d is not a dynamically quantizable layer type - confirm against the
# PyTorch quantization docs.
quantized_model = quantize_dynamic(
    cpu_model,
    {nn.Linear, nn.Conv2d},  # layer types requested for quantization
    dtype=torch.qint8
)

In [None]:
# Model size helpers
def calculate_parameters(model):
    """Return the total number of elements across `model.parameters()`."""
    total = 0
    for param in model.parameters():
        total += param.numel()
    return total

def calculate_model_size(model, dtype=torch.float32):
    """Estimate the size of `model`'s parameters in MB, assuming all are stored as `dtype`.

    The bit width of `dtype` comes from torch.finfo for floating-point types
    and torch.iinfo otherwise. Only tensors returned by `model.parameters()`
    are counted.
    """
    param_count = 0
    for param in model.parameters():
        param_count += param.numel()
    type_info = torch.finfo(dtype) if dtype.is_floating_point else torch.iinfo(dtype)
    total_bytes = param_count * type_info.bits / 8
    return total_bytes / (1024 * 1024)  # bytes -> MB


# Estimated parameter storage before and after quantization.
# NOTE(review): the quantized figure assumes every tensor in
# quantized_model.parameters() is stored as qint8 - confirm this matches what
# quantize_dynamic actually converted before quoting the number.
original_size, quantized_size = (
    calculate_model_size(cnn_model),
    calculate_model_size(quantized_model, dtype=torch.qint8),
)

print(f"Original Model Size: {original_size:.2f} MB")
print(f"Quantized Model Size: {quantized_size:.2f} MB")

In [None]:
# NOTE(review): no definition of `calculate_flops_with_fvcore` is visible in this
# notebook, so the calls below fail with NameError on a fresh kernel. The helper
# here is a reconstruction from the call sites - confirm it matches the intent.
def calculate_flops_with_fvcore(model, input_tensor):
    """Return the fvcore FLOP count of one forward pass on a single sample.

    Only the first sample of `input_tensor` is used. It is resized to 32x32
    (the size the model is trained on) and moved to the model's device, so the
    whole dataset tensor can be passed in safely. The model is put in eval mode.
    """
    model.eval()
    model_device = next(model.parameters()).device
    sample = T.Resize((32, 32))(input_tensor[:1]).to(model_device)
    with torch.no_grad():
        return FlopCountAnalysis(model, sample).total()

# Fresh, unpruned model with the same architecture, used as the baseline.
original_model = CNNModel().to(device)

# FLOP counts
# NOTE(review): the count is presumably based on layer shapes, so unstructured
# (mask-based) pruning is not expected to lower it - verify before interpreting.
flops_original = calculate_flops_with_fvcore(original_model, input_tensor)
flops_pruned = calculate_flops_with_fvcore(cnn_model, input_tensor)

# Report
print(f"Original Model FLOPs: {flops_original / 1e6:.2f} MFLOPs")
print(f"Pruned Model FLOPs: {flops_pruned / 1e6:.2f} MFLOPs")

In [None]:
# NOTE(review): no definition of `measure_inference_time` is visible in this
# notebook, so the calls below fail with NameError on a fresh kernel. The helper
# here is a reconstruction from the call sites - confirm it matches the intent.
def measure_inference_time(model, input_tensor, device=None, num_warmup=5, num_runs=20):
    """Return the mean forward-pass latency in milliseconds for a single sample.

    Args:
        model: Network to time; it is put in eval mode and is expected to
            already live on `device`.
        input_tensor: Tensor of images; only the first sample is used, resized to 32x32.
        device: Device to run on. Defaults to the device of the model's parameters.
        num_warmup: Untimed forward passes run first (lazy init, caches).
        num_runs: Timed forward passes that are averaged.
    """
    run_device = torch.device(device) if device is not None else next(model.parameters()).device
    sample = T.Resize((32, 32))(input_tensor[:1]).to(run_device)
    on_cuda = run_device.type == 'cuda'
    model.eval()
    with torch.no_grad():
        for _ in range(num_warmup):
            model(sample)
        # CUDA kernels launch asynchronously; synchronize so the timer only
        # covers completed work.
        if on_cuda:
            torch.cuda.synchronize()
        start = time.perf_counter()
        for _ in range(num_runs):
            model(sample)
        if on_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start
    return elapsed / num_runs * 1000

# Inference speed of the unpruned baseline model
inference_time_original = measure_inference_time(original_model, input_tensor, device=device)

# Inference speed of the pruned model
inference_time_pruned = measure_inference_time(cnn_model, input_tensor, device=device)

# Report
print(f"Original Model Inference Time: {inference_time_original:.2f} ms")
print(f"Pruned Model Inference Time: {inference_time_pruned:.2f} ms")

In [None]:
# Test data preparation (inference runs on the CPU)
print("Preparing test dataset...")
# Scale into a new array instead of overwriting test_df, so re-running this
# cell does not divide the pixel values by 255 a second time.
test_pixels = test_df[input_cols].values / 255
test_input_tensors = torch.tensor(test_pixels, dtype=torch.float32).reshape(-1, 1, 28, 28)
# Resize to the 32x32 input size the model is trained on.
test_input_tensors = T.Resize((32, 32))(test_input_tensors)
test_ds = TensorDataset(test_input_tensors)

def predict_image(img, model):
    """Return the predicted class index (int) for a single image.

    A batch dimension is added before the forward pass. The forward pass runs
    under torch.no_grad(), so no autograd graph is built during inference.
    """
    xb = img.unsqueeze(0)  # add the batch dimension
    with torch.no_grad():
        yb = model(xb)
    _, preds = torch.max(yb, dim=1)  # index of the largest output per sample
    return preds[0].item()

In [None]:
# Predict the test set and store the results
print("Generating predictions (CPU)...")
# Make inference mode explicit: eval() disables Dropout and uses BatchNorm
# running statistics; no_grad() avoids building autograd graphs for every image.
quantized_model.eval()
with torch.no_grad():
    sub_df['Label'] = [predict_image(test_ds[i][0], quantized_model) for i in range(len(test_ds))]

# Write the submission file
submission_path = '/kaggle/working/submission.csv'
sub_df.to_csv(submission_path, index=False)
print(f"Submission file saved to {submission_path}")