In [None]:
import os
import cv2
import pandas as pd
import numpy as np

#import torchmetrics
#import torchgeometry
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2

from efficientnet_pytorch import EfficientNet
from keras.applications.vgg16 import VGG16
from keras.applications.resnet50 import ResNet50

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)


In [None]:
from albumentations import (
    HorizontalFlip, IAAPerspective, ShiftScaleRotate, CLAHE, RandomRotate90,
    Transpose, ShiftScaleRotate, Blur, OpticalDistortion, GridDistortion, HueSaturationValue,
    IAAAdditiveGaussianNoise, GaussNoise, MotionBlur, MedianBlur, IAAPiecewiseAffine,
    IAASharpen, IAAEmboss, RandomBrightnessContrast, Flip, OneOf, Compose, Normalize, Cutout, CoarseDropout,
    ShiftScaleRotate, CenterCrop, Resize
)
from albumentations.pytorch import ToTensorV2

In [None]:
# RLE 디코딩 함수
def rle_decode(mask_rle, shape):
    s = mask_rle.split()
    starts, lengths = [np.asarray(x, dtype=int) for x in (s[0:][::2], s[1:][::2])]
    starts -= 1
    ends = starts + lengths
    img = np.zeros(shape[0]*shape[1], dtype=np.uint8)
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img.reshape(shape)

# RLE 인코딩 함수
def rle_encode(mask):
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

In [None]:
class SatelliteDataset(Dataset):
    def __init__(self, csv_file, transform=None, infer=False):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.infer = infer

        # transforms
        self.transform = transform
        self.default_transform = Compose([
            Resize(224, 224),
            Normalize(),
            ToTensorV2()
        ])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data.iloc[idx, 1]

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.infer:
            if self.transform:
                image = self.transform(image=image)['image']
            else:
                image = self.default_transform(image=image)['image']
            return image

        mask_rle = self.data.iloc[idx, 2]
        mask = rle_decode(mask_rle, (image.shape[0], image.shape[1]))

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']
        else:
            augmented = self.default_transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask

In [None]:
aug_transform = Compose([
        Resize(224, 224),
        HorizontalFlip(p=0.5), # 50% 확률로 이미지를 수평으로 뒤집음
        A.VerticalFlip(p=0.5), # 50% 확률로 이미지를 수직으로 뒤집음
        A.Rotate(limit=30), # -30도에서 30도 사이의 각도로 이미지를 무작위로 회전
        RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5), # 컬러 변형
        Normalize(), # 이미지를 정규화
        ToTensorV2() # PyTorch tensor로 변환
    ]
)

dataset = SatelliteDataset(csv_file='./train.csv', transform=aug_transform)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=6)

In [None]:
# U-Net의 기본 구성 요소인 Double Convolution Block을 정의합니다.
def CBR2d(in_channels, out_channels,kernel_size=3, stride=1, padding=1, bias=True):
        layers = []
        layers += [nn.Conv2d(in_channels=in_channels, out_channels=out_channels,kernel_size=kernel_size, stride=stride, padding=padding,bias=bias)]
        layers += [nn.BatchNorm2d(num_features=out_channels)]
        layers += [nn.ReLU()]

        cbr = nn.Sequential(*layers)

        return cbr

    
class UNet(nn.Module):
    def __init__(self, backbone_name='ResNet50', classes=1, encoder_weights='imagenet'):
        super(UNet, self).__init__()

        self.backbone = ResNet50

        self.enc1_1 = CBR2d(in_channels=3, out_channels=64)
        self.enc1_2 = CBR2d(in_channels=64, out_channels=64)

        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.enc2_1 = CBR2d(in_channels=64, out_channels=128)
        self.enc2_2 = CBR2d(in_channels=128, out_channels=128)

        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.enc3_1 = CBR2d(in_channels=128, out_channels=256)
        self.enc3_2 = CBR2d(in_channels=256, out_channels=256)

        self.pool3 = nn.MaxPool2d(kernel_size=2)

        self.enc4_1 = CBR2d(in_channels=256, out_channels=512)
        self.enc4_2 = CBR2d(in_channels=512, out_channels=512)

        self.pool4 = nn.MaxPool2d(kernel_size=2)

        self.enc5_1 = CBR2d(in_channels=512, out_channels=1024)

        # 확장 경로(Expansive path)
        self.dec5_1 = CBR2d(in_channels=1024, out_channels=512)

        self.unpool4 = nn.ConvTranspose2d(in_channels=512, out_channels=512,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec4_2 = CBR2d(in_channels=2 * 512, out_channels=512)
        self.dec4_1 = CBR2d(in_channels=512, out_channels=256)

        self.unpool3 = nn.ConvTranspose2d(in_channels=256, out_channels=256,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec3_2 = CBR2d(in_channels=2 * 256, out_channels=256)
        self.dec3_1 = CBR2d(in_channels=256, out_channels=128)

        self.unpool2 = nn.ConvTranspose2d(in_channels=128, out_channels=128,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec2_2 = CBR2d(in_channels=2 * 128, out_channels=128)
        self.dec2_1 = CBR2d(in_channels=128, out_channels=64)

        self.unpool1 = nn.ConvTranspose2d(in_channels=64, out_channels=64,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec1_2 = CBR2d(in_channels=2 * 64, out_channels=64)
        self.dec1_1 = CBR2d(in_channels=64, out_channels=64)

        self.fc = nn.Conv2d(in_channels=64, out_channels=1, kernel_size=1, stride=1, padding=0, bias=True)
    def forward(self, x):
        # Backbone feature extraction
        #backbone_features = self.backbone.extract_features(x)

        enc1_1 = self.enc1_1(x)
        enc1_2 = self.enc1_2(enc1_1)
        pool1 = self.pool1(enc1_2)

        enc2_1 = self.enc2_1(pool1)
        enc2_2 = self.enc2_2(enc2_1)
        pool2 = self.pool2(enc2_2)

        enc3_1 = self.enc3_1(pool2)
        enc3_2 = self.enc3_2(enc3_1)
        pool3 = self.pool3(enc3_2)

        enc4_1 = self.enc4_1(pool3)
        enc4_2 = self.enc4_2(enc4_1)
        pool4 = self.pool4(enc4_2)

        enc5_1 = self.enc5_1(pool4)

        dec5_1 = self.dec5_1(enc5_1)

        unpool4 = self.unpool4(dec5_1)
        cat4 = torch.cat((unpool4, enc4_2), dim=1)
        dec4_2 = self.dec4_2(cat4)
        dec4_1 = self.dec4_1(dec4_2)

        unpool3 = self.unpool3(dec4_1)
        cat3 = torch.cat((unpool3, enc3_2), dim=1)
        dec3_2 = self.dec3_2(cat3)
        dec3_1 = self.dec3_1(dec3_2)

        unpool2 = self.unpool2(dec3_1)
        cat2 = torch.cat((unpool2, enc2_2), dim=1)
        dec2_2 = self.dec2_2(cat2)
        dec2_1 = self.dec2_1(dec2_2)

        unpool1 = self.unpool1(dec2_1)
        cat1 = torch.cat((unpool1, enc1_2), dim=1)
        dec1_2 = self.dec1_2(cat1)
        dec1_1 = self.dec1_1(dec1_2)

        x = self.fc(dec1_1)

        return x
        
        


In [None]:
model = UNet().to(device)

model = UNet(backbone_name='ResNet50', encoder_weights='imagenet').to(device)

# # loss function과 optimizer 정의
criterion = torch.nn.BCEWithLogitsLoss().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)


In [None]:

# training loop
for epoch in range(15):
    model.train()
    epoch_loss = 0
    for images, masks in tqdm(dataloader):
        images = images.float().to(device)
        masks = masks.float().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks.unsqueeze(1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    model_name = "resnet50_conv"+str(epoch)+".pth"
    torch.save(model.state_dict(), "./" + model_name)
    print(f'Epoch {epoch+1}, Loss: {epoch_loss/len(dataloader)}')


## **픽셀의 숫자가 특정 개수 이하면 -1**

In [None]:
model_name = "resnet50_conv.pth"
torch.save(model.state_dict(), "./" + model_name)

**컨투어 검출후 에러 삭제**

In [None]:
# 컨투어 검출하기
def contoursDelet(mask):
  mask = np.array(mask)
  contours, hierarchy = cv2.findContours(mask.astype(np.uint8), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

  # cv2.drawContours(img_copy, contours, -1, (0, 0, 255), 1)

  # Threshold를 지정하세요.
  threshold = 100

  # Create an empty mask to store the pixels to be removed
  remove_mask = np.zeros_like(mask)

  # 각 컨투어를 순회하며 면적 계산
  for idx, contour in enumerate(contours):
      # 현재 컨투어가 최상위 컨투어이고(부모가 없고), 하위 컨투어(자식이)가 없으면 단일 구조
      if hierarchy[0][idx][3] == -1 and hierarchy[0][idx][2] == -1:
          area = cv2.contourArea(contour)
          #print(area)
          if area < threshold:
              cv2.drawContours(remove_mask, [contour], -1, 255, thickness=cv2.FILLED)

      # 현재 컨투어가 최상위 컨투어이고, 하위 컨투어(자식이)가 있으면 뚫린 구조
      elif hierarchy[0][idx][3] == -1 and hierarchy[0][idx][2] != -1:
          outer_area = cv2.contourArea(contour)
          inner_area = cv2.contourArea(contours[hierarchy[0][idx][2]])
          #print(outer_area - inner_area)
          if (outer_area - inner_area) < threshold:
              cv2.drawContours(remove_mask, [contour], -1, 255, thickness=cv2.FILLED)

  # Remove the small areas from the original mask
  final_mask = cv2.bitwise_and(mask, cv2.bitwise_not(remove_mask))

  return final_mask


In [None]:
transform = A.Compose(
    [
        A.Resize(224, 224),
        A.Normalize(),
        ToTensorV2()
    ]
)

In [None]:
test_dataset = SatelliteDataset(csv_file='./test.csv', transform=transform, infer=True)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=6)

In [None]:
model.load_state_dict(torch.load('./resnet50_conv.pth', map_location=torch.device('cpu')))
model = model.to(device)

In [None]:
with torch.no_grad():
    model.eval()
    result = []
    for images in tqdm(test_dataloader):
        images = images.float().to(device)
        
        outputs = model(images)
        masks = torch.sigmoid(outputs).cpu().numpy()
        masks = np.squeeze(masks, axis=1)
        masks = (masks > 0.3).astype(np.uint8) # Threshold = 0.35
        
        for i in range(len(images)):
            mask_rle = rle_encode(masks[i])
            if mask_rle == '': # 예측된 건물 픽셀이 아예 없는 경우 -1
                result.append(-1)
            else:
                result.append(mask_rle)

In [None]:
submit = pd.read_csv('./sample_submission.csv')
submit['mask_rle'] = result

In [None]:
submit.to_csv('./submit.csv', index=False)