In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory



# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# -*- coding: utf-8 -*-
import os
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models

# ---------- Dice Loss ----------
def dice_loss(pred, target, smooth=1e-6):
    """Soft Dice loss for a single foreground channel.

    Args:
        pred: raw logits. They are squashed with a sigmoid and reduced over
            dims 1 and 2, so a 3-D tensor is required
            (presumably (batch, H, W) -- TODO confirm against callers).
        target: tensor broadcastable with ``pred``; non-zero / True entries
            count as foreground.
        smooth: constant added to numerator and denominator so an empty
            prediction and empty target do not divide by zero.

    Returns:
        Scalar tensor: one minus the Dice coefficient averaged over dim 0.
    """
    reduce_dims = (1, 2)
    probs = pred.sigmoid()
    overlap = torch.sum(probs * target, dim=reduce_dims)
    # The denominator is the sum of both "areas", not a set union.
    total = torch.sum(probs, dim=reduce_dims) + torch.sum(target, dim=reduce_dims)
    score = (2 * overlap + smooth) / (total + smooth)
    return 1 - score.mean()

# ---------- Dataset ----------
class LaneDataset(Dataset):
    """Frames from ``img_dir`` paired with same-named binary masks from ``mask_dir``.

    Args:
        img_dir: directory whose entries (as returned by ``os.listdir``) are
            treated as image files.
        mask_dir: directory holding one mask per frame under the same file
            name, or ``None`` / empty for inference data.
        transform: callable applied to the RGB PIL image and expected to
            return a (C, H, W) tensor. When ``None``, the image is converted
            to a float32 (3, H, W) tensor scaled to [0, 1].

    Items:
        With masks:    ``(image_tensor, mask)`` where ``mask`` is an int64
                       (H, W) tensor of 0/1 matching the image tensor's size.
        Without masks: ``(image_tensor, file_name, (width, height))`` where
                       the size is that of the image before ``transform``.
    """

    def __init__(self, img_dir, mask_dir=None, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        # Sorted so the index -> file mapping is deterministic across runs.
        self.images = sorted(os.listdir(img_dir))
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def _to_tensor(self, img):
        """Apply ``self.transform``, or a plain [0, 1] CHW conversion if unset."""
        if self.transform is not None:
            return self.transform(img)
        # The default ``transform=None`` used to fail with "NoneType is not
        # callable"; fall back to an HWC uint8 -> CHW float conversion.
        return torch.from_numpy(np.array(img)).permute(2, 0, 1).float().div(255)

    def __getitem__(self, idx):
        fname = self.images[idx]
        # ``convert`` returns a fully loaded copy, so the source handle can be
        # closed as soon as the ``with`` block ends.
        with Image.open(os.path.join(self.img_dir, fname)) as src:
            img = src.convert('RGB')
        img_t = self._to_tensor(img)
        if self.mask_dir:
            with Image.open(os.path.join(self.mask_dir, fname)) as src:
                m = src.convert('L')
            # PIL sizes are (width, height); tensors are (C, H, W). NEAREST
            # keeps the mask strictly two-valued before thresholding.
            m = m.resize((img_t.shape[2], img_t.shape[1]), Image.NEAREST)
            mask = (np.array(m) > 127).astype(np.int64)
            return img_t, torch.from_numpy(mask)
        return img_t, fname, img.size

# ---------- VGG16 Segmentation Model ----------
class VGG16Seg(nn.Module):
    """Per-pixel classifier built on the torchvision VGG16 feature extractor.

    Args:
        n_class: number of output channels (one score map per class).
        pretrained: forwarded to ``models.vgg16`` to request pretrained weights.

    ``forward`` returns logits of shape (B, n_class, H, W) for an input of
    shape (B, 3, H, W): the coarse score map is bilinearly resized back to the
    input's spatial size.
    """

    def __init__(self, n_class=2, pretrained=True):
        super().__init__()
        backbone = models.vgg16(pretrained=pretrained)
        # Only the ``features`` stack is kept; the rest of the VGG model is unused.
        self.encoder = backbone.features
        # 1x1 convolution maps the 512 feature channels to per-class scores.
        self.classifier = nn.Conv2d(512, n_class, kernel_size=1)

    def forward(self, x):
        target_hw = x.shape[2:]
        scores = self.classifier(self.encoder(x))
        return nn.functional.interpolate(
            scores, size=target_hw, mode='bilinear', align_corners=False
        )

# ---------- RLE Encoding ----------
def mask_to_rle(mask):
    """Run-length encode a mask as a space-separated "start length ..." string.

    The mask is flattened in column-major (Fortran) order and starts are
    1-indexed. Any non-zero value counts as foreground, so 0/1, 0/255 and
    boolean masks all encode identically.

    Args:
        mask: array-like of any numeric or boolean dtype.

    Returns:
        The RLE string; empty when the mask has no foreground pixels.
    """
    # Binarize first: with raw values, a 1 -> 2 step would register as an extra
    # transition and break the start/end pairing below.
    flat = (np.asarray(mask) != 0).flatten(order='F')
    # Zero sentinels at both ends guarantee every run has a start and an end.
    padded = np.concatenate([[0], flat, [0]])
    runs = np.where(padded[1:] != padded[:-1])[0] + 1
    # Even slots hold run starts, odd slots run ends; convert ends to lengths.
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

# ---------- Main Pipeline ----------
def main():
    """Train ``VGG16Seg`` for one epoch on the lane data, then predict the test set.

    Side effects (all paths are relative to the working directory):
      * saves the trained weights to ``vgg16_aug.pth``;
      * writes one 0/255 mask image per test frame under ``pred_masks/``;
      * writes ``submission.csv`` with columns ``filename`` and ``rle``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Image-only transforms. The horizontal flip is deliberately NOT part of
    # this pipeline: the dataset applies the transform to the frame only, so a
    # random flip here would pair flipped frames with unflipped masks. The flip
    # is applied jointly to image and mask inside the training loop instead.
    train_transform = transforms.Compose([
        transforms.Resize((512, 1024)),
        transforms.ColorJitter(0.3, 0.3, 0.3, 0.1),
        transforms.ToTensor(),
    ])
    test_transform = transforms.Compose([
        transforms.Resize((512, 1024)),
        transforms.ToTensor(),
    ])

    # DataLoaders
    train_imgs = "lanesegmentationchallenge/train/train/frames"
    train_masks = "lanesegmentationchallenge/train/train/lane-masks"
    test_imgs = "lanesegmentationchallenge/test/test/frames"
    train_ds = LaneDataset(train_imgs, train_masks, train_transform)
    train_ld = DataLoader(train_ds, batch_size=8, shuffle=True, num_workers=4)
    test_ds = LaneDataset(test_imgs, None, test_transform)
    # batch_size=1: the inference loop below reads fnames[0] and one size per step.
    test_ld = DataLoader(test_ds, batch_size=1, num_workers=2)

    # Model, optimizer, loss
    model = VGG16Seg().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    ce_loss = nn.CrossEntropyLoss()

    # Training
    print("▶️ Training with VGG16 + Augmentation...")
    for ep in range(1):
        model.train()
        tot = 0
        for imgs, masks in tqdm(train_ld, desc=f"Epoch {ep}"):
            imgs, masks = imgs.to(device), masks.to(device)
            # Joint horizontal flip: each sample is mirrored with p=0.5, and the
            # image and its mask are always mirrored together (last dim = width).
            flip = torch.rand(imgs.size(0), device=device) < 0.5
            if flip.any():
                imgs[flip] = imgs[flip].flip(-1)
                masks[flip] = masks[flip].flip(-1)
            logits = model(imgs)
            # Equal-weight mix of cross-entropy and Dice on the class-1 logits.
            loss = 0.5 * ce_loss(logits, masks) + 0.5 * dice_loss(logits[:, 1, :, :], masks == 1)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            tot += loss.item()
        print(f"Epoch {ep} - Loss: {tot/len(train_ld):.4f}")
    torch.save(model.state_dict(), 'vgg16_aug.pth')

    # Inference & Submission
    print("▶️ Inference and Submission...")
    submission_path = 'submission.csv'
    model.eval()
    results = []
    os.makedirs('pred_masks', exist_ok=True)
    with torch.no_grad():
        for imgs, fnames, orig in tqdm(test_ld, desc='Infer'):
            imgs = imgs.to(device)
            out = model(imgs)
            pred = out.argmax(1).squeeze(0).cpu().numpy().astype(np.uint8)
            pil = Image.fromarray(pred * 255)
            pil.save(os.path.join('pred_masks', fnames[0]))
            # ``orig`` is the collated (width, height) of the untransformed frame.
            # Encode the mask at that size rather than at network resolution,
            # matching the later cell of this notebook.
            # NOTE(review): assumes the competition scores at original resolution.
            width, height = (int(v) for v in orig)
            full_res = np.array(pil.resize((width, height), Image.NEAREST)) > 0
            results.append({'filename': fnames[0], 'rle': mask_to_rle(full_res)})
    pd.DataFrame(results).to_csv(submission_path, index=False)
    # Report the path actually written (the old message named a different file).
    print(f"✅ {submission_path} created.")

# Run the pipeline only when this code executes as the top-level module
# (which includes running the notebook cell), not when it is imported.
if __name__ == '__main__':
    main()


Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:06<00:00, 88.0MB/s] 


▶️ Training with VGG16 + Augmentation...


Epoch 0: 100%|██████████| 454/454 [05:19<00:00,  1.42it/s]


Epoch 0 - Loss: 0.4043
▶️ Inference and Submission...


Infer: 100%|██████████| 2782/2782 [01:44<00:00, 26.70it/s]


✅ aug_submission.csv created.


In [None]:
# 필요한 라이브러리 임포트
import os
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models

# ---------- Dice Loss 함수 정의 ----------
# 예측과 실제 정답 간의 겹치는 정도를 측정하는 Dice Loss 함수
def dice_loss(pred, target, smooth=1e-6):
    """Soft Dice loss on the class-1 probability of a multi-class prediction.

    Args:
        pred: logits with the class axis at dim 1; channel 1 is taken after a
            softmax over that axis, then reduced over the remaining two dims
            (presumably (batch, n_class, H, W) -- TODO confirm against callers).
        target: label tensor; entries equal to 1 are foreground, everything
            else is background.
        smooth: constant added to numerator and denominator to avoid 0/0.

    Returns:
        Scalar tensor: one minus the Dice coefficient averaged over dim 0.
    """
    spatial = (1, 2)
    fg_prob = pred.softmax(dim=1)[:, 1, :, :]
    fg_true = target.eq(1).float()
    overlap = torch.sum(fg_prob * fg_true, dim=spatial)
    # Sum of the two "areas" (the Dice denominator), not a set union.
    total = torch.sum(fg_prob, dim=spatial) + torch.sum(fg_true, dim=spatial)
    per_sample = (2. * overlap + smooth) / (total + smooth)
    return 1 - per_sample.mean()

# ---------- 데이터셋 클래스 ----------
class LaneDataset(Dataset):
    """Frames from ``img_dir`` paired with same-named binary masks from ``mask_dir``.

    Args:
        img_dir: directory whose entries are treated as image files.
        mask_dir: directory with one mask per frame under the same file name,
            or ``None`` / empty for test data without labels.
        transform: callable applied to the RGB PIL image and expected to
            return a (C, H, W) tensor. When ``None``, the image is converted
            to a float32 (3, H, W) tensor scaled to [0, 1].

    Items:
        With masks:    ``(image, mask)`` where ``mask`` is a long (H, W) tensor
                       of 0/1 with the same spatial size as ``image``.
        Without masks: ``(image, file_name, (width, height))`` where the size
                       is that of the frame before ``transform``.
    """

    def __init__(self, img_dir, mask_dir=None, transform=None):
        self.img_dir = img_dir      # image directory
        self.mask_dir = mask_dir    # mask directory (None for test data)
        self.images = sorted(os.listdir(img_dir))  # sorted for a stable order
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = self.images[idx]
        img_path = os.path.join(self.img_dir, img_name)
        # ``convert`` returns a loaded copy, so the file handle can be released
        # as soon as the ``with`` block ends.
        with Image.open(img_path) as src:
            pil_img = src.convert('RGB')
        original_size = pil_img.size  # (width, height)

        if self.transform is not None:
            image = self.transform(pil_img)
        else:
            # The default ``transform=None`` used to raise "NoneType is not
            # callable"; fall back to an HWC uint8 -> CHW float conversion.
            image = torch.from_numpy(np.array(pil_img)).permute(2, 0, 1).float().div(255)

        if not self.mask_dir:
            # Test time: no mask, so return the file name and original size.
            return image, img_name, original_size

        # Train / validation: load the mask that shares the frame's file name.
        mask_path = os.path.join(self.mask_dir, img_name)
        with Image.open(mask_path) as src:
            mask_img = src.convert('L')
        # Follow the transformed image's size instead of a hard-coded
        # (1024, 512) so the mask cannot drift from the image if the transform
        # changes. NEAREST avoids interpolated gray levels on label edges.
        mask_img = mask_img.resize((image.shape[2], image.shape[1]), Image.NEAREST)
        mask = (np.array(mask_img) > 127).astype(np.uint8)  # binarize
        mask = torch.tensor(mask, dtype=torch.long)
        return image, mask

# ---------- 간단한 CNN 모델 정의 ----------
class SimpleCNNModel(nn.Module):
    """Small convolutional encoder-decoder producing per-pixel class scores.

    The encoder halves the spatial size twice with 2x2 max-pooling; the decoder
    doubles it twice with stride-2 transposed convolutions, so the output has
    the input's height and width whenever both are multiples of 4.

    Args:
        n_class: number of output channels of the final 1x1 convolution.
    """

    def __init__(self, n_class):
        super().__init__()

        def conv_relu(c_in, c_out):
            # 3x3 convolution that preserves spatial size, followed by ReLU.
            return [nn.Conv2d(c_in, c_out, 3, padding=1), nn.ReLU()]

        def up_relu(channels):
            # Stride-2 transposed convolution (doubles H and W), followed by ReLU.
            return [nn.ConvTranspose2d(channels, channels, 2, 2), nn.ReLU()]

        # Encoder (feature extraction): 3 -> 32 -> 64 | pool | 128 -> 128 | pool | 256 -> 256
        self.encoder = nn.Sequential(
            *conv_relu(3, 32),
            *conv_relu(32, 64),
            nn.MaxPool2d(2, 2),
            *conv_relu(64, 128),
            *conv_relu(128, 128),
            nn.MaxPool2d(2, 2),
            *conv_relu(128, 256),
            *conv_relu(256, 256),
        )
        # Decoder (upsampling): 256 -> 128 | up | 64 | up | 32
        self.decoder = nn.Sequential(
            *conv_relu(256, 128),
            *up_relu(128),
            *conv_relu(128, 64),
            *up_relu(64),
            *conv_relu(64, 32),
        )
        # Per-pixel classifier
        self.classifier = nn.Conv2d(32, n_class, kernel_size=1)

    def forward(self, x):
        features = self.encoder(x)
        upsampled = self.decoder(features)
        return self.classifier(upsampled)
# ---------- VGG16 segmentation model ----------
class VGG16Seg(nn.Module):
    """Per-pixel classifier built on the torchvision VGG16 feature extractor.

    Args:
        n_class: number of output channels (one score map per class).
        pretrained: forwarded to ``models.vgg16`` to request pretrained weights.

    ``forward`` returns logits of shape (B, n_class, H, W) for an input of
    shape (B, 3, H, W), for any H and W the encoder accepts.
    """

    def __init__(self, n_class=2, pretrained=True):
        super().__init__()
        vgg = models.vgg16(pretrained=pretrained)
        self.encoder = vgg.features
        # 1x1 convolution maps the 512 feature channels to per-class scores.
        self.classifier = nn.Conv2d(512, n_class, kernel_size=1)

    def forward(self, x):
        input_hw = x.shape[2:]
        x = self.encoder(x)
        x = self.classifier(x)  # coarse score map, roughly [B, n_class, H/32, W/32]
        # Resize to the exact input size. A fixed scale_factor=32 only restores
        # the input size when H and W are multiples of 32; otherwise the logits
        # come out smaller than the target mask and the loss fails on shape.
        return nn.functional.interpolate(x, size=input_hw, mode='bilinear', align_corners=False)
# ---------- ResNet-18 segmentation model ----------
class RESNET16Seg(nn.Module):
    """Per-pixel classifier on a torchvision ResNet backbone.

    Despite the class name, the backbone built here is ``models.resnet18``.
    The name is kept so existing references keep working.

    Args:
        n_class: number of output channels (one score map per class).
        pretrained: forwarded to ``models.resnet18`` to request pretrained weights.

    ``forward`` returns logits of shape (B, n_class, H, W) for an input of
    shape (B, 3, H, W), for any H and W the encoder accepts.
    """

    def __init__(self, n_class=2, pretrained=True):
        super().__init__()
        resnet = models.resnet18(pretrained=pretrained)
        # Keep every child module except the last two
        # (presumably the global pooling and the fully connected head).
        self.encoder = nn.Sequential(*list(resnet.children())[:-2])
        # 1x1 convolution maps the 512 feature channels to per-class scores.
        self.classifier = nn.Conv2d(512, n_class, kernel_size=1)

    def forward(self, x):
        input_hw = x.shape[2:]
        x = self.encoder(x)
        x = self.classifier(x)  # coarse score map, roughly [B, n_class, H/32, W/32]
        # Resize to the exact input size. A fixed scale_factor=32 yields a
        # different size whenever H or W is not a multiple of 32, which then
        # mismatches the target mask.
        return nn.functional.interpolate(x, size=input_hw, mode='bilinear', align_corners=False)
# ---------- Encode a mask as RLE ----------
def mask_to_rle(mask):
    """Run-length encode a mask as a space-separated "start length ..." string.

    The mask is flattened in row-major (C) order and starts are 1-indexed.
    Any non-zero value counts as foreground, so 0/1, 0/255 and boolean masks
    all encode identically.

    NOTE(review): the earlier cell of this notebook flattens column-major
    (``order='F'``); confirm which pixel order the competition expects.

    Args:
        mask: array-like of any numeric or boolean dtype.

    Returns:
        The RLE string; empty when the mask has no foreground pixels.
    """
    # Binarize first: with raw values, a 1 -> 2 step would count as an extra
    # transition and break the start/end pairing below.
    flat = (np.asarray(mask) != 0).flatten()
    # Zero sentinels at both ends guarantee every run has a start and an end.
    padded = np.concatenate([[0], flat, [0]])
    runs = np.where(padded[1:] != padded[:-1])[0] + 1
    # Even slots hold run starts, odd slots run ends; convert ends to lengths.
    runs[1::2] -= runs[::2]
    return " ".join(map(str, runs))

# ---------- 메인 함수 ----------
def main():
    """Train ``VGG16Seg`` for one epoch, then write an RLE submission file.

    Side effects (all paths are relative to the working directory):
      * saves the trained weights to ``simple_cnn_model.pth``;
      * writes ``aug_submission.csv`` with columns ``filename`` and ``rle``,
        where each mask is encoded at the original frame resolution.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Image preprocessing (no augmentation in this cell).
    transform = transforms.Compose([
        transforms.Resize((512, 1024)),
        transforms.ToTensor()
    ])

    # NOTE(review): the checkpoint name says "simple_cnn" but the model trained
    # below is VGG16Seg. The path is kept so existing consumers keep working;
    # loading it into SimpleCNNModel would not match.
    checkpoint_path = "simple_cnn_model.pth"
    submission_path = "aug_submission.csv"

    # --- Training ---
    train_dataset = LaneDataset(
        "lanesegmentationchallenge/train/train/frames",
        "lanesegmentationchallenge/train/train/lane-masks",
        transform
    )
    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)

    model = VGG16Seg(n_class=2).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    ce_loss_fn = nn.CrossEntropyLoss()

    print("▶️ 모델 학습 시작...")
    for epoch in range(1):
        model.train()
        total_loss = 0
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}")
        for imgs, masks in pbar:
            imgs, masks = imgs.to(device), masks.to(device)
            outputs = model(imgs)
            # Equal-weight mix of cross-entropy and Dice loss.
            ce = ce_loss_fn(outputs, masks)
            d = dice_loss(outputs, masks)
            loss = 0.5 * ce + 0.5 * d
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            pbar.set_postfix(loss=f"{loss.item():.4f}")
        print(f"[Epoch {epoch+1}] 평균 손실: {total_loss / len(train_loader):.4f}")

    torch.save(model.state_dict(), checkpoint_path)
    print(f"💾 모델이 {checkpoint_path}로 저장되었습니다.")

    # --- Inference on the test frames and submission file ---
    # The messages now name the file that is actually written; they used to
    # say "submission.csv" while the data went to aug_submission.csv.
    print(f"🧪 {submission_path} 생성 시작...")
    test_dataset = LaneDataset(
        "lanesegmentationchallenge/test/test/frames",
        None, transform
    )
    # batch_size=1: the loop below reads fnames[0] and one original size per step.
    test_loader = DataLoader(test_dataset, batch_size=1)
    model.eval()

    results = []
    with torch.no_grad():
        for imgs, fnames, orig_size in tqdm(test_loader, desc="추론 중"):
            imgs = imgs.to(device)
            output = model(imgs)
            pred_mask = output.argmax(1).squeeze(0).cpu().numpy().astype(np.uint8)
            pred_pil = Image.fromarray(pred_mask)
            # The collated size arrives as two one-element tensors; convert to
            # plain ints instead of handing tensors to PIL.
            W, H = (int(v) for v in orig_size)
            # Back to the original frame size; NEAREST keeps labels 0/1.
            resized_mask = pred_pil.resize((W, H), resample=Image.NEAREST)
            binary = np.array(resized_mask) > 0
            rle = mask_to_rle(binary)
            results.append({'filename': fnames[0], 'rle': rle})

    pd.DataFrame(results).to_csv(submission_path, index=False)
    print(f"✅ {submission_path} 파일이 저장되었습니다!")

# Program entry point: run the pipeline when executed as the top-level module.
if __name__ == "__main__":
    main()


▶️ 모델 학습 시작...


Epoch 1: 100%|██████████| 454/454 [01:33<00:00,  4.87it/s, loss=0.2442]


[Epoch 1] 평균 손실: 0.3259
💾 모델이 simple_cnn_model.pth로 저장되었습니다.
🧪 submission.csv 생성 시작...


추론 중: 100%|██████████| 2782/2782 [01:35<00:00, 29.21it/s]


✅ submission.csv 파일이 저장되었습니다!
