In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# data archives and checkpoints referenced by later cells are reachable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%%time
import zipfile
for i in [0, 1, 2, 3]:
    with zipfile.ZipFile(f'drive/MyDrive/OffroadSegmentation/data/train_images_A_{i}.zip') as existing_zip:
        existing_zip.extractall('train_images')

with zipfile.ZipFile(f'drive/MyDrive/OffroadSegmentation/data/train_annotations_A.zip') as existing_zip:
    existing_zip.extractall('train_annotations')

!cp -r drive/MyDrive/OffroadSegmentation/data/precision_test_images precision_test_images

!pip install git+https://github.com/rwightman/pytorch-image-models.git 
!pip install -U git+https://github.com/albu/albumentations --no-cache-dir
!pip install pytorch-lightning
!pip install segmentation-models-pytorch 

In [None]:
import os
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
import glob

from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset

import torch
import numpy as np

import albumentations as albu
from tqdm import tqdm_notebook as tqdm


import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.metrics.functional import accuracy
from sklearn.model_selection import StratifiedKFold

In [None]:
# Load the fold-assignment table. `file_name` duplicates `png_name` so that
# code expecting either column name works on this frame.
# NOTE(review): this path uses the folder `Signate-OffroadSegmentation` while the
# archive and checkpoint cells use `OffroadSegmentation` - confirm both exist.
df = pd.read_csv('/content//drive/MyDrive/Signate-OffroadSegmentation/data/5fold_validation.csv')
df['file_name'] = df['png_name']

# Fold held out for validation; every other fold is used for training.
fold = 0

# Take an explicit copy: assigning a new column to a filtered slice of `df`
# raises pandas' SettingWithCopyWarning.
t_df = df[df['type'] == 'train_images'].copy()

# Binary label: 1 when the `road` value exceeds the `dirt road` value, else 0.
t_df['has_road'] = (t_df['road'] > t_df['dirt road']).astype(int)

train_df = t_df[t_df['fold'] != fold]
valid_df = t_df[t_df['fold'] == fold]

In [None]:
# Display the labelled training table (includes the derived `has_road` column).
t_df

In [None]:
class Dataset(BaseDataset):
    """Image-level "has road" classification dataset.

    Each item is read from disk with OpenCV, converted from BGR to RGB, passed
    through the optional augmentation and preprocessing pipelines, and returned
    together with its label.

    Args:
        df (pandas.DataFrame): table with a ``png_name`` column (path handed to
            ``cv2.imread``) and a ``has_road`` column (the label).
        augmentation (callable, optional): called as ``augmentation(image=img)``
            and expected to return a mapping with an ``'image'`` entry
            (e.g. an ``albumentations.Compose``).
        preprocessing (callable, optional): same calling convention as
            ``augmentation``; applied after it.

    Raises:
        FileNotFoundError: from ``__getitem__`` when the image cannot be read.
    """

    def __init__(
            self,
            df,
            augmentation=None,
            preprocessing=None,
    ):
        self.images_fps = df['png_name'].values
        self.label = df['has_road'].values

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):
        """Return ``(image, label)`` for row ``i``."""
        path = self.images_fps[i]
        image = cv2.imread(path)
        # cv2.imread signals an unreadable/missing file by returning None;
        # fail here with the offending path instead of inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'could not read image: {path!r}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        label = self.label[i]

        # apply augmentations
        if self.augmentation:
            image = self.augmentation(image=image)['image']

        # apply preprocessing
        if self.preprocessing:
            image = self.preprocessing(image=image)['image']

        return image, label

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.images_fps)

In [None]:
def get_training_augmentation():
    """Build the training-time transform pipeline.

    Currently deterministic: every image is resized to 1056 x 1920
    (height x width) and then passed through ``PadIfNeeded`` with the same
    minimum size.

    Random augmentations that were tried and are currently disabled:
    HorizontalFlip, ShiftScaleRotate, RandomCrop, IAAAdditiveGaussianNoise,
    IAAPerspective, and OneOf groups of CLAHE / RandomBrightness / RandomGamma,
    IAASharpen / Blur / MotionBlur and RandomContrast / HueSaturationValue.

    Returns:
        albumentations.Compose: the composed transform.
    """
    train_transform = [
        albu.Resize(1056, 1920),
        albu.PadIfNeeded(1056, 1920),
    ]
    return albu.Compose(train_transform)


def get_validation_augmentation():
    """Build the validation/test-time transform pipeline.

    Resizes every image to 1056 x 1920 (height x width; 1056 = 33 * 32) and
    then applies ``PadIfNeeded`` with the same minimum size.

    Returns:
        albumentations.Compose: the composed transform.
    """
    test_transform = [
        albu.Resize(1056, 1920),
        albu.PadIfNeeded(1056, 1920),
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    """Reorder an array's axes from (0, 1, 2) to (2, 0, 1) and cast to float32.

    Extra keyword arguments are accepted and ignored.
    """
    channels_first = x.transpose((2, 0, 1))
    return channels_first.astype('float32')


def get_preprocessing(preprocessing_fn=None):
    """Construct the preprocessing transform.

    Args:
        preprocessing_fn (callable, optional): data normalization function
            (can be specific for each pretrained neural network). When omitted
            or falsy, only the ``to_tensor`` step is applied, which is what a
            call without arguments has always done.
    Return:
        transform: albumentations.Compose

    """
    steps = []
    if preprocessing_fn:
        steps.append(albu.Lambda(image=preprocessing_fn))
    steps.append(albu.Lambda(image=to_tensor))
    return albu.Compose(steps)

In [None]:
from torch import nn
import torch.nn.functional as F
import timm

class LightSystem(pl.LightningModule):
    """Binary image classifier built on a timm backbone.

    The backbone's ``classifier`` layer is replaced by a single-output linear
    layer. ``forward`` returns sigmoid probabilities; the training and
    validation losses are computed from the raw logits so the sigmoid is
    applied exactly once.

    Args:
        model_name (str): architecture name passed to ``timm.create_model``.
        pretrained (bool): passed to ``timm.create_model``.
    """

    def __init__(self, model_name='efficientnet_b0', pretrained=True):
        super(LightSystem, self).__init__()
        self.model = timm.create_model(
            model_name, pretrained, num_classes=1000,
        )

        # Swap the 1000-way head for a single logit.
        in_features = self.model.classifier.in_features
        self.model.classifier = nn.Linear(in_features, 1)

    def forward(self, x):
        """Return sigmoid probabilities for a batch of images."""
        return torch.sigmoid(self.model(x))

    def _bce_loss(self, batch):
        """Binary cross-entropy between the raw logits and the batch labels."""
        x, y = batch
        # Use the un-squashed model output: `forward` already applies a
        # sigmoid, so feeding its result through another sigmoid before BCE
        # would confine predictions to (0.5, 0.73) and distort the loss.
        logits = self.model(x).flatten()
        return F.binary_cross_entropy_with_logits(logits, y.float())

    def training_step(self, batch, batch_idx):
        # REQUIRED
        return {'loss': self._bce_loss(batch)}

    def validation_step(self, batch, batch_idx):
        # OPTIONAL
        return {'val_loss': self._bce_loss(batch)}

    def validation_epoch_end(self, outputs):
        # OPTIONAL
        # NOTE(review): relies on `self.log`; confirm the installed Lightning
        # release provides it.
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        self.log('avg_val_loss', avg_loss, prog_bar=True)

    def validation_end(self, outputs):
        # OPTIONAL
        # Legacy hook name, kept so existing callers keep working.
        # NOTE(review): newer Lightning releases appear to call
        # `validation_epoch_end` instead - confirm against the installed version.
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        return {'avg_val_loss': avg_loss}

    def configure_optimizers(self):
        # REQUIRED
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0001)
        return optimizer
 



In [None]:
# Seed Python / NumPy / PyTorch so head initialisation and batch shuffling are
# repeatable between runs.
pl.seed_everything(42)

# NOTE(review): the name `ls` shadows IPython's `ls` automagic in this kernel.
ls = LightSystem()
train_dataset = Dataset(train_df,
                        augmentation=get_training_augmentation(),
                        preprocessing=get_preprocessing(),
                        )

valid_dataset = Dataset(valid_df,
                        augmentation=get_validation_augmentation(),
                        preprocessing=get_preprocessing(),
                        )

# Never request more loader workers than the runtime has CPUs; PyTorch warns
# that oversubscription can slow down or freeze the DataLoader.
available_cpus = os.cpu_count() or 1
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True,
                          num_workers=min(12, available_cpus))
valid_loader = DataLoader(valid_dataset, batch_size=4, shuffle=False,
                          num_workers=min(4, available_cpus))

trainer = pl.Trainer(max_epochs=50, gpus=1)
trainer.fit(ls, train_loader, valid_loader)

In [None]:
# Persist the trained classifier to Drive so the inference cells can reload it.
checkpoint_file = '/content//drive/MyDrive/OffroadSegmentation/pl_checkpoint/offroad_efficient_net_b0_has_road_or_not_2.ckpt'
trainer.save_checkpoint(checkpoint_file)

In [None]:
# best_checkpoints = trainer.checkpoint_callback.best_model_path
# `load_from_checkpoint` is called on the class itself: constructing a
# throwaway `LightSystem()` first would build (and fetch pretrained weights
# for) a model that is immediately discarded.
pretrained_model = LightSystem.load_from_checkpoint(checkpoint_path = '/content//drive/MyDrive/OffroadSegmentation/pl_checkpoint/offroad_efficient_net_b0_has_road_or_not_2.ckpt')
pretrained_model = pretrained_model.to("cuda")
# Inference only: switch to eval mode and freeze the parameters.
pretrained_model.eval()
pretrained_model.freeze()

In [None]:
# Collect the model's output for every validation batch, in loader order.
fin_out = []
for batch in tqdm(valid_loader):
    # Each batch is (images, labels); only the images are needed here.
    images = batch[0].to("cuda")
    probs = pretrained_model(images)
    fin_out.extend(probs.detach().cpu().numpy().tolist())

In [None]:
# Attach the predictions (flattened to one value per row, rounded to 3
# decimals) to a copy of the validation table.
val_scores = np.asarray(fin_out).reshape(len(fin_out))
df_val_result = valid_df.assign(pred_has_road=val_scores)
df_val_result['pred_has_road'] = df_val_result['pred_has_road'].round(3)

In [None]:
# Peek at the first validation rows whose label is has_road == 1.
df_val_result.loc[df_val_result['has_road'].eq(1)].head()

In [None]:
# def visualize(**images):
#     """Plot images in one row."""
#     n = len(images)
#     plt.figure(figsize=(16, 5))
#     for i, (name, image) in enumerate(images.items()):
#         plt.subplot(1, n, i + 1)
#         plt.xticks([])
#         plt.yticks([])
#         plt.title(' '.join(name.split('_')).title())
#         plt.imshow(image)
#     plt.show()

# val_fold = 4
# df = df_val_result[(df_val_result['pred_has_road'] < 0.9) & (df_val_result['has_road'] == 1)]

# for i, row in df.iloc[0:5].iterrows():
#     image = cv2.imread(row['png_name'])
#     image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
#     mask = cv2.imread(row['annotation'])
#     mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)

#     visualize(image=image, mask=mask)

In [None]:
import segmentation_models_pytorch as smp

class TestDataset(BaseDataset):
    """Unlabelled image dataset used for inference.

    Each item is read from disk with OpenCV, converted from BGR to RGB and
    passed through the optional augmentation and preprocessing pipelines.

    Args:
        df (pandas.DataFrame): table with a ``file_name`` column (path handed
            to ``cv2.imread``).
        augmentation (callable, optional): called as ``augmentation(image=img)``
            and expected to return a mapping with an ``'image'`` entry.
        preprocessing (callable, optional): same calling convention as
            ``augmentation``; applied after it.

    Raises:
        FileNotFoundError: from ``__getitem__`` when the image cannot be read.
    """

    def __init__(
            self,
            df,
            augmentation=None,
            preprocessing=None,
    ):
        self.images_fps = df['file_name'].values

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):
        """Return the processed image for row ``i``."""
        path = self.images_fps[i]
        image = cv2.imread(path)
        # cv2.imread signals an unreadable/missing file by returning None;
        # fail here with the offending path instead of inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'could not read image: {path!r}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # apply augmentations
        if self.augmentation:
            image = self.augmentation(image=image)['image']

        # apply preprocessing
        if self.preprocessing:
            image = self.preprocessing(image=image)['image']

        return image

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.images_fps)


def get_preprocessing(preprocessing_fn=None):
    """Construct the preprocessing transform.

    Args:
        preprocessing_fn (callable, optional): data normalization function
            (can be specific for each pretrained neural network). When omitted
            or falsy, only the ``to_tensor`` step is applied. The default keeps
            zero-argument calls made by earlier cells working after this
            definition replaces the earlier one.
    Return:
        transform: albumentations.Compose

    """
    steps = []
    if preprocessing_fn:
        steps.append(albu.Lambda(image=preprocessing_fn))
    steps.append(albu.Lambda(image=to_tensor))
    return albu.Compose(steps)

# Encoder name / pretrained-weight set, and the matching input-normalization
# function looked up from segmentation_models_pytorch.
ENCODER, ENCODER_WEIGHTS = 'resnet34', 'imagenet'
preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

In [None]:
# Binary classification (road / no road) of the test images.

# Probability above which a test image is flagged as containing a road.
HAS_ROAD_THRESHOLD = 0.9

# `load_from_checkpoint` is called on the class itself; instantiating
# `LightSystem()` first would build a model that is immediately discarded.
pretrained_model = LightSystem.load_from_checkpoint('/content//drive/MyDrive/OffroadSegmentation/pl_checkpoint/offroad_efficient_net_b0_has_road_or_not_2.ckpt')
pretrained_model = pretrained_model.to("cuda")
# Inference only: switch to eval mode and freeze the parameters.
pretrained_model.eval()
pretrained_model.freeze()

png_l = glob.glob('precision_test_images/*.png')

# `file_name` is the path used for reading; `png_name` is the part after the
# first '/' (the bare file name for paths of the form dir/name.png).
df_test_path = pd.DataFrame()
df_test_path['file_name'] = np.sort(png_l)
df_test_path['png_name'] = df_test_path['file_name'].str.split('/', expand=True)[1]

test_dataset = TestDataset(
    df_test_path,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(None),
)

# shuffle=False keeps predictions aligned with the rows of df_test_path.
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=4)

fin_out = []
for data in tqdm(test_loader):
    y_hat = pretrained_model(data.to("cuda"))
    fin_out.extend(y_hat.cpu().detach().numpy().tolist())

df_test_path['pred_has_road'] = np.array(fin_out).reshape(len(fin_out))
df_test_path['f_has_road'] = df_test_path['pred_has_road'] > HAS_ROAD_THRESHOLD

df_test_path.to_csv('/content//drive/MyDrive/Signate-OffroadSegmentation/data/test_2stage_binary.csv')