# [모의 경진대회] 토지피복지도 객체 분할

* 이미지 세그멘테이션
* 담당: 박성호 M

## 데이터 디렉토리 구조

In [None]:
DATA/
  \_train/
       \_traindf.csv  
       \_images/
           \_xxx.png
           \_yyy.png
           \_zzz.png
           \_...  
       \_masks/
           \_xxx.png
           \_yyy.png
           \_zzz.png
           \_...
  \_test/
       \_sample_submission.csv
       \_testdf.csv
       \_images/
           \_aaa.png  
           \_bbb.png  
           \_...  

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#!unzip /content/drive/MyDrive/landmap/DATA/train.zip -d /content/drive/MyDrive/landmap/DATA

In [None]:
#!unzip /content/drive/MyDrive/landmap/DATA/test.zip -d /content/drive/MyDrive/landmap/DATA

## 필수 라이브러리 불러오기

In [2]:
!pip install segmentation_models_pytorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting segmentation_models_pytorch
  Downloading segmentation_models_pytorch-0.3.0-py3-none-any.whl (97 kB)
[K     |████████████████████████████████| 97 kB 7.2 MB/s 
Collecting timm==0.4.12
  Downloading timm-0.4.12-py3-none-any.whl (376 kB)
[K     |████████████████████████████████| 376 kB 60.9 MB/s 
Collecting efficientnet-pytorch==0.7.1
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
Collecting pretrainedmodels==0.7.4
  Downloading pretrainedmodels-0.7.4.tar.gz (58 kB)
[K     |████████████████████████████████| 58 kB 6.6 MB/s 
Collecting munch
  Downloading munch-2.5.0-py2.py3-none-any.whl (10 kB)
Building wheels for collected packages: efficientnet-pytorch, pretrainedmodels
  Building wheel for efficientnet-pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for efficientnet-pytorch: filename=efficientnet_pytorch-0.7.1-py3-none-any.whl size=16446 sha256=3c2dac4313f241

In [3]:
import os
import numpy as np
import pandas as pd
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm
from sklearn.model_selection import train_test_split

import segmentation_models_pytorch as smp
from segmentation_models_pytorch.losses import DiceLoss

import albumentations as A
import cv2
from datetime import datetime, timezone, timedelta

## 하이퍼파라미터 및 기타 인자 설정

#### 데이터 경로

In [4]:
# 프로젝트 경로
PROJECT_DIR = '/content/drive/MyDrive/landmap'
os.chdir(PROJECT_DIR)

#데이터 경로
DATA_DIR = os.path.join(PROJECT_DIR, 'DATA') # 모든 데이터가 들어있는 폴더 경로
TRAIN_DIR = os.path.join(DATA_DIR, 'train') # 학습 데이터가 들어있는 폴더 경로
TRAIN_IMG_DIR = os.path.join(TRAIN_DIR, 'images') # 학습 이미지가 들어있는 폴더 경로
TRAIN_MASK_DIR = os.path.join(TRAIN_DIR, 'masks') # 학습 마스크가 들어있는 폴더 경로
TRAIN_CSV_FILE = os.path.join(TRAIN_DIR, 'traindf.csv') # 학습 이미지와 마스크 이름이 들어있는 CSV 경로

### 데이터 수량 확인:
- n_train = 3930
- n_test = 3930

In [5]:
len(os.listdir(TRAIN_IMG_DIR)) #3930

3930

In [6]:
len(os.listdir(TRAIN_MASK_DIR)) #3930

3930

### 결과 저장 경로 설정

In [7]:
# 시간 고유값 
kst = timezone(timedelta(hours=9))        
train_serial = datetime.now(tz=kst).strftime("%Y%m%d_%H%M%S")

# 기록 경로
RECORDER_DIR = os.path.join(PROJECT_DIR, 'results', 'train', train_serial)
# 현재 시간 기준 폴더 생성
os.makedirs(RECORDER_DIR, exist_ok=True)    

#### 시드 설정

In [8]:
RANDOM_SEED = 2022 #랜덤 시드

torch.manual_seed(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

#### 디바이스 설정

In [9]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#### 하이퍼파라미터 설정

In [10]:
EPOCHS = 1
BATCH_SIZE = 8
LEARNING_RATE = 0.003
EARLY_STOPPING_PATIENCE = 10
IMG_SIZE = 512

ENCODER = 'timm-efficientnet-b0' # 활용할 인코더 모델
WEIGHTS = 'imagenet' # Pre-train에 활용된 데이터셋

## Dataset 정의

In [12]:
class SegDataset(Dataset):
    def __init__(self, df, augmentations, img_dir, mask_dir):
        self.df = df # 이미지와 마스크 이름이 저장된 데이터프레임 
        self.augmentations = augmentations # 학습 전 적용할 augmentation
        self.img_dir = img_dir # 이미지 폴더 경로
        self.mask_dir = mask_dir # 마스크 폴더 경로
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        # 데이터 프레임 불러와서 이미지와 마스크 경로 설정
        row = self.df.iloc[idx] # 데이터프레임 행 불러오기
        image_path = os.path.join(self.img_dir,row['img'])
        mask_path = os.path.join(self.mask_dir, row['mask'])
        
        # 이미지와 마스크 불러오기
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        mask = np.expand_dims(mask, axis=-1)
        
        # Augmentation 적용하기
        if self.augmentations:
            data = self.augmentations(image=image, mask=mask)
            image = data['image']
            mask = data['mask']
        
        # PyTorch 인풋 모양에 맞게 이미지와 마스크 모양 변경
        image = np.transpose(image, (2,0,1)).astype(np.float32)
        mask = np.transpose(mask, (2,0,1)).astype(np.float32)
        
        # 이미지 Normalization 0~255 픽셀값 --> 0~1 픽셀값
        image = torch.Tensor(image) / 255.0
        mask = torch.round(torch.Tensor(mask)/255.0)
        
        return image, mask

## 모델 정의

In [11]:
class SegModel(nn.Module):
    def __init__(self):
        super(SegModel, self).__init__()
        
        # Pre-train된 UNET 불러오기
        self.backbone = smp.Unet(
            encoder_name = ENCODER, # 인코더 모델 설정
            encoder_weights = WEIGHTS, # 사전학습 데이터셋 설정
            in_channels = 3, # 이미지 디멘션 (3 * 512 * 512)
            classes = 1, # 세그멘테이션 클래스 개수 
            activation = None # logit 값 불러오기
        )
        
    def forward(self, images):
        logits = self.backbone(images)
        
        return logits

## Utils 정의
#### Augmentation 함수

In [13]:
def get_train_augs():
    return A.Compose([
        A.Resize(IMG_SIZE, IMG_SIZE), # 이미지 크기 변환
        A.HorizontalFlip(p=0.5), # 이미지 좌우반전
        A.VerticalFlip(p=0.5) # 이미지 상하반전
    ])

def get_valid_augs():
    return A.Compose([
        A.Resize(IMG_SIZE, IMG_SIZE)
    ])

#### Train 함수

In [14]:
def train_fn(dataloader, model, optimizer, loss_fn):
    model.train()
    
    total_loss = 0.0
    
    for images,masks in tqdm(dataloader):
        images = images.to(DEVICE)
        masks = masks.to(DEVICE)
        
        optimizer.zero_grad()
        logits = model(images)
        loss = loss_fn(logits, masks)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    return total_loss/len(dataloader)

#### Validation 함수

In [15]:
def valid_fn(dataloader, model, loss_fn):
    model.eval()
    
    total_loss = 0.0
    
    with torch.no_grad():
        for images,masks in tqdm(dataloader):
            images = images.to(DEVICE)
            masks = masks.to(DEVICE)
            logits = model(images)
            loss = loss_fn(logits, masks)
            total_loss += loss.item()
    return total_loss/len(dataloader)

## 모델 학습
#### Dataset & Dataloader 설정

In [16]:
# 학습 이미지, 마스크 이름 들어있는 CSV 불러와 데이터 프레임으로 저장
entiredf = pd.read_csv(TRAIN_CSV_FILE)

# Train과 Validation 데이터셋으로 나누기
traindf, validdf = train_test_split(entiredf, test_size=0.2)
traindf = traindf.reset_index(drop=True)
validdf = validdf.reset_index(drop=True)

# Dataset 및 Dataloader 설정
train_dataset = SegDataset(traindf, get_train_augs(), TRAIN_IMG_DIR, TRAIN_MASK_DIR)
valid_dataset = SegDataset(validdf, get_valid_augs(), TRAIN_IMG_DIR, TRAIN_MASK_DIR)
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size = BATCH_SIZE)

#### 모델과 기타 utils 설정

In [17]:
model = SegModel().to(DEVICE) # 모델 설정
loss_fn = DiceLoss(mode = 'binary') # 학습 loss funciton 설정
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE) # optimizer 설정

Downloading: "https://github.com/rwightman/pytorch-image-models/releases/download/v0.1-weights/tf_efficientnet_b0_aa-827b6e33.pth" to /root/.cache/torch/hub/checkpoints/tf_efficientnet_b0_aa-827b6e33.pth


  0%|          | 0.00/20.4M [00:00<?, ?B/s]

#### Epoch 단위 학습 진행

In [18]:
best_loss = np.Inf

for i in range(EPOCHS):
    train_loss = train_fn(train_loader, model, optimizer, loss_fn)
    valid_loss = valid_fn(valid_loader, model, loss_fn)
    
    # loss가 감소하면 모델 저장
    if valid_loss < best_loss:
        torch.save(model.state_dict(), os.path.join(RECORDER_DIR, "best-model.pt"))
        print('saved model')
        best_loss = valid_loss
        print(f"Epoch: {i+1}, Train Loss: {train_loss} Valid Loss: {valid_loss}")

100%|██████████| 393/393 [26:31<00:00,  4.05s/it]
100%|██████████| 99/99 [05:35<00:00,  3.39s/it]

saved model
Epoch: 1, Train Loss: 0.29776889904764775 Valid Loss: 0.24752458117224954





## 추론

#### 마스크를 RLE 형태로 변환해주는 함수

In [None]:
def mask_to_rle(mask):
    flatten_mask = mask.flatten()
    if flatten_mask.max() == 0:
        return f'0 {len(flatten_mask)}'
    idx = np.where(flatten_mask!=0)[0]
    steps = idx[1:]-idx[:-1]
    new_coord = []
    step_idx = np.where(np.array(steps)!=1)[0]
    start = np.append(idx[0], idx[step_idx+1])
    end = np.append(idx[step_idx], idx[-1])
    length = end - start + 1
    for i in range(len(start)):
        new_coord.append(start[i])
        new_coord.append(length[i])
    new_coord_str = ' '.join(map(str, new_coord))
    return new_coord_str

#### Test 데이터셋 불러오기

In [None]:
class TestDataset(Dataset):
    def __init__(self, df, img_dir):
        self.df = df
        self.img_dir = img_dir
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        imname = row['img']
        image_path = os.path.join(self.img_dir,imname)
        
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = np.transpose(image, (2,0,1)).astype(np.float32)
        image = torch.Tensor(image) / 255.0
        
        return image,imname

#### 경로 및 기타 인자 설정

In [None]:
TEST_DIR = os.path.join(DATA_DIR, 'test') # 테스트 데이터가 들어있는 폴더 경로
TEST_IMG_DIR = os.path.join(TEST_DIR, 'images') # 테스트 이미지가 들어있는 폴더 경로
TEST_CSV_FILE = os.path.join(TEST_DIR, 'testdf.csv') # 테스트 이미지 이름이 들어있는 CSV 경로

#### 테스트 Dataset, DataLoader 설정

In [None]:
testdf = pd.read_csv(TEST_CSV_FILE)
test_dataset = TestDataset(testdf, TEST_IMG_DIR)
test_loader = DataLoader(dataset=test_dataset, batch_size=1,shuffle=False)

#### 최고 성능 모델 불러오기

In [None]:
model.load_state_dict(torch.load(os.path.join(RECORDER_DIR, 'best-model.pt')))

#### 추론 진행

In [None]:
file_list = [] # 이미지 이름 저장할 리스트
pred_list = [] # 마스크 저장할 리스트
class_list = [] # 클래스 이름 저장할 리스트 ('building')

model.eval()
with torch.no_grad():
    for batch_index, (image,imname) in tqdm(enumerate(test_loader)):
        image = image.to(DEVICE)
        logit_mask = model(image)
        pred_mask = torch.sigmoid(logit_mask) # logit 값을 probability score로 변경
        pred_mask = (pred_mask > 0.5) * 1.0 # 0.5 이상 확률 가진 픽셀값 1로 변환
        pred_rle = mask_to_rle(pred_mask.detach().cpu().squeeze(0)) # 마스크를 RLE 형태로 변경
        pred_list.append(pred_rle)
        file_list.append(imname[0])
        class_list.append("building")
        

#### 예측 결과 파일 만들기

In [None]:
# 예측 결과 데이터프레임 만들기
results = pd.DataFrame({'img_id':file_list,'class':class_list,'prediction':pred_list})

# sample_submission.csv와 같은 형태로 변형
sampledf = pd.read_csv(os.path.join(TEST_DIR, 'sample_submission.csv'))
sorter = list(sampledf['img_id'])
results = results.set_index('img_id')
results = results.loc[sorter].reset_index()
                       
# 결과 저장
results.to_csv(os.path.join(RECORDER_DIR, 'prediction.csv'), index=False)