# [모의경진대회] 토지피복지도 객체분할

## 필수 라이브러리 불러오기

In [1]:
import os
import numpy as np
import pandas as pd
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm
from sklearn.model_selection import train_test_split

import segmentation_models_pytorch as smp
from segmentation_models_pytorch.losses import DiceLoss

import albumentations as A
import cv2
from datetime import datetime, timezone, timedelta

## CONFIG 설정

### 데이터 경로

In [2]:
# 프로젝트 경로
PROJECT_DIR = '/workspace/Competition/map_segmentation'
os.chdir(PROJECT_DIR)

#데이터 경로
DATA_DIR = os.path.join(PROJECT_DIR, 'data','final') # 모든 데이터가 들어있는 폴더 경로
TRAIN_DIR = os.path.join(DATA_DIR, 'train') # 학습 데이터가 들어있는 폴더 경로
TRAIN_IMG_DIR = os.path.join(TRAIN_DIR, 'images') # 학습 이미지가 들어있는 폴더 경로
TRAIN_MASK_DIR = os.path.join(TRAIN_DIR, 'masks') # 학습 마스크가 들어있는 폴더 경로
TRAIN_CSV_FILE = os.path.join(TRAIN_DIR, 'traindf.csv') # 학습 이미지와 마스크 이름이 들어있는 CSV 경로

### 결과 저장 경로

In [3]:
# 시간 고유값 
kst = timezone(timedelta(hours=9))        
train_serial = datetime.now(tz=kst).strftime("%Y%m%d_%H%M%S")

# 기록 경로
RECORDER_DIR = os.path.join(PROJECT_DIR, 'results', 'train', train_serial)
# 현재 시간 기준 폴더 생성
os.makedirs(RECORDER_DIR, exist_ok=True)    

### 시드 설정

In [4]:
RANDOM_SEED = 2022 #랜덤 시드

torch.manual_seed(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

### 디바이스 설정

In [5]:
os.environ['CUDA_VISIBLE_DEVICES']="0"
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### Set hyperparameters

In [6]:
EPOCHS = 10
BATCH_SIZE = 8
LEARNING_RATE = 0.003
EARLY_STOPPING_PATIENCE = 10
IMG_SIZE = 512

ENCODER = 'timm-efficientnet-b0' # 활용할 인코더 모델
WEIGHTS = 'imagenet' # Pre-train에 활용된 데이터셋

## Dataset 정의

In [7]:
class SegDataset(Dataset):
    def __init__(self, df, augmentations, img_dir, mask_dir):
        self.df = df # 이미지와 마스크 이름이 저장된 데이터프레임 
        self.augmentations = augmentations # 학습 전 적용할 augmentation
        self.img_dir = img_dir # 이미지 폴더 경로
        self.mask_dir = mask_dir # 마스크 폴더 경로
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        # 데이터 프레임 불러와서 이미지와 마스크 경로 설정
        row = self.df.iloc[idx] # 데이터프레임 행 불러오기
        image_path = os.path.join(self.img_dir,row['img'])
        mask_path = os.path.join(self.mask_dir, row['mask'])
        
        # 이미지와 마스크 불러오기
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        mask = np.expand_dims(mask, axis=-1)
        
        # Augmentation 적용하기
        if self.augmentations:
            data = self.augmentations(image=image, mask=mask)
            image = data['image']
            mask = data['mask']
        
        # PyTorch 인풋 모양에 맞게 이미지와 마스크 모양 변경
        image = np.transpose(image, (2,0,1)).astype(np.float32)
        mask = np.transpose(mask, (2,0,1)).astype(np.float32)
        
        # 이미지 Normalization 0~255 픽셀값 --> 0~1 픽셀값
        image = torch.Tensor(image) / 255.0
        mask = torch.round(torch.Tensor(mask)/255.0)
        
        return image, mask

## 모델 정의

In [8]:
class SegModel(nn.Module):
    def __init__(self):
        super(SegModel, self).__init__()
        
        # Pre-train된 UNET 불러오기
        self.backbone = smp.Unet(
            encoder_name = ENCODER, # 인코더 모델 설정
            encoder_weights = WEIGHTS, # 사전학습 데이터셋 설정
            in_channels = 3, # 이미지 디멘션 (3 * 512 * 512)
            classes = 1, # 세그멘테이션 클래스 개수 
            activation = None # logit 값 불러오기
        )
        
    def forward(self, images):
        logits = self.backbone(images)
        
        return logits

## Utils 정의

### Augmentation 함수

In [9]:
def get_train_augs():
    return A.Compose([
        A.Resize(IMG_SIZE, IMG_SIZE), # 이미지 크기 변환
        A.HorizontalFlip(p=0.5), # 이미지 좌우반전
        A.VerticalFlip(p=0.5) # 이미지 상하반전
    ])

def get_valid_augs():
    return A.Compose([
        A.Resize(IMG_SIZE, IMG_SIZE)
    ])

### Train 함수

In [10]:
def train_fn(dataloader, model, optimizer, loss_fn):
    model.train()
    
    total_loss = 0.0
    
    for images,masks in tqdm(dataloader):
        images = images.to(DEVICE)
        masks = masks.to(DEVICE)
        
        optimizer.zero_grad()
        logits = model(images)
        loss = loss_fn(logits, masks)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    return total_loss/len(dataloader)

### Validation 함수

In [11]:
def valid_fn(dataloader, model, loss_fn):
    model.eval()
    
    total_loss = 0.0
    
    with torch.no_grad():
        for images,masks in tqdm(dataloader):
            images = images.to(DEVICE)
            masks = masks.to(DEVICE)
            logits = model(images)
            loss = loss_fn(logits, masks)
            total_loss += loss.item()
    return total_loss/len(dataloader)

## 모델 학습

### Dataset & Dataloader 설정

In [12]:
# 학습 이미지, 마스크 이름 들어있는 CSV 불러와 데이터 프레임으로 저장
entiredf = pd.read_csv(TRAIN_CSV_FILE)

# Train과 Validation 데이터셋으로 나누기
traindf, validdf = train_test_split(entiredf, test_size=0.2)
traindf = traindf.reset_index(drop=True)
validdf = validdf.reset_index(drop=True)

# Dataset 및 Dataloader 설정
train_dataset = SegDataset(traindf, get_train_augs(), TRAIN_IMG_DIR, TRAIN_MASK_DIR)
valid_dataset = SegDataset(validdf, get_valid_augs(), TRAIN_IMG_DIR, TRAIN_MASK_DIR)
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size = BATCH_SIZE)

### 모델, Loss, Optimizer 설정

In [13]:
model = SegModel().to(DEVICE) # 모델 설정
loss_fn = DiceLoss(mode = 'binary') # 학습 loss funciton 설정
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE) # optimizer 설정

### Epoch 단위 학습 진행

In [14]:
best_loss = np.Inf

for i in range(EPOCHS):
    train_loss = train_fn(train_loader, model, optimizer, loss_fn)
    valid_loss = valid_fn(valid_loader, model, loss_fn)
    
    # loss가 감소하면 모델 저장
    if valid_loss < best_loss:
        torch.save(model.state_dict(), os.path.join(RECORDER_DIR, "best-model.pt"))
        print('saved model')
        best_loss = valid_loss
        print(f"Epoch: {i+1}, Train Loss: {train_loss} Valid Loss: {valid_loss}")

100%|██████████| 842/842 [04:32<00:00,  3.09it/s]
100%|██████████| 211/211 [00:34<00:00,  6.11it/s]


saved model
Epoch: 1, Train Loss: 0.24479317643863288 Valid Loss: 0.1882980796398145


100%|██████████| 842/842 [04:31<00:00,  3.11it/s]
100%|██████████| 211/211 [00:34<00:00,  6.09it/s]


saved model
Epoch: 2, Train Loss: 0.18856132915354115 Valid Loss: 0.16962257465480063


100%|██████████| 842/842 [04:32<00:00,  3.09it/s]
100%|██████████| 211/211 [00:34<00:00,  6.12it/s]
100%|██████████| 842/842 [04:30<00:00,  3.11it/s]
100%|██████████| 211/211 [00:33<00:00,  6.26it/s]


saved model
Epoch: 4, Train Loss: 0.16546692184201328 Valid Loss: 0.16082554292904822


100%|██████████| 842/842 [04:30<00:00,  3.11it/s]
100%|██████████| 211/211 [00:34<00:00,  6.12it/s]


saved model
Epoch: 5, Train Loss: 0.16125684176657942 Valid Loss: 0.15086359576591382


100%|██████████| 842/842 [04:31<00:00,  3.10it/s]
100%|██████████| 211/211 [00:34<00:00,  6.10it/s]
100%|██████████| 842/842 [04:32<00:00,  3.09it/s]
100%|██████████| 211/211 [00:34<00:00,  6.12it/s]
100%|██████████| 842/842 [04:30<00:00,  3.11it/s]
100%|██████████| 211/211 [00:35<00:00,  6.00it/s]


saved model
Epoch: 8, Train Loss: 0.1540904399051802 Valid Loss: 0.13840618037499522


100%|██████████| 842/842 [04:31<00:00,  3.11it/s]
100%|██████████| 211/211 [00:34<00:00,  6.13it/s]
100%|██████████| 842/842 [04:30<00:00,  3.11it/s]
100%|██████████| 211/211 [00:33<00:00,  6.22it/s]

saved model
Epoch: 10, Train Loss: 0.14830893936463035 Valid Loss: 0.13805741128198343





## 추론

### 마스크를 RLE형태로 변환하는 함수

In [1]:
def mask_to_rle(mask):
    flatten_mask = mask.flatten()
    if flatten_mask.max() == 0:
        return f'0 {len(flatten_mask)}'
    idx = np.where(flatten_mask!=0)[0]
    steps = idx[1:]-idx[:-1]
    new_coord = []
    step_idx = np.where(np.array(steps)!=1)[0]
    start = np.append(idx[0], idx[step_idx+1])
    end = np.append(idx[step_idx], idx[-1])
    length = end - start + 1
    for i in range(len(start)):
        new_coord.append(start[i])
        new_coord.append(length[i])
    new_coord_str = ' '.join(map(str, new_coord))
    return new_coord_str

### Test Dataset 정의

In [2]:
class TestDataset(Dataset):
    def __init__(self, df, img_dir):
        self.df = df
        self.img_dir = img_dir
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        imname = row['img']
        image_path = os.path.join(self.img_dir,imname)
        
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = np.transpose(image, (2,0,1)).astype(np.float32)
        image = torch.Tensor(image) / 255.0
        
        return image,imname

NameError: name 'Dataset' is not defined

### 경로 설정 및 Test 데이터 부러오기

In [None]:
TEST_DIR = os.path.join(DATA_DIR, 'test') # 테스트 데이터가 들어있는 폴더 경로
TEST_IMG_DIR = os.path.join(TEST_DIR, 'images') # 테스트 이미지가 들어있는 폴더 경로
TEST_CSV_FILE = os.path.join(TEST_DIR, 'testdf.csv') # 테스트 이미지 이름이 들어있는 CSV 경로
testdf = pd.read_csv(TEST_CSV_FILE)
test_dataset = TestDataset(testdf, TEST_IMG_DIR)
test_loader = DataLoader(dataset=test_dataset, batch_size=1,shuffle=False)

### 최고 성능 모델 가중치 불러오기

In [None]:
model.load_state_dict(torch.load(os.path.join(RECORDER_DIR, 'best-model.pt')))

### 추론 진행

In [None]:
file_list = [] # 이미지 이름 저장할 리스트
pred_list = [] # 마스크 저장할 리스트
class_list = [] # 클래스 이름 저장할 리스트 ('building')

model.eval()
with torch.no_grad():
    for batch_index, (image,imname) in tqdm(enumerate(test_loader)):
        image = image.to(DEVICE)
        logit_mask = model(image)
        pred_mask = torch.sigmoid(logit_mask) # logit 값을 probability score로 변경
        pred_mask = (pred_mask > 0.5) * 1.0 # 0.5 이상 확률 가진 픽셀값 1로 변환
        pred_rle = mask_to_rle(pred_mask.detach().cpu().squeeze(0)) # 마스크를 RLE 형태로 변경
        pred_list.append(pred_rle)
        file_list.append(imname[0])
        class_list.append("building")

### 예측 결과 파일 만들기

In [None]:
# 예측 결과 데이터프레임 만들기
results = pd.DataFrame({'img_id':file_list,'class':class_list,'prediction':pred_list})

# sample_submission.csv와 같은 형태로 변형
sampledf = pd.read_csv(os.path.join(TEST_DIR, 'sample_submission.csv'))
sorter = list(sampledf['img_id'])
results = results.set_index('img_id')
results = results.loc[sorter].reset_index()
                       
# 결과 저장
results.to_csv(os.path.join(RECORDER_DIR, 'prediction.csv'), index=False)