In [1]:
# !pip install -q ptflops

In [2]:
!pip install -r requirements.txt

Defaulting to user installation because normal site-packages is not writeable


In [3]:
import os
import glob
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from sklearn.metrics import jaccard_score
import numpy as np
import math
import torch.nn.functional as F
import torchvision
import time
import pandas as pd
from PIL import Image
import datetime
import random
import sys

In [4]:
# # 기본 디렉토리 설정 
# TRAIN_DIR = "/kaggle/input/2025-sw-ai/archive/train"
# VAL_DIR = "/kaggle/input/2025-sw-ai/archive/val"
# TEST_DIR = "/kaggle/input/2025-sw-ai/archive/test/images"
# OUTPUT_PATH = "/kaggle/working/submission.csv"

In [5]:
# 로컬 디렉토리 설정
TRAIN_DIR = "input/2025-csu-sw-ai-challenge/archive/train" 
VAL_DIR = "input/2025-csu-sw-ai-challenge/archive/val"
TEST_DIR = "input/2025-csu-sw-ai-challenge/archive/test/images"
OUTPUT_CSV = "working/submission.csv" 
OUTPUT_MASK = "working/mask_ouputs"

In [6]:
SEED = 2025
def set_seed(seed=SEED):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f'set SEED: {SEED}')
set_seed()

set SEED: 2025


In [7]:
from albumentations import (
    Compose, HorizontalFlip, VerticalFlip, RandomRotate90, ShiftScaleRotate,
    RandomBrightnessContrast, GaussNoise, OneOf, Blur, MotionBlur, RandomGamma
)
from albumentations.pytorch import ToTensorV2


def get_transform():
    return Compose([
        HorizontalFlip(p=0.5),
        VerticalFlip(p=0.5),
        RandomRotate90(p=0.5),
        ShiftScaleRotate(shift_limit=0.06, scale_limit=0.15, rotate_limit=45, p=0.5),
        OneOf([
            GaussNoise(variance=(10.0, 40.0)),
            Blur(blur_limit=3),
            MotionBlur(blur_limit=3)
        ], p=0.5),
        RandomBrightnessContrast(p=0.4),
        RandomGamma(p=0.3),
        ToTensorV2()
    ])

class CrackDataset(Dataset):
    def __init__(self, root_dir, transform=None, augment_ratio=1):
        self.img_dir = os.path.join(root_dir, "images")
        self.mask_dir = os.path.join(root_dir, "masks")
        self.img_list = sorted(glob.glob(self.img_dir + "/*.jpg"))
        self.mask_list = sorted(glob.glob(self.mask_dir + "/*.jpg"))
        self.transform = transform
        self.augment_ratio = augment_ratio  # 추가!

    def __len__(self):
        return len(self.img_list) * self.augment_ratio  # 원본x배수

    def __getitem__(self, idx):
        orig_idx = idx // self.augment_ratio  # 원본 인덱스 재설정
        img = Image.open(self.img_list[orig_idx]).convert("L")
        mask = Image.open(self.mask_list[orig_idx]).convert("L")

        img = np.array(img, dtype=np.float32) / 255.0
        mask = np.array(mask, dtype=np.float32) / 255.0
        mask = (mask > 0.5).astype(np.float32)

        if self.transform:
            augmented = self.transform(image=img, mask=mask)
            img = augmented['image']  # (1,H,W) tensor
            mask = augmented['mask'].unsqueeze(0).float()
        else:
            img = torch.tensor(img).unsqueeze(0)
            mask = torch.tensor(mask).unsqueeze(0)

        return img, mask

  from .autonotebook import tqdm as notebook_tqdm


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class EfficientCrackNet(nn.Module):
    def __init__(self,
                 in_channels=1,
                 base=16,
                 num_classes=1,
                 pretrained=None):
        super(EfficientCrackNet, self).__init__()
        self.base = base
        self.num_classes = num_classes
        self.pretrained = pretrained
        
        # Edge Extraction Method (EEM)
        self.eem = EdgeExtractionMethod(in_channels=in_channels, out_channels=base)
        
        # Encoder
        self.encoder1 = EncoderBlock(in_channels=base, out_channels=base*2, 
                                     use_ulsam=True, num_subspaces=4)
        self.encoder2 = EncoderBlock(in_channels=base*2, out_channels=base*4, 
                                     use_ulsam=True, num_subspaces=4)
        self.encoder3 = EncoderBlock(in_channels=base*4, out_channels=base*8, 
                                     use_ulsam=True, num_subspaces=4)
        
        # Bottleneck with MobileViT
        self.bottleneck = MobileViTBlock(
            in_channels=base*8,
            out_channels=base*8,
            transformer_dim=base*4,
            num_heads=4,
            num_transformer_blocks=2,
            patch_size=(2, 2)
        )
        
        # Decoder
        self.decoder3 = DecoderBlock(in_channels=base*8, skip_channels=base*8, 
                                     out_channels=base*4)
        self.decoder2 = DecoderBlock(in_channels=base*4, skip_channels=base*4, 
                                     out_channels=base*2)
        self.decoder1 = DecoderBlock(in_channels=base*2, skip_channels=base*2, 
                                     out_channels=base)
        
        # Segmentation heads
        self.aux_head1 = SegHead(inplanes=base*2, interplanes=base, 
                                outplanes=num_classes, aux_head=True)
        self.aux_head2 = SegHead(inplanes=base*4, interplanes=base*2, 
                                outplanes=num_classes, aux_head=True)
        self.head = SegHead(inplanes=base, interplanes=base, 
                           outplanes=num_classes, aux_head=False)
        
        self.init_weight()
    
    def forward(self, x):
        h, w = x.shape[2:]
        logit_list = []
        
        # Edge Extraction
        eem_out = self.eem(x)
        
        # Encoder path
        enc1 = self.encoder1(eem_out)
        enc2 = self.encoder2(enc1)
        enc3 = self.encoder3(enc2)
        
        # Bottleneck
        bottle = self.bottleneck(enc3)
        
        # Decoder path
        dec3 = self.decoder3(bottle, enc3)
        dec2 = self.decoder2(dec3, enc2)
        dec1 = self.decoder1(dec2, enc1)
        
        # Segmentation heads
        if self.training:
            main_out = self.head(dec1)
            aux_out1 = self.aux_head1(dec2)
            aux_out2 = self.aux_head2(dec3)
            
            logit_list = [main_out, aux_out1, aux_out2]
            logit_list = [F.interpolate(logit, size=(h, w), mode='bilinear', 
                                       align_corners=True) for logit in logit_list]
            return logit_list
        else:
            main_out = self.head(dec1)
            logit_list = [main_out]
            logit_list = [F.interpolate(logit, size=(h, w), mode='bilinear', 
                                       align_corners=True) for logit in logit_list]
            return logit_list
    
    def init_weight(self):
        if self.pretrained is not None:
            pass
        else:
            for m in self.modules():
                if isinstance(m, nn.Conv2d):
                    nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                    if m.bias is not None:
                        nn.init.constant_(m.bias, 0)
                elif isinstance(m, nn.BatchNorm2d):
                    nn.init.constant_(m.weight, 1)
                    nn.init.constant_(m.bias, 0)


# Edge Extraction Method (EEM) - DoG and LoG based
class EdgeExtractionMethod(nn.Module):
    def __init__(self, in_channels=3, out_channels=16):
        super(EdgeExtractionMethod, self).__init__()
        
        # Gaussian blur for smoothing
        self.gaussian = nn.Conv2d(in_channels, in_channels, kernel_size=5, 
                                  stride=1, padding=2, groups=in_channels, bias=False)
        
        # Initialize Gaussian kernel
        self._init_gaussian_kernel()
        
        # DoG (Difference of Gaussian) approximation
        self.dog_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels//2, kernel_size=3, stride=1, 
                     padding=1, bias=False),
            nn.BatchNorm2d(out_channels//2),
            nn.ReLU(inplace=True)
        )
        
        # LoG (Laplacian of Gaussian) approximation
        self.log_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels//2, kernel_size=3, stride=1, 
                     padding=1, bias=False),
            nn.BatchNorm2d(out_channels//2),
            nn.ReLU(inplace=True)
        )
        
        # Feature refinement
        self.refine = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, 
                     padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, 
                     padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    
    def _init_gaussian_kernel(self):
        # Create 5x5 Gaussian kernel
        kernel_size = 5
        sigma = 1.0
        kernel = torch.zeros((kernel_size, kernel_size))
        center = kernel_size // 2
        
        for i in range(kernel_size):
            for j in range(kernel_size):
                x, y = i - center, j - center
                kernel[i, j] = math.exp(-(x**2 + y**2) / (2 * sigma**2))
        
        kernel = kernel / kernel.sum()
        kernel = kernel.view(1, 1, kernel_size, kernel_size)
        kernel = kernel.repeat(self.gaussian.in_channels, 1, 1, 1)
        
        self.gaussian.weight.data = kernel
        self.gaussian.weight.requires_grad = False
    
    def forward(self, x):
        # Apply Gaussian blur
        smoothed = self.gaussian(x)
        
        # Extract edge features using DoG and LoG
        dog_features = self.dog_conv(smoothed)
        log_features = self.log_conv(smoothed)
        
        # Concatenate edge features
        edge_features = torch.cat([dog_features, log_features], dim=1)
        
        # Refine features
        out = self.refine(edge_features)
        
        return out


# Ultra-Lightweight Subspace Attention Module (ULSAM)
class ULSAM(nn.Module):
    def __init__(self, channels, num_subspaces=4):
        super(ULSAM, self).__init__()
        self.channels = channels
        self.num_subspaces = num_subspaces
        
        assert channels % num_subspaces == 0, "Channels must be divisible by num_subspaces"
        
        self.subspace_channels = channels // num_subspaces
        self.subspaces = nn.ModuleList([
            SubspaceAttention(self.subspace_channels) for _ in range(num_subspaces)
        ])
    
    def forward(self, x):
        # Split input into subspaces
        subspace_features = torch.chunk(x, self.num_subspaces, dim=1)
        
        # Apply attention to each subspace
        attended_features = []
        for idx, subspace_feat in enumerate(subspace_features):
            attended = self.subspaces[idx](subspace_feat)
            attended_features.append(attended)
        
        # Concatenate attended subspaces
        out = torch.cat(attended_features, dim=1)
        
        return out


class SubspaceAttention(nn.Module):
    def __init__(self, channels):
        super(SubspaceAttention, self).__init__()
        
        # Depthwise convolution for spatial attention
        self.dw_conv = nn.Conv2d(channels, channels, kernel_size=3, stride=1, 
                                 padding=1, groups=channels, bias=False)
        self.bn_dw = nn.BatchNorm2d(channels)
        self.relu_dw = nn.ReLU(inplace=True)
        
        # Max pooling for spatial aggregation
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        
        # Pointwise convolution to generate attention map
        self.pw_conv = nn.Conv2d(channels, 1, kernel_size=1, stride=1, 
                                padding=0, bias=False)
        self.bn_pw = nn.BatchNorm2d(1)
        self.relu_pw = nn.ReLU(inplace=True)
        
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        # Depthwise convolution
        out = self.dw_conv(x)
        out = self.bn_dw(out)
        out = self.relu_dw(out)
        
        # Spatial aggregation
        out = self.maxpool(out)
        
        # Generate attention map
        out = self.pw_conv(out)
        out = self.bn_pw(out)
        out = self.relu_pw(out)
        
        # Apply sigmoid and expand to match input channels
        attention_map = self.sigmoid(out)
        attention_map = attention_map.expand_as(x)
        
        # Apply attention and add residual
        out = x * attention_map + x
        
        return out


# Depthwise Separable Convolution
class DepthwiseSeparableConv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super(DepthwiseSeparableConv, self).__init__()
        self.depthwise = nn.Conv2d(in_channels, in_channels, kernel_size=kernel_size,
                                   stride=stride, padding=padding, groups=in_channels, bias=False)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                                   stride=1, padding=0, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        out = self.depthwise(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.pointwise(out)
        out = self.bn2(out)
        out = self.relu(out)
        return out

# MobileViT Block
class MobileViTBlock(nn.Module):
    def __init__(self, in_channels, out_channels, transformer_dim,
                 num_heads=4, num_transformer_blocks=2, patch_size=(2, 2)):
        super(MobileViTBlock, self).__init__()
        self.patch_h, self.patch_w = patch_size
        self.transformer_dim = transformer_dim

        # Local feature extractor
        self.local_rep = nn.Sequential(
            DepthwiseSeparableConv(in_channels, in_channels, kernel_size=3, stride=1, padding=1),
            DepthwiseSeparableConv(in_channels, transformer_dim, kernel_size=1, stride=1, padding=0)
        )

        # Linear projection for transformer
        self.fc1 = nn.Linear(self.patch_h * self.patch_w * transformer_dim, transformer_dim)
        self.fc2 = nn.Linear(transformer_dim, self.patch_h * self.patch_w * transformer_dim)

        # Transformer blocks
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(transformer_dim, num_heads)
            for _ in range(num_transformer_blocks)
        ])

        # Normalization and fusion
        self.norm = nn.LayerNorm(transformer_dim)
        self.fusion = nn.Sequential(
            DepthwiseSeparableConv(transformer_dim, in_channels, kernel_size=1, stride=1, padding=0),
            DepthwiseSeparableConv(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
        )

        # Unfold/Fold layers for patch manipulation
        self.unfold = nn.Unfold(kernel_size=patch_size, stride=patch_size)
        self.fold = nn.Fold(output_size=None, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        B, C, H, W = x.shape
        local_features = self.local_rep(x)  # (B, transformer_dim, H, W)

        # Patchify
        patches = self.unfold(local_features)  # (B, patch_dim, num_patches)
        patches = patches.transpose(1, 2)      # (B, num_patches, patch_dim)

        # Calculate patch dimension
        patch_dim = patches.shape[-1]
        # Project to transformer dimension
        patches = self.fc1(patches)

        # Transformer processing
        for blk in self.transformer_blocks:
            patches = blk(patches)
        patches = self.norm(patches)

        # Project back
        patches = self.fc2(patches)
        patches = patches.transpose(1, 2)  # (B, patch_dim, num_patches)

        # Fold back to spatial map
        new_H = H // self.patch_h * self.patch_h
        new_W = W // self.patch_w * self.patch_w
        folded = F.fold(patches, output_size=(new_H, new_W),
                        kernel_size=(self.patch_h, self.patch_w),
                        stride=(self.patch_h, self.patch_w))

        # Fusion + Residual
        out = self.fusion(folded)
        out = out + x
        return out

class TransformerBlock(nn.Module):
    def __init__(self, dim, num_heads=4, mlp_ratio=4.0, dropout=0.0):
        super(TransformerBlock, self).__init__()
        self.norm1 = nn.LayerNorm(dim)
        self.attn = nn.MultiheadAttention(dim, num_heads, dropout=dropout,
                                          batch_first=True)
        self.norm2 = nn.LayerNorm(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_hidden_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        x_norm = self.norm1(x)
        attn_out, _ = self.attn(x_norm, x_norm, x_norm)
        x = x + attn_out
        x_norm = self.norm2(x)
        mlp_out = self.mlp(x_norm)
        x = x + mlp_out
        return x

# Encoder Block with Depthwise Separable Conv and ULSAM
class EncoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels, use_ulsam=True, num_subspaces=4):
        super(EncoderBlock, self).__init__()
        
        self.use_ulsam = use_ulsam
        
        # Downsampling with depthwise separable conv
        self.downsample = nn.Sequential(
            DepthwiseSeparableConv(in_channels, out_channels, kernel_size=3, 
                                  stride=2, padding=1),
            DepthwiseSeparableConv(out_channels, out_channels, kernel_size=3, 
                                  stride=1, padding=1)
        )
        
        # ULSAM for attention
        if self.use_ulsam:
            self.ulsam = ULSAM(out_channels, num_subspaces)
    
    def forward(self, x):
        out = self.downsample(x)
        
        if self.use_ulsam:
            out = self.ulsam(out)
        
        return out


# Decoder Block
class DecoderBlock(nn.Module):
    def __init__(self, in_channels, skip_channels, out_channels):
        super(DecoderBlock, self).__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # fusion에서 in_channels=업샘플 채널+skip 채널로 잡기
        fusion_in = in_channels + skip_channels
        self.fusion = nn.Sequential(
            DepthwiseSeparableConv(fusion_in, out_channels, kernel_size=3, stride=1, padding=1),
            DepthwiseSeparableConv(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, x, skip):
        x_up = self.upsample(x)
        if x_up.size()[2:] != skip.size()[2:]:
            x_up = F.interpolate(x_up, size=skip.shape[2:], mode='bilinear', align_corners=True)
        x_concat = torch.cat([x_up, skip], dim=1)
        out = self.fusion(x_concat)
        return out

# Segmentation Head
class SegHead(nn.Module):
    def __init__(self, inplanes, interplanes, outplanes, aux_head=False):
        super(SegHead, self).__init__()
        
        self.bn1 = nn.BatchNorm2d(inplanes)
        self.relu = nn.ReLU(inplace=True)
        
        if aux_head:
            # Auxiliary head (no upsampling)
            self.conv_bn_relu = nn.Sequential(
                nn.Conv2d(inplanes, interplanes, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(interplanes),
                nn.ReLU(inplace=True)
            )
        else:
            # Main head (with upsampling)
            self.conv_bn_relu = nn.Sequential(
                nn.ConvTranspose2d(inplanes, interplanes, kernel_size=3, stride=2, 
                                  padding=1, output_padding=1),
                nn.BatchNorm2d(interplanes),
                nn.ReLU(inplace=True)
            )
        
        self.conv = nn.Conv2d(interplanes, outplanes, kernel_size=1, stride=1, padding=0)
    
    def forward(self, x):
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv_bn_relu(x)
        out = self.conv(x)
        
        return out

In [9]:
def binary_metrics(preds, targets, eps=1e-6):
    preds = preds.float()
    targets = targets.float()

    tp = (preds * targets).sum(dim=(1,2,3))
    fp = (preds * (1 - targets)).sum(dim=(1,2,3))
    fn = ((1 - preds) * targets).sum(dim=(1,2,3))

    precision = (tp + eps) / (tp + fp + eps)
    recall    = (tp + eps) / (tp + fn + eps)
    f1        = (2 * precision * recall + eps) / (precision + recall + eps)  # Dice
    union     = tp + fp + fn
    iou       = (tp + eps) / (union + eps)

    return {
        "iou": iou.mean().item(),
        "precision": precision.mean().item(),
        "recall": recall.mean().item(),
        "f1": f1.mean().item(),
    }

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import time

# --- [추가] F-beta 손실 함수 정의 ---
class FbetaLoss(nn.Module):
    def __init__(self, beta=2.0, smooth=1e-6):
        super(FbetaLoss, self).__init__()
        self.beta = beta
        self.smooth = smooth

    def forward(self, logits, targets):
        preds = torch.sigmoid(logits)

        tp = (preds * targets).sum(dim=(2, 3))
        fp = (preds * (1 - targets)).sum(dim=(2, 3))
        fn = ((1 - preds) * targets).sum(dim=(2, 3))

        beta2 = self.beta ** 2
        f_beta = ((1 + beta2) * tp + self.smooth) / ((1 + beta2) * tp + beta2 * fn + fp + self.smooth)
        
        return 1 - f_beta.mean()

# --- [추가] F-beta 점수 계산 함수 (검증용) ---
def calculate_fbeta(preds, masks, beta=2.0, smooth=1e-6):
    preds = preds.float()
    masks = masks.float()

    # 배치 전체를 하나의 큰 이미지로 보고 계산
    tp = (preds * masks).sum()
    fp = (preds * (1 - masks)).sum()
    fn = ((1 - preds) * masks).sum()
    
    beta2 = beta ** 2
    f_beta = ((1 + beta2) * tp + smooth) / ((1 + beta2) * tp + beta2 * fn + fp + smooth)
    
    return f_beta.item()

# --- [적용] 사용자께서 제공하신 binary_metrics 함수 ---
def binary_metrics(preds, targets, eps=1e-6):
    preds = preds.float()
    targets = targets.float()

    # 배치 내 각 샘플에 대해 TP, FP, FN 계산
    tp = (preds * targets).sum(dim=(1,2,3))
    fp = (preds * (1 - targets)).sum(dim=(1,2,3))
    fn = ((1 - preds) * targets).sum(dim=(1,2,3))

    precision = (tp + eps) / (tp + fp + eps)
    recall    = (tp + eps) / (tp + fn + eps)
    f1        = (2 * precision * recall + eps) / (precision + recall + eps)
    union     = tp + fp + fn
    iou       = (tp + eps) / (union + eps)

    # 각 지표를 배치에 대해 평균내어 반환
    return {
        "iou": iou.mean().item(),
        "precision": precision.mean().item(),
        "recall": recall.mean().item(),
        "f1": f1.mean().item(),
    }

# ------------------------------------------------------------------
# train_model 함수
# ------------------------------------------------------------------

def train_model(
    model,
    train_loader,
    val_loader,
    device,
    epochs=10,
    aux_weights=(1.0, 0.4, 0.4),
    lr=1e-3,
    use_amp=False,
    log_every=500,
    validate_every_steps=None,
    threshold=0.5,
    patience=5,
    model_save_path='best_model.pth'
):
    model.to(device)
    criterion = FbetaLoss(beta=2.0)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    # --- [추가] Early Stopping을 위한 변수 초기화 ---
    patience_counter = 0
    best_val_fbeta = 0.0

    global_step = 0
    # --- [복원] Step 단위 로깅을 위한 변수 ---
    win_loss, win_iou, win_f1, win_steps = 0.0, 0.0, 0.0, 0
    t0 = time.time()

    best_threshold = threshold  # F2 score 최적화 위해 threshold 저장 변수 추가

    for epoch in range(1, epochs + 1):
        model.train()
        epoch_loss = 0.0

        for imgs, masks in train_loader:
            global_step += 1
            imgs = imgs.to(device, non_blocking=True)
            masks = masks.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)

            with torch.cuda.amp.autocast(enabled=use_amp):
                outputs = model(imgs)
                main_logit = outputs[0] if isinstance(outputs, (list, tuple)) else outputs
                
                loss = aux_weights[0] * criterion(main_logit, masks)
                if isinstance(outputs, (list, tuple)):
                    if len(outputs) > 1 and aux_weights[1] > 0:
                        loss = loss + aux_weights[1] * criterion(outputs[1], masks)
                    if len(outputs) > 2 and aux_weights[2] > 0:
                        loss = loss + aux_weights[2] * criterion(outputs[2], masks)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            epoch_loss += loss.item()
            
            # --- [복원] Step 단위 로깅을 위한 집계 로직 ---
            win_loss += loss.item()
            win_steps += 1
            with torch.no_grad():
                probs = torch.sigmoid(main_logit)
                preds = (probs > best_threshold).float()
                m = binary_metrics(preds, masks)
                win_iou += m["iou"]
                win_f1  += m["f1"]
            
            # --- [복원] Step 단위 로깅 출력문 ---
            if log_every and (global_step % log_every == 0):
                elapsed = time.time() - t0
                lr_now = optimizer.param_groups[0]["lr"]
                print(f"[Step {global_step}] epoch={epoch}  "
                      f"avg_loss(win)={win_loss/max(1,win_steps):.4f}  "
                      f"avg_iou(win)={win_iou/max(1,win_steps):.4f}  "
                      f"avg_f1(win)={win_f1/max(1,win_steps):.4f}  "
                      f"lr={lr_now:.3e}  elapsed={elapsed:.1f}s")
                win_loss = win_iou = win_f1 = 0.0
                win_steps = 0
                t0 = time.time()

        # --- Epoch 종료 후 검증 ---
        avg_train_loss = epoch_loss / max(1, len(train_loader))
        model.eval()

        val_fbeta_list, val_loss_list, val_iou_list, val_f1_list = [], [], [], []

        all_probs = []
        all_masks = []

        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs = imgs.to(device, non_blocking=True)
                masks = masks.to(device, non_blocking=True)

                logits_list = model(imgs)
                main_logit = logits_list[0] if isinstance(logits_list, (list, tuple)) else logits_list

                loss = criterion(main_logit, masks)
                val_loss_list.append(loss.item())

                probs = torch.sigmoid(main_logit)
                all_probs.append(probs.cpu())
                all_masks.append(masks.cpu())

                preds = (probs > best_threshold).float()

                fbeta_score = calculate_fbeta(preds, masks, beta=2.0)
                val_fbeta_list.append(fbeta_score)

                m = binary_metrics(preds, masks)
                val_iou_list.append(m["iou"])
                val_f1_list.append(m["f1"])

        all_probs = torch.cat(all_probs, dim=0).numpy()
        all_masks = torch.cat(all_masks, dim=0).numpy()

        # --- Threshold 최적화 (F2 score 기준) ---
        thresholds = np.linspace(0.01, 0.99, 99)
        best_score = 0.0
        best_epoch_threshold = best_threshold

        for t in thresholds:
            preds_bin = (all_probs > t).astype(np.uint8)
            tp = (preds_bin * all_masks).sum(axis=(1,2,3))
            fp = (preds_bin * (1 - all_masks)).sum(axis=(1,2,3))
            fn = ((1 - preds_bin) * all_masks).sum(axis=(1,2,3))

            beta2 = 2.0 ** 2
            f_beta = ((1 + beta2) * tp + 1e-6) / ((1 + beta2) * tp + beta2 * fn + fp + 1e-6)
            score = f_beta.mean()

            if score > best_score:
                best_score = score
                best_epoch_threshold = t
        # --- Threshold 최적화 끝 ---

        avg_val_loss = np.mean(val_loss_list)
        avg_val_fbeta = np.mean(val_fbeta_list)
        avg_val_iou = np.mean(val_iou_list)
        avg_val_f1 = np.mean(val_f1_list)

        # --- [복원] Epoch 단위 출력문에 F2 score, 최적 threshold 출력 추가 ---
        print(f"[Epoch {epoch}/{epochs}] "
              f"Train Loss: {avg_train_loss:.4f} | "
              f"Val Loss: {avg_val_loss:.4f} | "
              f"Val IoU: {avg_val_iou:.4f} | "
              f"Val F1: {avg_val_f1:.4f} | "
              f"Val F-beta(β=2.0): {avg_val_fbeta:.4f} | "
              f"Best Threshold: {best_epoch_threshold:.3f} | "
              f"Best F-beta@threshold: {best_score:.4f}")

        # Early Stopping 로직 (F-beta 기준)
        if best_score > best_val_fbeta:
            best_val_fbeta = best_score
            best_threshold = best_epoch_threshold
            patience_counter = 0
            torch.save(model.state_dict(), model_save_path)
            print(f" -> Best score updated. Model saved.")
        else:
            patience_counter += 1
            print(f" -> Patience: {patience_counter}/{patience}")

        if patience_counter >= patience:
            print(f"\nEarly stopping triggered after {patience} epochs without improvement.")
            break
            
    print(f"\nTraining finished. Best Val F-beta(β=2.0) was: {best_val_fbeta:.4f}")
    print(f"Best Threshold found: {best_threshold:.3f}")

    model.load_state_dict(torch.load(model_save_path))
    return model, best_threshold


In [11]:
train_dataset = CrackDataset(TRAIN_DIR, transform=get_transform(), augment_ratio=2)
val_dataset = CrackDataset(VAL_DIR, transform=None, augment_ratio=1)

print("Load..tarin data")
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
print("Loaded train data")
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

Load..tarin data
Loaded train data


  original_init(self, **validated_kwargs)
  GaussNoise(variance=(10.0, 40.0)),


In [12]:
from ptflops import get_model_complexity_info

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EfficientCrackNet().to(device)
input_size = (1, 192, 192)

macs, params = get_model_complexity_info(model,
                                         input_size,
                                         as_strings=True,
                                         print_per_layer_stat=False,
                                         verbose=False)
print(f"Total Params: {params}")
print(f"Total MACs: {macs}")

Total Params: 290.38 k
Total MACs: 392.74 MMac


In [73]:
print(device)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#model = EfficientCrackNet().to(device)
model, best_threshold = train_model(model, train_loader, val_loader, device, epochs=100, patience=10)

cuda


  scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
  with torch.cuda.amp.autocast(enabled=use_amp):


cuda
[Step 500] epoch=1  avg_loss(win)=1.3739  avg_iou(win)=0.1064  avg_f1(win)=0.1865  lr=1.000e-03  elapsed=15.1s
[Step 1000] epoch=1  avg_loss(win)=1.1518  avg_iou(win)=0.2018  avg_f1(win)=0.3159  lr=1.000e-03  elapsed=14.1s
[Step 1500] epoch=1  avg_loss(win)=1.0756  avg_iou(win)=0.2376  avg_f1(win)=0.3648  lr=1.000e-03  elapsed=13.8s
[Epoch 1/40] Train Loss: 1.1737 | Val Loss: 0.4994 | Val IoU: 0.3225 | Val F1: 0.4537 | Val F-beta(β=2.0): 0.6360 | Best Threshold: 0.980 | Best F-beta@threshold: 0.5250
 -> Best score updated. Model saved.
[Step 2000] epoch=2  avg_loss(win)=1.0600  avg_iou(win)=0.2553  avg_f1(win)=0.3847  lr=1.000e-03  elapsed=48.7s
[Step 2500] epoch=2  avg_loss(win)=1.0174  avg_iou(win)=0.2866  avg_f1(win)=0.4249  lr=1.000e-03  elapsed=13.5s
[Step 3000] epoch=2  avg_loss(win)=1.0029  avg_iou(win)=0.2848  avg_f1(win)=0.4165  lr=1.000e-03  elapsed=13.1s
[Step 3500] epoch=2  avg_loss(win)=0.9918  avg_iou(win)=0.2920  avg_f1(win)=0.4260  lr=1.000e-03  elapsed=13.0s
[Epoc

In [14]:
def save_mask_image(
    mask_image: Image.Image,
    base_output_dir: str,
    original_filename: str,
    script_name: str = 'defalut'
):
    """
    마스크 이미지를 지정된 규칙에 따라 폴더를 생성하고 저장합니다.

    Args:
        mask_image (Image.Image): 저장할 PIL 이미지 객체.
        base_output_dir (str): 결과 폴더를 생성할 상위 경로.
        original_filename (str): 원본 이미지 파일명 (e.g., 'image_001.jpg').
        script_name (str): 현재 실행 중인 파이썬 스크립트 또는 노트북 파일명.
    
    Returns:
        str: 파일이 저장된 전체 경로.
    """
    # 1. 'test_파일명_mmddhhmm' 형식으로 폴더명 생성
    now = datetime.datetime.now()
    timestamp = now.strftime("%m%d%H%M")  # mmddhhmm 형식
    
    # 스크립트 이름에서 확장자(.py, .ipynb) 제거
    script_basename = os.path.splitext(script_name)[0]
    
    folder_name = f"test_{script_basename}_{timestamp}"
    output_dir = os.path.join(base_output_dir, folder_name)
    
    # 폴더 생성 (이미 존재하면 그대로 사용)
    os.makedirs(output_dir, exist_ok=True)

    # 2. 저장할 파일명 생성 (원본 파일명 기반)
    original_basename = os.path.splitext(original_filename)[0]
    output_filename = f"{original_basename}_mask.png"
    
    # 3. 전체 저장 경로를 조합하고 이미지 저장
    output_path = os.path.join(output_dir, output_filename)
    mask_image.save(output_path)
    
    return output_path

In [15]:
def rle_encode(mask):
    """
    mask: 2D numpy array of {0,1}, shape (H,W)
    return: run length as string
    """
    pixels = mask.flatten(order="C")
    ones = np.where(pixels == 1)[0] + 1  # 1-based
    if len(ones) == 0:
        return ""
    runs = []
    prev = -2
    for idx in ones:
        if idx > prev + 1:
            runs.extend((idx, 0))
        runs[-1] += 1
        prev = idx
    return " ".join(map(str, runs))


def predict_and_submit(model, test_img_dir, output_csv, device, threshold=0.5):
    model.eval()
    ids, rles = [], []

    test_imgs = sorted(glob.glob(os.path.join(test_img_dir, "*.jpg")))
    for path in test_imgs:
        img_id = os.path.splitext(os.path.basename(path))[0]
        img = Image.open(path).convert("L")
        arr = np.array(img, dtype=np.float32) / 255.0
        tensor = torch.tensor(arr).unsqueeze(0).unsqueeze(0).to(device)

        with torch.no_grad():
            out_list = model(tensor)
            main_logit = out_list[0] if isinstance(out_list, (list, tuple)) else out_list
            prob = torch.sigmoid(main_logit)[0,0].cpu().numpy()
            pred = (prob > threshold).astype(np.uint8)
        
        rle = rle_encode(pred)
        ids.append(img_id)
        rles.append(rle)

    df = pd.DataFrame({"image_id": ids, "rle": rles})
    df.to_csv(output_csv, index=False)
    print(f"[OK] submission saved to {output_csv}, total {len(df)} rows.")

In [76]:

def rle_encode(mask):
    """
    mask: 2D numpy array of {0,1}, shape (H,W)
    return: run length as string
    """
    pixels = mask.flatten(order="C")
    ones = np.where(pixels == 1)[0] + 1  # 1-based
    if len(ones) == 0:
        return ""
    runs = []
    prev = -2
    for idx in ones:
        if idx > prev + 1:
            runs.extend((idx, 0))
        runs[-1] += 1
        prev = idx
    return " ".join(map(str, runs))


def predict_submit_and_save_masks(
    model, 
    test_img_dir, 
    output_csv, 
    device, 
    threshold=0.5,
    save_masks=False,
    mask_save_dir=None
):
    
    model.eval()
    ids, rles = [], []

    # --- 이미지 저장을 위한 폴더 설정 ---
    output_mask_path = ""
    if save_masks:
        if mask_save_dir is None:
            # mask_save_dir가 지정되지 않으면 에러 발생
            raise ValueError("If save_masks is True, mask_save_dir must be provided.")
        
        # 현재 시간을 기반으로 하위 폴더 생성
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        output_mask_path = os.path.join(mask_save_dir, f"predictions_{timestamp}")
        os.makedirs(output_mask_path, exist_ok=True)
        print(f"Mask images will be saved to: {output_mask_path}")

    test_imgs = sorted(glob.glob(os.path.join(test_img_dir, "*.jpg")))
    for path in test_imgs:
        img_id = os.path.splitext(os.path.basename(path))[0]
        img = Image.open(path).convert("L")
        arr = np.array(img, dtype=np.float32) / 255.0
        tensor = torch.tensor(arr).unsqueeze(0).unsqueeze(0).to(device)

        with torch.no_grad():
            out_list = model(tensor)
            main_logit = out_list[0] if isinstance(out_list, (list, tuple)) else out_list
            prob = torch.sigmoid(main_logit)[0,0].cpu().numpy()
            pred = (prob > threshold).astype(np.uint8)
        
        # ---  마스크 이미지를 파일로 저장 ---
        if save_masks:
            mask_image = Image.fromarray(pred * 255, mode='L')
            mask_filename = f"{img_id}_mask.png"
            save_path = os.path.join(output_mask_path, mask_filename)
            mask_image.save(save_path)

        # --- RLE 인코딩 및 CSV 데이터 수집 ---
        rle = rle_encode(pred)
        ids.append(img_id)
        rles.append(rle)

    # --- CSV 파일로 최종 저장 ---
    df = pd.DataFrame({"image_id": ids, "rle": rles})
    df.to_csv(output_csv, index=False)
    print(f"OK. Submission CSV saved to {output_csv}, total {len(df)} rows.")
    
    if save_masks:
        print(f"OK. Mask images also saved in: {output_mask_path}")

In [77]:
predict_submit_and_save_masks(
    model=model,
    test_img_dir=TEST_DIR,
    output_csv="working/submission.csv",
    device=device,
    save_masks=True,  
    mask_save_dir=OUTPUT_MASK,
    threshold = best_threshold,
)

Mask images will be saved to: working/mask_ouputs/predictions_20251015_233222


  mask_image = Image.fromarray(pred * 255, mode='L')


OK. Submission CSV saved to working/submission.csv, total 2667 rows.
OK. Mask images also saved in: working/mask_ouputs/predictions_20251015_233222
