In [1]:
import os
from PIL import Image
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import rasterio
from torchvision import transforms
import torchvision.models as models
from torchvision.io import read_image
from torchvision.transforms import ToTensor, Normalize
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data.dataset import Subset
from sklearn.metrics import classification_report
import os
import matplotlib.pyplot as plt
import random
import pdb
from collections import Counter
import segmentation_models_pytorch as smp
import albumentations as albu
from skimage.io import imread
import pandas as pd
import cv2
tqdm.pandas()

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
# Class index -> GSD folder label. The index is the position of the
# centimetre value in the tuple below.
class_map = {
    class_idx: f'GSD_{centimetres}cm'
    for class_idx, centimetres in enumerate(
        (50, 65, 80, 100, 124, 150, 175, 200, 250, 300)
    )
}

In [7]:
def get_model(gsd, aug=False):
    """Load the best segmentation checkpoint for one GSD tag.

    Args:
        gsd (str): resolution tag embedded in the checkpoint filename. Only
            '50cm', '65cm' and '80cm' have an experiment mapped to them.
        aug (bool): picks the experiment table: exp2/exp4/exp6 when truthy,
            exp1/exp3/exp5 otherwise (presumably the runs trained with
            augmentation; confirm against the training notebooks).

    Returns:
        The object ``torch.load`` deserialises from the checkpoint file.

    Raises:
        KeyError: if ``gsd`` is not one of the mapped tags.
    """
    experiments = {
        True: {'50cm': 'exp2', '65cm': 'exp4', '80cm': 'exp6'},
        False: {'50cm': 'exp1', '65cm': 'exp3', '80cm': 'exp5'},
    }
    exp_name = experiments[bool(aug)][gsd]
    checkpoint_path = f'/work/scorreacardo_umass_edu/DeepSatGSD/models/segmentation_{exp_name}_{gsd}_best_model.pth'
    return torch.load(checkpoint_path)

def to_tensor(x, **kwargs):
    """Move the last axis of a 3-D array to the front and cast to float32.

    Extra keyword arguments are accepted and ignored.
    """
    channels_first = x.transpose(2, 0, 1)
    return channels_first.astype('float32')

def get_preprocessing(preprocessing_fn):
    """Build the albumentations preprocessing pipeline.

    The pipeline has two steps: ``preprocessing_fn`` is applied to the image
    only, then ``to_tensor`` is applied to both the image and the mask.

    Args:
        preprocessing_fn (callable): data normalization function
            (can be specific to each pretrained neural network).

    Returns:
        albumentations.Compose: the composed transform.
    """
    normalize_step = albu.Lambda(image=preprocessing_fn)
    to_tensor_step = albu.Lambda(image=to_tensor, mask=to_tensor)
    return albu.Compose([normalize_step, to_tensor_step])

def sigmoid(X):
    """Element-wise logistic function, 1 / (1 + exp(-X))."""
    decay = np.exp(-X)
    return 1 / (decay + 1)

def iou(pr_mask, gt_mask):
    """Intersection-over-union of the non-zero regions of two masks.

    Args:
        pr_mask: predicted mask; any non-zero element counts as foreground.
        gt_mask: ground-truth mask, same convention.

    Returns:
        float | None: intersection / union, or None when both masks are
        entirely zero (the union is empty, so the ratio is undefined).
    """
    overlap = np.count_nonzero(np.logical_and(pr_mask, gt_mask))
    union = np.count_nonzero(pr_mask) + np.count_nonzero(gt_mask) - overlap
    if union == 0:
        return None
    return overlap / union

def se(pr_mask, gt_mask):
    """Squared difference between predicted and ground-truth contour counts.

    Contours are extracted from each mask with ``cv2.findContours`` using
    ``RETR_EXTERNAL`` and ``CHAIN_APPROX_SIMPLE``.

    Returns:
        int: (number of predicted contours - number of ground-truth contours) ** 2.
    """
    def _n_contours(mask):
        # Only the length of the contour list matters; the hierarchy is unused.
        found, _hierarchy = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        return len(found)

    count_gap = _n_contours(pr_mask) - _n_contours(gt_mask)
    return count_gap * count_gap

def perc_error(pr_mask, gt_mask):
    """Signed percentage error of the predicted contour count.

    Contours are extracted from each mask with ``cv2.findContours`` using
    ``RETR_EXTERNAL`` and ``CHAIN_APPROX_SIMPLE``.

    Returns:
        float | None: 100 * (n_pred - n_gt) / n_gt, or None when the
        ground-truth mask yields no contours (division by zero otherwise).
    """
    counts = []
    for mask in (pr_mask, gt_mask):
        found, _hierarchy = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        counts.append(len(found))
    n_pred, n_true = counts

    if n_true == 0:
        return None
    relative_gap = (n_pred - n_true) / n_true
    return relative_gap * 100

def perc_area(pr_mask, gt_mask):
    """Predicted non-zero pixel count as a percentage of the ground-truth count.

    Returns:
        float | None: 100 * pr_pixels / gt_pixels, or None when the
        ground-truth mask has no non-zero pixels.
    """
    n_pred, n_true = (cv2.countNonZero(mask) for mask in (pr_mask, gt_mask))
    if n_true != 0:
        return (n_pred / n_true) * 100
    return None

In [53]:
# Key into class_map selecting which GSD class this run evaluates (2 -> 'GSD_80cm').
gsd = 2

In [54]:
res_counter = []
# Column-oriented results store: one list per metric, one entry per image.
res_dic = {
    column: []
    for column in ('img_name', 'class_gsd', 'iou', 'se', 'perc_error', 'perc_area')
}

In [55]:
# Image and mask folders of the validation split for the selected GSD class;
# they differ only in the final path component.
inference_folder, inference_folder_mask = (
    f'/work/scorreacardo_umass_edu/DeepSatGSD/data/processed/inferece_gep_classified/{class_map[gsd]}/valid/{leaf}'
    for leaf in ('imgs', 'masks')
)

In [56]:
# Display the class label currently selected by `gsd`.
class_map[gsd]

'GSD_80cm'

In [57]:
# Run segmentation inference on every image in the inference folder and
# record per-patch metrics in res_dic.

# Loop-invariant setup: the checkpoint, the encoder preprocessing function and
# the transform are the same for every image, so build them once instead of
# reloading the model from disk on each iteration.
gsd_label = class_map[gsd]
preprocessing_fn = smp.encoders.get_preprocessing_fn("resnet34", "imagenet")
seg_transform = get_preprocessing(preprocessing_fn)
seg_model = get_model(gsd_label.split("_")[1], aug=True)
seg_model.eval()
# Send inputs to whatever device the checkpoint was restored on rather than
# assuming it is CUDA.
device = next(seg_model.parameters()).device

# os.listdir returns entries in arbitrary order; sorting makes the row order of
# res_dic reproducible between runs.
for image_name in tqdm(sorted(os.listdir(inference_folder))):
    image_path = os.path.join(inference_folder, image_name)
    # The mask is looked up under the same file name as its image.
    mask_path = os.path.join(inference_folder_mask, image_name)

    # Keep the first three channels and divide by 255
    # (assumes 8-bit pixel values; TODO confirm).
    image = imread(image_path)[:,:,:3].astype('float32')/255
    mask = imread(mask_path, as_gray=True)[:,:,np.newaxis]

    # Apply the transform once and read both outputs from the same result.
    transformed = seg_transform(image=image, mask=mask)
    image_transformed = transformed['image']
    mask_transformed = transformed['mask']

    gt_mask_test = mask_transformed.squeeze()
    x_tensor_test = torch.from_numpy(image_transformed).to(device).unsqueeze(0)
    # Perform inference
    with torch.no_grad():
        pr_mask_test = seg_model.predict(x_tensor_test)
    pr_mask_test = pr_mask_test.squeeze().cpu().numpy().round()

    # NOTE(review): the model output is rounded *before* the sigmoid. Since
    # sigmoid(0) == 0.5 is not > 0.5, only pixels that round to >= 1 survive,
    # i.e. the effective rule is "raw output > 0.5". That is the usual cut if
    # predict() already returns probabilities, but not if it returns logits.
    # Confirm which applies before changing this.
    pr_mask_test_th = sigmoid(pr_mask_test)
    pr_mask_test_th = pr_mask_test_th > 0.5
    pr_mask_test_th = np.uint8(np.multiply(pr_mask_test_th, 255))

    # NOTE(review): casting to uint8 truncates fractional values, so a mask
    # stored as floats in [0, 1) would become all zeros. Verify the mask
    # files decode to integer-valued pixels.
    gt_mask_test = np.uint8(gt_mask_test)

    # Per-patch metrics.
    iou_score = iou(pr_mask_test_th, gt_mask_test)
    se_score = se(pr_mask_test_th, gt_mask_test)
    pe_score = perc_error(pr_mask_test_th, gt_mask_test)
    pa_score = perc_area(pr_mask_test_th, gt_mask_test)

    # Append every column together, only after all metrics succeeded, so an
    # exception part-way through an image cannot leave res_dic with columns
    # of different lengths.
    res_dic['img_name'].append(image_name)
    res_dic['class_gsd'].append(gsd_label)
    res_dic['iou'].append(iou_score)
    res_dic['se'].append(se_score)
    res_dic['perc_error'].append(pe_score)
    res_dic['perc_area'].append(pa_score)

100%|██████████| 2546/2546 [05:09<00:00,  8.23it/s]
