In [None]:
%%capture

# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load
!pip install segmentation_models_pytorch
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os, cv2, random
import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import albumentations as A
import segmentation_models_pytorch as smp

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
ROOT = "/kaggle/input/terra-seg-rugged-terrain-segmentation/offroad-seg-kaggle"
TRAIN_IMG = f"{ROOT}/train_images"
TRAIN_MSK = f"{ROOT}/train_masks"
TEST_IMG  = f"{ROOT}/test_images_padded"

In [None]:
class TerraSegDataset(Dataset):
    def __init__(self, img_dir, mask_dir=None, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = sorted(os.listdir(img_dir))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = cv2.imread(os.path.join(self.img_dir, self.images[idx]))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.mask_dir:
            mask = cv2.imread(os.path.join(self.mask_dir, self.images[idx]), 0)
            mask = (mask > 0).astype("float32")
        else:
            mask = None

        if self.transform:
            augmented = self.transform(image=img, mask=mask)
            img = augmented["image"]
            mask = augmented["mask"] if mask is not None else None

        img = img.transpose(2, 0, 1) / 255.0
        img = torch.tensor(img, dtype=torch.float32)

        if mask is not None:
            mask = torch.tensor(mask, dtype=torch.float32).unsqueeze(0)
            return img, mask

        return img, self.images[idx]

In [None]:
train_tfms = A.Compose([
    A.RandomResizedCrop(size=(512, 512), scale=(0.7, 1.0)),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=15, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
])

valid_tfms = A.Compose([
    A.Resize(512, 512)
])

In [None]:
train_ds = TerraSegDataset(TRAIN_IMG, TRAIN_MSK, train_tfms)
train_loader = DataLoader(
    train_ds,
    batch_size=4,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

In [None]:
model = smp.DeepLabV3Plus(
    encoder_name="mit_b5",
    encoder_weights="imagenet",
    in_channels=3,
    classes=1,
    activation=None
).to(DEVICE)

In [None]:
bce = nn.BCEWithLogitsLoss()
dice = smp.losses.DiceLoss(mode="binary")

def criterion(pred, target):
    return 0.5 * bce(pred, target) + 0.5 * dice(pred, target)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

In [None]:
EPOCHS = 20
best_loss = float("inf")

for epoch in range(EPOCHS):
    model.train()
    epoch_loss = 0.0

    for imgs, masks in tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}"):
        imgs = imgs.to(DEVICE)
        masks = masks.to(DEVICE)

        optimizer.zero_grad()
        preds = model(imgs)
        loss = criterion(preds, masks)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / len(train_loader)
    print(f"Epoch {epoch+1} | Loss: {avg_loss:.4f}")

    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), "best_terraseg_model.pth")
        print(f"New best model saved! Loss: {best_loss:.4f}")


In [None]:
def rle_encode(mask):
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return " ".join(map(str, runs))

In [None]:
test_ds = TerraSegDataset(TEST_IMG, transform=valid_tfms)
test_loader = DataLoader(test_ds, batch_size=1, shuffle=False)

model.eval()
rows = []

with torch.no_grad():
    for img, img_id in tqdm(test_loader):
        img = img.to(DEVICE)
        pred = torch.sigmoid(model(img))[0, 0].cpu().numpy()
        mask = (pred > 0.5).astype(np.uint8)
        rows.append([img_id[0], rle_encode(mask)])

In [None]:
test_ds = TerraSegDataset(TEST_IMG, transform=valid_tfms)
test_loader = DataLoader(test_ds, batch_size=1, shuffle=False)

model.eval()
rows = []

with torch.no_grad():
    for img, img_id in tqdm(test_loader):
        img = img.to(DEVICE)
        pred = torch.sigmoid(model(img))[0, 0].cpu().numpy()
        mask = (pred > 0.5).astype(np.uint8)
        rows.append([img_id[0], rle_encode(mask)])

In [None]:
sub = pd.DataFrame(rows, columns=["image_id", "encoded_pixels"])
sub.to_csv("submission.csv", index=False)

print("submission.csv generated successfully")