<a href="https://colab.research.google.com/github/rage147-OwO/dacon-SatelliteImageBuildingAreaSegmentation/blob/main/%5BBaseline%5D_Unet%EC%9D%84_%ED%99%9C%EC%9A%A9%ED%95%9C_Segmentation%EA%B3%BC_RLE_%EC%9D%B8%EC%BD%94%EB%94%A9_%EB%94%94%EC%BD%94%EB%94%A9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive (Colab-only helper).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Extract the archive stored on the mounted Drive into /content (-qq: quiet output).
!unzip -qq "/content/drive/MyDrive/open.zip" -d "/content"

## Import

In [3]:
import os
import cv2
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2


# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

## Utils

In [4]:
# RLE decoding function
def rle_decode(mask_rle, shape):
    """Decode a run-length-encoded string into a binary mask.

    Args:
        mask_rle: Space-separated "start length start length ..." string with
            1-based start positions into the flattened mask. Any non-string
            value (e.g. the integer -1 or NaN that pandas may produce for an
            empty cell) and any string with fewer than two tokens (e.g. "-1"
            or "") is treated as "no foreground pixels".
        shape: (height, width) of the mask to produce.

    Returns:
        np.ndarray of dtype uint8 and the given shape, 1 inside runs, 0 elsewhere.
    """
    img = np.zeros(shape[0] * shape[1], dtype=np.uint8)

    # Non-string cells have no .split(); treat them as an empty mask instead of crashing.
    if not isinstance(mask_rle, str):
        return img.reshape(shape)

    tokens = mask_rle.split()
    # A lone sentinel such as "-1" carries no (start, length) pair.
    if len(tokens) < 2:
        return img.reshape(shape)

    starts = np.asarray(tokens[0::2], dtype=int)
    lengths = np.asarray(tokens[1::2], dtype=int)
    starts -= 1  # RLE positions are 1-based; numpy slicing is 0-based.
    ends = starts + lengths
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img.reshape(shape)

# RLE encoding function
def rle_encode(mask):
    """Encode a binary mask as a run-length string.

    The mask is flattened and each run of foreground pixels is emitted as a
    "start length" pair, where start is the 1-based position in the flattened
    array. Pairs are joined with single spaces; a mask without foreground
    yields an empty string.
    """
    flat = mask.flatten()
    # Zero sentinels on both sides so runs touching either end still produce
    # a rising and a falling transition.
    padded = np.concatenate([[0], flat, [0]])
    # 1-based positions where the value changes: even slots are run starts,
    # odd slots are the positions just past each run.
    edges = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Convert each "end" position into a run length, in place.
    edges[1::2] -= edges[0::2]
    return ' '.join(map(str, edges))

## Custom Dataset

In [5]:
class SatelliteDataset(Dataset):
    """Dataset of images (and, for training, RLE masks) listed in a CSV file.

    The CSV is loaded with pandas. Column index 1 is read as the image path
    and, when ``infer`` is False, column index 2 is read as the run-length
    encoded mask for that image.

    Args:
        csv_file: Path (or file-like object) accepted by ``pd.read_csv``.
        transform: Optional callable invoked as ``transform(image=...)`` in
            inference mode or ``transform(image=..., mask=...)`` otherwise; it
            must return a mapping with an ``'image'`` (and ``'mask'``) entry.
        infer: When True, ``__getitem__`` returns only the image.
    """

    def __init__(self, csv_file, transform=None, infer=False):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.infer = infer

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            The (optionally transformed) RGB image when ``infer`` is True,
            otherwise an ``(image, mask)`` pair.

        Raises:
            FileNotFoundError: If the image at the listed path cannot be read.
        """
        img_path = self.data.iloc[idx, 1]
        image = cv2.imread(img_path)
        # cv2.imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of an opaque cvtColor error.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.infer:
            if self.transform:
                image = self.transform(image=image)['image']
            return image

        mask_rle = self.data.iloc[idx, 2]
        # The mask is decoded at the image's own height and width.
        mask = rle_decode(mask_rle, (image.shape[0], image.shape[1]))

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask

## Data Loader

In [12]:
# Preprocessing pipeline: resize to 224x224, apply A.Normalize() with its
# default arguments, then convert with ToTensorV2().
transform = A.Compose([A.Resize(224, 224), A.Normalize(), ToTensorV2()])

# Training dataset built from ./train.csv with the pipeline above.
dataset = SatelliteDataset(
    csv_file='./train.csv',
    transform=transform,
)


## Define Model

In [7]:


# Simple U-Net model definition
class UNet(nn.Module):
    """U-Net with three down/up-sampling levels and a single-channel output.

    Encoder: four double-convolution stages (3->64->128->256->512 channels)
    with 2x max-pooling between consecutive stages. Decoder: 2x bilinear
    upsampling, concatenation with the matching encoder output along the
    channel axis, then a double-convolution stage. A final 1x1 convolution
    maps 64 channels to 1; no activation is applied to the result.
    """

    def __init__(self):
        super().__init__()
        # Encoder stages.
        self.dconv_down1 = double_conv(3, 64)
        self.dconv_down2 = double_conv(64, 128)
        self.dconv_down3 = double_conv(128, 256)
        self.dconv_down4 = double_conv(256, 512)

        # Shared, parameter-free resampling layers.
        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Decoder stages; input width = upsampled channels + skip channels.
        self.dconv_up3 = double_conv(256 + 512, 256)
        self.dconv_up2 = double_conv(128 + 256, 128)
        self.dconv_up1 = double_conv(128 + 64, 64)

        # 1x1 convolution producing the single output channel.
        self.conv_last = nn.Conv2d(64, 1, 1)

    def forward(self, x):
        """Return the single-channel output map for input batch ``x``."""
        # Contracting path; keep each stage's output for the skip connections.
        skip1 = self.dconv_down1(x)
        skip2 = self.dconv_down2(self.maxpool(skip1))
        skip3 = self.dconv_down3(self.maxpool(skip2))
        bottom = self.dconv_down4(self.maxpool(skip3))

        # Expanding path: upsample, concatenate the skip, convolve.
        up = torch.cat([self.upsample(bottom), skip3], dim=1)
        up = self.dconv_up3(up)

        up = torch.cat([self.upsample(up), skip2], dim=1)
        up = self.dconv_up2(up)

        up = torch.cat([self.upsample(up), skip1], dim=1)
        up = self.dconv_up1(up)

        return self.conv_last(up)



# Double Convolution Block, the basic building unit of the U-Net.
def double_conv(in_channels, out_channels):
    """Build two 3x3 convolutions (padding 1), each followed by an in-place ReLU.

    Args:
        in_channels: Channel count expected by the first convolution.
        out_channels: Channel count produced by both convolutions.

    Returns:
        nn.Sequential containing Conv2d, ReLU, Conv2d, ReLU in that order.
    """
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
    ]
    return nn.Sequential(*layers)



# Lightweight variant (note from the author: batch_size=150)
class UNetLite(nn.Module):
    """One-level U-Net with 16/32-channel stages and a single-channel output.

    A double-convolution stage (3->16) is followed by 2x max-pooling and a
    second stage (16->32). The result is upsampled bilinearly by 2x,
    concatenated with the first stage's output, reduced to 16 channels by
    another double-convolution stage and mapped to 1 channel by a 1x1
    convolution. No activation is applied to the result.
    """

    def __init__(self):
        super().__init__()
        # Encoder stages.
        self.dconv_down1 = double_conv(3, 16)
        self.dconv_down2 = double_conv(16, 32)

        # Parameter-free resampling layers.
        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Decoder stage; input width = upsampled (32) + skip (16) channels.
        self.dconv_up1 = double_conv(16 + 32, 16)

        # 1x1 convolution producing the single output channel.
        self.conv_last = nn.Conv2d(16, 1, 1)

    def forward(self, x):
        """Return the single-channel output map for input batch ``x``."""
        skip = self.dconv_down1(x)
        deep = self.dconv_down2(self.maxpool(skip))

        # Restore resolution and merge with the skip connection.
        merged = torch.cat([self.upsample(deep), skip], dim=1)
        features = self.dconv_up1(merged)

        return self.conv_last(features)

## Model Train

In [None]:
import torch, gc
from sklearn.model_selection import KFold

import matplotlib.pyplot as plt
import numpy as np
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,TensorDataset,random_split,SubsetRandomSampler, ConcatDataset
from torch.nn import functional as F
import torchvision
from torchvision import datasets,transforms
import torchvision.transforms as transforms


def calculate_iou(outputs, masks, threshold=0.5):
    """Compute the intersection-over-union of a batch, pooled over all pixels.

    Args:
        outputs: Raw logits; a sigmoid is applied before thresholding.
        masks: Ground-truth masks, binarized at 0.5.
        threshold: Probability above which a pixel counts as foreground.

    ``outputs`` and ``masks`` must have the same shape, except that one of them
    may carry an extra singleton axis at dim 1 (e.g. logits (B, 1, H, W) with
    masks (B, H, W)); that axis is squeezed before comparison.

    Returns:
        float: intersection / (union + 1e-6) over every pixel in the batch.

    Raises:
        ValueError: If the shapes still differ after removing a singleton
            channel axis.
    """
    # Apply sigmoid activation to the outputs
    preds = torch.sigmoid(outputs)

    # Align shapes explicitly. Multiplying (B, 1, H, W) by (B, H, W) would
    # otherwise broadcast to (B, B, H, W), pairing every prediction with every
    # mask in the batch and inflating the intersection.
    if preds.dim() == masks.dim() + 1 and preds.size(1) == 1:
        preds = preds.squeeze(1)
    elif masks.dim() == preds.dim() + 1 and masks.size(1) == 1:
        masks = masks.squeeze(1)
    if preds.shape != masks.shape:
        raise ValueError(
            f"outputs and masks have incompatible shapes: "
            f"{tuple(outputs.shape)} vs {tuple(masks.shape)}"
        )

    # Binarize the outputs and masks based on the threshold
    preds = (preds > threshold).float()
    masks = (masks > 0.5).float()

    intersection = torch.sum(preds * masks)
    union = torch.clamp(torch.sum(preds) + torch.sum(masks) - intersection, min=0)
    # The epsilon keeps the division defined when both masks are empty.
    iou = intersection / (union + 1e-6)
    return iou.item()





torch.manual_seed(2147483647)

batch_size = 150

# Train/validation split. `train_idx` and `val_idx` were not defined anywhere
# in this notebook, so a fresh "Restart & Run All" stopped here with NameError.
# They are now taken from the first fold of a shuffled, seeded 5-fold split.
# NOTE(review): fold count and random_state are assumptions - confirm against
# the intended validation protocol.
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
train_idx, val_idx = next(kfold.split(np.arange(len(dataset))))
train_idx, val_idx = train_idx.tolist(), val_idx.tolist()

# Samplers restricted to the chosen indices.
train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)

# DataLoaders over the same dataset, each drawing only its sampler's indices.
trainloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=train_subsampler)
valloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=val_subsampler)

# Model initialization
#model = UNet().to(device)
model = UNetLite().to(device)

# Loss function and optimizer
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    # ---- training ----
    model.train()
    epoch_loss = 0
    for images, masks in tqdm(trainloader):
        images = images.float().to(device)
        masks = masks.float().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        # Add a channel axis so the target matches the model output.
        loss = criterion(outputs, masks.unsqueeze(1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f'Epoch {epoch+1}, Training Loss: {epoch_loss/len(trainloader)}')

    gc.collect()
    torch.cuda.empty_cache()

    # ---- validation ----
    model.eval()
    epoch_loss = 0
    total_iou = 0
    num_batches = 0

    with torch.no_grad():
        for images, masks in tqdm(valloader):
            images = images.float().to(device)
            # Give the masks the same channel axis as the outputs up front, so
            # both the loss and the IoU compare tensors of identical shape
            # (mismatched shapes would broadcast across the batch in the IoU).
            masks = masks.float().to(device).unsqueeze(1)

            outputs = model(images)
            loss = criterion(outputs, masks)
            epoch_loss += loss.item()

            iou = calculate_iou(outputs, masks)
            total_iou += iou
            num_batches += 1

    print(f'Epoch {epoch+1}, Validation Loss: {epoch_loss/num_batches}')
    print(f'Epoch {epoch+1}, Validation IoU: {total_iou/num_batches}')

    # Clear memory
    del images, masks, outputs, loss
    gc.collect()
    torch.cuda.empty_cache()

    # Save a checkpoint after every epoch.
    torch.save(model.state_dict(), f'./model_{epoch+1}.pth')


## Inference

In [None]:
# Inference-mode dataset: items are images only (infer=True).
test_dataset = SatelliteDataset(
    csv_file='./test.csv',
    transform=transform,
    infer=True,
)

# shuffle=False keeps predictions in the same order as the CSV rows.
test_dataloader = DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
)



In [None]:
# Predict a mask for every test image and collect its RLE string in `result`.
model.eval()
result = []
with torch.no_grad():
    for images in tqdm(test_dataloader):
        images = images.float().to(device)

        outputs = model(images)
        probs = torch.sigmoid(outputs).cpu().numpy()
        # Drop the channel axis, then binarize. Threshold = 0.35
        masks = (np.squeeze(probs, axis=1) > 0.35).astype(np.uint8)

        for mask in masks:
            mask_rle = rle_encode(mask)
            # An empty RLE string means no pixel was predicted as building: store -1.
            result.append(mask_rle if mask_rle != '' else -1)

100%|██████████| 3790/3790 [11:32<00:00,  5.47it/s]


## Submission

In [None]:
# Load the sample submission and set its 'mask_rle' column to the predictions.
submit = pd.read_csv('./sample_submission.csv').assign(mask_rle=result)

In [None]:
# Write the submission file without the DataFrame index column.
submit.to_csv('./submit.csv', index=False)