In [None]:
import os
import cv2
import pandas as pd
from typing import List, Union
from joblib import Parallel, delayed
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn.functional as F

from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# RLE decoding function
def rle_decode(mask_rle, shape):
    '''
    Decode a run-length-encoded mask into a binary 2-D array.

    The encoding is a whitespace-separated sequence of ``start length`` pairs,
    where ``start`` is a 1-based index into the flattened (row-major) mask.

    Args:
        mask_rle: RLE string such as ``'1 3 10 2'``. Values that carry no runs
            decode to an all-zero mask: any non-string (e.g. ``NaN`` from an
            empty CSV cell, or the integer ``-1`` that the inference cell stores
            for empty predictions), an empty string, or a single token such as
            ``'-1'``.
        shape: ``(height, width)`` of the mask to produce.

    Returns:
        ``np.uint8`` array of shape ``shape`` with 1 inside runs and 0 elsewhere.
    '''
    img = np.zeros(shape[0] * shape[1], dtype=np.uint8)

    # Non-string markers (NaN, -1) have no .split(); treat them as "no mask".
    if not isinstance(mask_rle, str):
        return img.reshape(shape)

    tokens = mask_rle.split()
    # Fewer than two tokens cannot form a (start, length) pair.
    if len(tokens) < 2:
        return img.reshape(shape)

    starts, lengths = [np.asarray(x, dtype=int) for x in (tokens[0:][::2], tokens[1:][::2])]
    starts -= 1  # RLE starts are 1-based; numpy slices are 0-based
    ends = starts + lengths
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img.reshape(shape)

def dice_score(prediction: np.array, ground_truth: np.array, smooth=1e-7) -> float:
    '''
    Dice coefficient between two masks.

    Computed as ``(2 * sum(prediction * ground_truth) + smooth) /
    (sum(prediction) + sum(ground_truth) + smooth)``. ``smooth`` keeps the
    ratio defined when both masks sum to zero.
    '''
    overlap = np.sum(prediction * ground_truth)
    numerator = 2.0 * overlap + smooth
    denominator = np.sum(prediction) + np.sum(ground_truth) + smooth
    return numerator / denominator


def calculate_dice_scores(ground_truth_df, prediction_df, img_shape=(224, 224)) -> float:
    '''
    Calculate the mean Dice score over a dataset.

    Both dataframes are read positionally: the first column is the image id
    and the second column is the RLE-encoded mask. Rows are paired by image
    id, so the two frames do not need to share the same row order; ids that
    appear in only one of the frames are ignored. Ids are assumed to be unique
    within each frame.

    Pairs in which both the ground-truth and the predicted mask are empty are
    excluded from the mean.

    Args:
        ground_truth_df: dataframe with (id, mask RLE) in its first two columns.
        prediction_df: dataframe with (id, mask RLE) in its first two columns.
        img_shape: ``(height, width)`` used to decode every mask.

    Returns:
        Mean of the per-image Dice scores, or NaN when no pair contributes.
    '''
    # Pair masks by image id instead of by row position: filtering the
    # predictions alone does not guarantee that both frames line up row by row.
    gt_pairs = ground_truth_df.iloc[:, :2].copy()
    gt_pairs.columns = ['img_id', 'gt_rle']
    pred_pairs = prediction_df.iloc[:, :2].copy()
    pred_pairs.columns = ['img_id', 'pred_rle']
    paired = gt_pairs.merge(pred_pairs, on='img_id', how='inner')

    def calculate_dice(pred_rle, gt_rle):
        pred_mask = rle_decode(pred_rle, img_shape)
        gt_mask = rle_decode(gt_rle, img_shape)

        if np.sum(gt_mask) > 0 or np.sum(pred_mask) > 0:
            return dice_score(pred_mask, gt_mask)
        else:
            return None  # Both masks empty: nothing to score for this image

    dice_scores = Parallel(n_jobs=-1)(
        delayed(calculate_dice)(pred_rle, gt_rle)
        for pred_rle, gt_rle in zip(paired['pred_rle'], paired['gt_rle'])
    )

    dice_scores = [score for score in dice_scores if score is not None]  # Exclude None values

    # np.mean([]) would emit a RuntimeWarning; return NaN explicitly instead.
    if not dice_scores:
        return float('nan')

    return np.mean(dice_scores)

# RLE encoding function
def rle_encode(mask):
    '''
    Run-length encode a mask as a space-separated ``start length ...`` string.

    The mask is flattened and padded with a zero on each side so that runs
    touching either end still produce a rising and a falling edge. Starts are
    1-based positions in the flattened mask. A mask with no value changes
    yields an empty string.
    '''
    padded = np.concatenate([[0], mask.flatten(), [0]])
    # Positions (1-based w.r.t. the flattened mask) where the value changes.
    edges = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Turn every second entry from "end position" into "run length".
    edges[1::2] -= edges[::2]
    return ' '.join(map(str, edges))

In [None]:
class SatelliteDataset(Dataset):
    """Dataset of images (and, for training, RLE masks) listed in a CSV file.

    The CSV is read positionally: column 1 holds the image path and column 2
    holds the RLE-encoded mask (column 2 is only read when ``infer`` is False).

    Args:
        csv_file: path of the CSV file to load with ``pd.read_csv``.
        transform: optional callable invoked as ``transform(image=...)`` or
            ``transform(image=..., mask=...)`` whose result is indexed with
            ``'image'`` / ``'mask'``.
        infer: when True, ``__getitem__`` returns only the image.
    """

    def __init__(self, csv_file, transform=None, infer=False):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.infer = infer

    def __len__(self):
        """Number of rows in the CSV."""
        return len(self.data)

    # Image loading and preprocessing
    def __getitem__(self, idx):
        """Return the image at row ``idx``, or ``(image, mask)`` when training.

        Raises:
            FileNotFoundError: if the image at the listed path cannot be read.
        """
        img_path = self.data.iloc[idx, 1]
        image = cv2.imread(img_path)
        # cv2.imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {img_path!r}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.infer:
            if self.transform:
                image = self.transform(image=image)['image']
            return image

        mask_rle = self.data.iloc[idx, 2]
        # Decode the mask at the image's own height and width.
        mask = rle_decode(mask_rle, (image.shape[0], image.shape[1]))

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask

In [None]:
from google.colab import drive

# Mount Google Drive so the dataset folder is reachable from the Colab runtime.
drive.mount("/content/gdrive")
path = "/content/gdrive/My Drive/ai_dataset"

# Names of the entries in the dataset folder, plus a separate copy of that list.
file_list = os.listdir(path)
file_list_py = list(file_list)

Mounted at /content/gdrive


In [None]:
# Preprocessing pipeline applied to every image (and to its mask during training).
# No bbox_params here: SatelliteDataset only passes `image` and `mask` to it.
transform = A.Compose(
    [
        A.Resize(224, 224),
        # Ideas noted for later, not implemented:
        #   - image filtering and blurring
        #   - brightness / contrast adjustment (e.g. histogram equalization,
        #     emphasising regions of interest)
        #   - background removal (thresholding)
        A.Normalize(),
        ToTensorV2()
    ]
)

# The earlier demo call `transform(image=image, bboxes=bbox)` was removed: neither
# `image` nor `bbox` is defined in this notebook, so it raised NameError and
# stopped the cell before the dataset and dataloader below were created.
dataset = SatelliteDataset(csv_file='/content/gdrive/MyDrive/ai_dataset/train.csv', transform=transform)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)

NameError: ignored

In [None]:
class ConvBlock(nn.Module):
    """Two consecutive 3x3 convolution -> batch norm -> ReLU stages.

    Both convolutions use ``kernel_size=3, padding=1``. The first maps
    ``in_channels`` to ``out_channels``; the second keeps ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Apply both conv/norm/ReLU stages in order and return the result."""
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2))
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        return x

class ResUNet(nn.Module):
    """U-Net style encoder/decoder built from ``ConvBlock`` stages.

    The encoder has four ``ConvBlock`` + 2x2 max-pool levels (64, 128, 256,
    512 channels), followed by a 1024-channel centre block. The decoder
    upsamples bilinearly, concatenates the matching encoder feature map along
    the channel axis, and applies a ``ConvBlock`` at each level. A final 1x1
    convolution maps 64 channels to ``num_classes``.

    NOTE(review): despite the name, no residual connections are defined here.

    Args:
        in_channels: number of channels of the input tensor.
        num_classes: number of output channels.
    """

    def __init__(self, in_channels, num_classes):
        super(ResUNet, self).__init__()
        self.down1 = ConvBlock(in_channels, 64)
        self.down2 = ConvBlock(64, 128)
        self.down3 = ConvBlock(128, 256)
        self.down4 = ConvBlock(256, 512)

        self.center = ConvBlock(512, 1024)

        # Decoder inputs are the upsampled features concatenated with the skip.
        self.up4 = ConvBlock(1024 + 512, 512)
        self.up3 = ConvBlock(512 + 256, 256)
        self.up2 = ConvBlock(256 + 128, 128)
        self.up1 = ConvBlock(128 + 64, 64)

        self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1)

    @staticmethod
    def _upsample_and_concat(x, skip):
        """Bilinearly resize ``x`` to ``skip``'s height/width and concatenate on dim 1.

        Resizing to the skip tensor's size (instead of a fixed factor of 2)
        keeps the concatenation valid when a pooling step rounded an odd size
        down; for sizes that are multiples of 16 the result is the same as
        upsampling by 2.
        """
        x = F.interpolate(x, size=skip.shape[2:], mode='bilinear', align_corners=False)
        return torch.cat([x, skip], dim=1)

    def forward(self, x):
        """Return raw (un-activated) outputs with the same height/width as ``x``.

        The input height and width must each be at least 16 so that the four
        pooling steps leave a non-empty feature map.
        """
        conv1 = self.down1(x)
        x = F.max_pool2d(conv1, kernel_size=2, stride=2)

        conv2 = self.down2(x)
        x = F.max_pool2d(conv2, kernel_size=2, stride=2)

        conv3 = self.down3(x)
        x = F.max_pool2d(conv3, kernel_size=2, stride=2)

        conv4 = self.down4(x)
        x = F.max_pool2d(conv4, kernel_size=2, stride=2)

        x = self.center(x)

        x = self.up4(self._upsample_and_concat(x, conv4))
        x = self.up3(self._upsample_and_concat(x, conv3))
        x = self.up2(self._upsample_and_concat(x, conv2))
        x = self.up1(self._upsample_and_concat(x, conv1))

        # No activation is applied here: the output is returned as raw logits.
        x = self.final_conv(x)

        return x


In [None]:
# Initialise the model: 3 input channels, 1 output channel.
in_channels = 3
num_classes = 1
model = ResUNet(in_channels, num_classes).to(device)

# Define the loss function and the optimizer.
# BCEWithLogitsLoss applies the sigmoid itself, so the model outputs raw logits.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(10):  # train for 10 epochs
    model.train()
    batch_losses = []
    for images, masks in tqdm(dataloader):
        images = images.float().to(device)
        masks = masks.float().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        # Add a channel axis at dim 1 so the target lines up with the model output.
        loss = criterion(outputs, masks.unsqueeze(1))
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    # Report the mean batch loss for this epoch.
    epoch_loss = sum(batch_losses)
    print(f'Epoch {epoch+1}, Loss: {epoch_loss/len(dataloader)}')

100%|██████████| 447/447 [17:29<00:00,  2.35s/it]


Epoch 1, Loss: 0.14971698553903523


100%|██████████| 447/447 [10:38<00:00,  1.43s/it]


Epoch 2, Loss: 0.09958291995512025


100%|██████████| 447/447 [10:13<00:00,  1.37s/it]


Epoch 3, Loss: 0.0871790020041151


100%|██████████| 447/447 [10:20<00:00,  1.39s/it]


Epoch 4, Loss: 0.07948793847555549


100%|██████████| 447/447 [09:47<00:00,  1.31s/it]


Epoch 5, Loss: 0.07532418246737263


100%|██████████| 447/447 [10:22<00:00,  1.39s/it]


Epoch 6, Loss: 0.07080489824582266


100%|██████████| 447/447 [09:41<00:00,  1.30s/it]


Epoch 7, Loss: 0.06758270011818916


 57%|█████▋    | 256/447 [2:19:49<1:44:19, 32.77s/it]


KeyboardInterrupt: ignored

In [None]:
# Build the test-set loader with the same preprocessing `transform`;
# infer=True makes the dataset return images only (no masks).
test_dataset = SatelliteDataset(
    csv_file='/content/gdrive/MyDrive/ai_dataset/test.csv',
    transform=transform,
    infer=True,
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
)

In [None]:
# Run the model over the test set and collect one entry per image:
# the RLE string of the predicted mask, or -1 when the mask is empty.
model.eval()
result = []
with torch.no_grad():
    for images in tqdm(test_dataloader):
        images = images.float().to(device)

        outputs = model(images)
        probs = torch.sigmoid(outputs).cpu().numpy()
        # Drop the channel axis, then binarise the probabilities.
        masks = (np.squeeze(probs, axis=1) > 0.35).astype(np.uint8)  # Threshold = 0.35

        for mask in masks:
            mask_rle = rle_encode(mask)
            # An empty RLE means no pixel was predicted positive: store -1 instead.
            result.append(mask_rle if mask_rle != '' else -1)

In [None]:
# Load the sample submission and fill its mask_rle column with the predictions.
submit = pd.read_csv(
    '/content/gdrive/MyDrive/ai_dataset/sample_submission.csv'
).assign(mask_rle=result)

In [None]:
# Write the submission file without the dataframe index column.
submit.to_csv(
    '/content/gdrive/MyDrive/ai_dataset/submit.csv',
    index=False,
)