# Environment
- Ubuntu 24.04
- Python 3.10
- 해당 노트북 파일과 같은 디렉터리 상에 ./Wildfire 폴더에 대회 데이터셋이 존재해야합니다.
  - ./Wildfire/train_img
  - ./Wildfire/train_mask
  - ./Wildfire/test_img

# 주의사항
checkpoint 경로 설정은 제일 아래에 있습니다.

In [1]:
# 필요 패키지 설치 (현시점 (2024-03-27) 최신버전으로 설치시 문제 없음)
! pip install numpy opencv-contrib-python albumentations --upgrade
! pip install torch torchvision openmim --upgrade
! pip install timm --upgrade
! mim install mmengine --upgrade
! pip install rasterio

Looking in links: https://download.openmmlab.com/mmcv/dist/cu121/torch2.2.0/index.html


# Dataset
테스트데이터 로딩을 위한 dataset class입니다.

해당 노트북 파일과 같은 디렉터리 상에 Wildfire 폴더에 대회 데이터셋이 존재해야합니다.

기본적으로 학습데이터셋에서와 같은 방법으로 7채널 + mean 7채널로 인풋 이미지를 출력합니다. (14x256x256)
별도의 어그멘테이션은 없습니다.

In [2]:
from pathlib import Path

import numpy as np
import rasterio
from torch.utils.data import Dataset

DATA_ROOT = Path("Wildfire")
TRAIN_IMG_DIR = DATA_ROOT / "train_img"
TRAIN_MASK_DIR = DATA_ROOT / "train_mask"
TEST_IMG_DIR = DATA_ROOT / "test_img"

def _imread_float(f: str | Path, input_chs: list[int]):
    img = rasterio.open(f).read()[input_chs].transpose((1, 2, 0))
    img = img / 65535
    img = img.astype(np.float32)

    return img


class TestDataset(Dataset):
    def __init__(self, input_chs: list[int]):
        super().__init__()

        img_paths = sorted(TEST_IMG_DIR.glob("*.tif"))

        self.img_paths = img_paths
        self.input_chs = input_chs

    def __getitem__(self, idx):

        img_path = self.img_paths[idx]

        img = _imread_float(img_path, self.input_chs)

        # (H, W, C) -> (C, H, W)
        img = np.transpose(img, (2, 0, 1))

        # Add mean channels
        img_mean = np.zeros_like(img)
        for i, each_ch in enumerate(img):
            if (each_ch > 0).sum() > 0:
                img_mean[i] = each_ch[each_ch > 0].mean()

        img = np.concatenate([img, img_mean], axis=0)

        # sample return
        sample = {"img": img, "img_path": img_path}

        return sample

    def __len__(self):
        return len(self.img_paths)

# UNet Encoder
UNet의 인코더 파트입니다.
기본적으로 timm라이브러리의 regnetx_002를 가져와서 사용하였습니다.
timm에서 제공하는 pretrained weight로 초기화 시켰습니다.
해당 pretrained weight는 imagenet 데이터셋으로 학습된것으로 보입니다.
https://github.com/huggingface/pytorch-image-models/blob/main/timm/models/regnet.py
https://github.com/facebookresearch/pycls/blob/main/MODEL_ZOO.md

regnet encoder에서 UNet 생성에 필요없는 fnal_conv와 head layer는 제거 합니다.

conv0 레이어를 추가하여, regnet 이전에 붙였습니다.
해당 레이어는 2개의 1x1 conv layer로 이루어져있습니다.
일반 이미지와는 달리 Wildfire 영상의 scale변화가 샘플마다 컸기 때문에, 1x1 conv layer로 우선 각 픽셀에서 어떠한 정규화가 일어나길 기대햇습니다.

regnet의 conv1레이어는 원래 3채널의 RGB 값을 받는 레이어이기 때문에, conv0의 output인 32 채널을 받을 수 있도록, 수정을 가하였습니다.

In [3]:
import timm
import torch
from torch import nn


class RegNetEncoder(nn.Module):
    def __init__(
        self,
        name: str,
        in_ch: int,
        empty_out_depths: list[int],
    ):

        super().__init__()

        if name == "regnetx_002":
            self.model = timm.create_model("regnetx_002", pretrained=True)
        elif name == "regnetx_004":
            self.model = timm.create_model("regnetx_004", pretrained=True)
        elif name == "regnetx_006":
            self.model = timm.create_model("regnetx_006", pretrained=True)
        elif name == "regnetx_008":
            self.model = timm.create_model("regnetx_008", pretrained=True)
        else:
            raise ValueError(name)

        # Remove original fc layer
        del self.model.final_conv
        del self.model.head

        # conv0
        self.conv0 = nn.Sequential(
            nn.Conv2d(in_ch * 2, 32, kernel_size=1, padding=0, bias=True),
            # nn.BatchNorm2d(32),
            nn.ReLU(True),
            nn.Conv2d(32, 32, kernel_size=1, padding=0, bias=True),
            # nn.BatchNorm2d(32),
            nn.ReLU(True),
        )

        # Patch first layer
        patch_ch = 32
        with torch.no_grad():
            orig_weight = self.model.stem.conv.weight.detach()

            new_conv = nn.Conv2d(patch_ch, 32, kernel_size=3, stride=1, padding=1, bias=False)
            new_conv.weight[:] = (
                orig_weight.repeat(1, (patch_ch + 2) // 3, 1, 1)[:, :patch_ch] * 3 / patch_ch
            )
            self.model.stem.conv = new_conv

        # ETC
        self.empty_out_depths = empty_out_depths

    def _get_stages(self) -> list[nn.Module]:
        return [
            # nn.Identity(),
            nn.Sequential(self.conv0, self.model.stem),
            self.model.s1,
            self.model.s2,
            self.model.s3,
            self.model.s4,
        ]

    @property
    def out_channels(self) -> list[int]:
        channels = [
            # self.model.stem.conv.in_channels,
            self.model.stem.conv.out_channels,
            self.model.s1.b1.conv1.conv.out_channels,
            self.model.s2.b1.conv1.conv.out_channels,
            self.model.s3.b1.conv1.conv.out_channels,
            self.model.s4.b1.conv1.conv.out_channels,
        ]
        return [0 if d in self.empty_out_depths else ch for d, ch in enumerate(channels)]

    def forward(self, x: torch.Tensor) -> list[torch.Tensor]:
        stages = self._get_stages()

        features = []
        for depth, stage in enumerate(stages):
            x = stage(x)

            if depth in self.empty_out_depths:
                B, _, H, W = x.shape
                empty_tensor = torch.zeros((B, 0, H, W), dtype=x.dtype, device=x.device)
                features.append(empty_tensor)
            else:
                features.append(x)

        return features

  from .autonotebook import tqdm as notebook_tqdm


# UNet Decoder

UNet Decoder 파트입니다.
업샘플링 layer와 conv layer로 이루어져있는 전형적인 디코더 구조입니다.
인코더의 같은 level에서 skip connection 또한 수신하는 구조입니다.

In [4]:
import torch
from torch import nn


class Conv2dReLU(nn.Sequential):
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        padding=0,
        stride=1,
        use_batchnorm=True,
    ):

        conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=padding,
            bias=not (use_batchnorm),
        )
        relu = nn.ReLU(inplace=True)

        if use_batchnorm:
            bn = nn.BatchNorm2d(out_channels)
        else:
            bn = nn.Identity()

        super().__init__(conv, bn, relu)

class DecoderBlock(nn.Module):
    def __init__(
        self,
        in_ch: int,
        skip_ch: int,
        out_ch: int,
        upsample_mode: str,
        use_batchnorm=True,
    ):
        super().__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode=upsample_mode)

        self.conv1 = Conv2dReLU(
            in_ch + skip_ch,
            out_ch,
            kernel_size=3,
            padding=1,
            use_batchnorm=use_batchnorm,
        )
        self.conv2 = Conv2dReLU(
            out_ch,
            out_ch,
            kernel_size=3,
            padding=1,
            use_batchnorm=use_batchnorm,
        )

    def forward(self, x, skip=None):
        x = self.upsample(x)
        if skip is not None:
            x = torch.cat([x, skip], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        return x


class CenterBlock(nn.Sequential):
    def __init__(self, in_ch: int, out_ch: int, use_batchnorm=True):
        conv1 = Conv2dReLU(
            in_ch,
            out_ch,
            kernel_size=3,
            padding=1,
            use_batchnorm=use_batchnorm,
        )
        conv2 = Conv2dReLU(
            out_ch,
            out_ch,
            kernel_size=3,
            padding=1,
            use_batchnorm=use_batchnorm,
        )
        super().__init__(conv1, conv2)


class UNetDecoder(nn.Module):
    def __init__(
        self,
        center_ch: int,
        skip_chs: list[int],
        decoder_chs: list[int],
        upsample_mode: str,
        use_batchnorm=True,
        use_center_block=False,
    ):
        super().__init__()

        if use_center_block:
            self.center = CenterBlock(center_ch, center_ch, use_batchnorm=use_batchnorm)
        else:
            self.center = nn.Identity()

        in_chs = [center_ch] + decoder_chs[:-1]
        blocks = [
            DecoderBlock(in_ch, skip_ch, out_ch, upsample_mode, use_batchnorm=use_batchnorm)
            for in_ch, skip_ch, out_ch in zip(in_chs, skip_chs, decoder_chs, strict=True)
        ]
        self.blocks = nn.ModuleList(blocks)

    def forward(self, center_feat, skip_feats):

        x = self.center(center_feat)
        xs = []
        for decoder_block, skip_feat in zip(self.blocks, skip_feats, strict=True):
            x = decoder_block(x, skip_feat)
            xs.append(x)

        return xs

# UNet
위에서 만든 인코더와 디코더를 합쳐 하나의 UNet을 만드는 코드입니다.

추가적으로 mmengine에서 사용하는 BaseModel도 정의되어 있습니다.
BaseModel 내부에 loss를 계산하는 부분을 확인할 수 있습니다.

In [5]:
import torch
import torch.nn.functional as F
from mmengine.model import BaseModel as MMBaseModel
from mmengine.registry import MODELS
from torch import nn

class Conv2dUpsample(nn.Sequential):
    def __init__(self, in_ch: int, out_ch: int, kernel_size: int, upsampling: int):
        conv2d = nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, padding=kernel_size // 2)
        upsampling = (
            nn.Upsample(scale_factor=upsampling, mode="bilinear")
            if upsampling > 1
            else nn.Identity()
        )
        super().__init__(conv2d, upsampling)

class UNet(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()

        self.encoder = MODELS.build(encoder)
        encoder_chs = self.encoder.out_channels

        decoder["center_ch"] = encoder_chs[-1]
        decoder["skip_chs"] = encoder_chs[::-1][1:]
        self.decoder = MODELS.build(decoder)

        # head
        decoder_chs = decoder["decoder_chs"]
        self.head0 = Conv2dUpsample(decoder_chs[-1], 1, kernel_size=3, upsampling=1)

    def forward(self, inputs):
        x = inputs["img"]  # (B, C, H, W)

        # Forward Pass
        encoded = self.encoder(x)
        decoder_out = self.decoder(encoded[-1], encoded[::-1][1:])

        # output_h4 = self.head4(decoder_out[-5])
        # output_h3 = self.head3(decoder_out[-4])
        # output_h2 = self.head2(decoder_out[-3])
        # output_h1 = self.head1(decoder_out[-2])
        output_h0 = self.head0(decoder_out[-1])

        # output_h0 = torch.sigmoid(output_h0)

        return output_h0


class OurBaseModel(MMBaseModel):
    def __init__(self, unet):
        super().__init__()

        self.unet = MODELS.build(unet)

    @property
    def device(self):
        return next(self.parameters()).device

    def forward(self, mode, **inputs):
        y_pred = self.unet(inputs)  # (B, 1, H, W)
        y_gt = inputs["mask"]  # (B, 1, H, W)

        if mode == "loss":
            lovasz_loss = lovasz_hinge(y_pred, y_gt, use_elu=True)
            return {"lovasz_loss": lovasz_loss}
        elif mode == "predict":
            return torch.sigmoid(y_pred), y_gt

# Run Prediction
테스트셋 추론 관련된 config를 세팅하고 추론을 수행하는 코드입니다.
1분 넘게 걸립니다.

TTA를 수행합니다.
원본, H_Flip, V_FLIP, HV_Flip 이미지들에 대해서 각각 추론을 하고 평균을 합니다.
Sigmoid output이기 때문엔 segmentation threshold는 0.5로 지정하였습니다.

수행이 끝나면 노트북이 있는 폴더에 "y_pred.pkl"파일을 저장합니다.

In [6]:
import joblib
import numpy as np
import torch
from mmengine.registry import MODELS
from mmengine.runner.checkpoint import load_checkpoint

@torch.no_grad()
def main(ckpt_path: str):

    input_chs = [0, 1, 2, 3, 4, 5, 6]

    model = dict(
        type=OurBaseModel,
        unet=dict(
            type=UNet,
            encoder=dict(
                type=RegNetEncoder, name="regnetx_002", in_ch=len(input_chs), empty_out_depths=[]
            ),
            decoder=dict(
                type=UNetDecoder,
                decoder_chs=[128, 64, 48, 32],  # , 24],
                upsample_mode="nearest",
                use_batchnorm=True,
            ),
        ),
    )

    device = torch.device("cuda")
    model = MODELS.build(model).to(device)
    load_checkpoint(
        model, str(ckpt_path), map_location="cpu", strict=True, revise_keys=[(r"module\.", "")]
    )

    dataset = TestDataset(input_chs=[0, 1, 2, 3, 4, 5, 6])

    model.eval()

    preds = []
    img_paths = []
    for idx, x in enumerate(dataset):
        img_x = torch.tensor(x["img"]).to(device)
        img_x = torch.stack([img_x, img_x.flip(1), img_x.flip(2), img_x.flip([1, 2])])

        y_pred, _ = model.forward(mode="predict", img=img_x, mask=None)

        y_pred[1] = y_pred[1].flip(1)
        y_pred[2] = y_pred[2].flip(2)
        y_pred[3] = y_pred[3].flip([1, 2])
        y_pred = y_pred.mean(0)

        preds.append(y_pred.squeeze().cpu().numpy())
        img_paths.append(x["img_path"])

    preds = np.stack(preds)
    preds = (preds > 0.5).astype(np.uint8)

    y_pred_dict = {}
    for img_path, pred in zip(img_paths, preds, strict=True):
        y_pred_dict[img_path.name] = pred


    joblib.dump(y_pred_dict, "./y_pred.pkl")

# 체크포인트 파일 경로 설정 필요
현재는 leaderboard 체크포인트로 설정되어있습니다.

In [7]:
main("leaderboard_epoch_130.pth")
# main("Logs/xxxxxx/epoch_xxx.pth")

Loads checkpoint by local backend from path: leaderboard_epoch_130.pth
