## Gravel detection from large ortho images using a trained model
# (update 2024 09 20)


In [None]:
%%HTML
<!-- Display full window: widen the notebook, menu bar and toolbar containers.
     The %%HTML cell magic has to be the first line of the cell, so this note
     is written as an HTML comment below it rather than a Python comment above. -->
<style>
    div#notebook-container    { width: 95%; }
    div#menubar-container     { width: 65%; }
    div#maintoolbar-container { width: 99%; }
</style>

In [1]:
# Import Detectron2 and related modules
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

# import some common libraries
import numpy as np
import os, math
import cv2
from PIL import Image
Image.MAX_IMAGE_PIXELS = None

import random
import matplotlib.pyplot as plt
%matplotlib inline

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog
from detectron2.data.catalog import DatasetCatalog

In [2]:
# Location of the trained detection model (edit both placeholders before running)
MODEL_PATH = "directory path of your detection model"
model_path = os.path.join(MODEL_PATH, "model name of your detection model (*.pth)")

# Build the inference configuration: start from the Mask R-CNN R50-FPN
# instance-segmentation config, then point it at the trained weights.
backbone_yaml = "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file(backbone_yaml))
cfg.MODEL.WEIGHTS = model_path

# Detection head: a single class, keep detections scoring at least 0.7,
# and allow up to 500 instances per scanned window.
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.7
cfg.TEST.DETECTIONS_PER_IMAGE = 500

# NOTE(review): per the detectron2 config reference, a test size of 0 disables
# input resizing, so each window is fed at its native resolution - confirm
# against the installed detectron2 version.
cfg.INPUT.MIN_SIZE_TEST = 0
cfg.INPUT.MAX_SIZE_TEST = 0

predictor = DefaultPredictor(cfg)

In [3]:
# Directory that holds the ortho images to be scanned (edit before running)
TARGET_PATH = "Directory path of your target ortho images"

# Register the class names used for labelling and JSON export.
# MetadataCatalog.get() hands back the same metadata object for a given name,
# so it is fetched once and the class list is set on it directly.
from detectron2.data import MetadataCatalog
detector_metadata = MetadataCatalog.get("gravel_meta")
detector_metadata.thing_classes = ["Gravel"]

In [4]:
# The detection results are written to a JSON file in a custom, COCO-like format.
# This cell defines the base JSON structure and the helper functions used to fill it.

class cocojson_initialize:
    """Empty COCO-style result container for one image.

    After construction, ``self.jData`` is a dict with the keys ``"licenses"``,
    ``"info"``, ``"categories"``, ``"images"`` and ``"annotations"``.  The
    ``"images"`` and ``"annotations"`` lists start empty and are meant to be
    appended to by the caller.

    Parameters
    ----------
    class_names : iterable of str, optional
        Category names, in order; category ids are assigned 1, 2, 3, ...
        When omitted, the names are read from the module-level
        ``detector_metadata.thing_classes`` (the original behaviour).
    """

    def __init__(self, class_names=None):
        if class_names is None:
            # Fall back to the notebook-level metadata object.
            class_names = detector_metadata.thing_classes

        # Placeholder license / info records (all fields left blank).
        licenses = {
            "name": "",
            "id": 0,
            "url": "",
        }
        info = {
            "contributor": "",
            "date_created": "",
            "description": "",
            "url": "",
            "version": "",
            "year": "",
        }

        # Category ids are 1-based, following the order of the class names.
        categories = [
            {
                'id': cat_id,
                'name': class_name,
                'supercategory': 'none',
            }
            for cat_id, class_name in enumerate(class_names, start=1)
        ]

        self.jData = {
            "licenses": [licenses],
            "info": info,
            "categories": categories,
            "images": [],
            "annotations": [],
        }
def PolyArea(seg):
    """Return the area enclosed by a flat polygon coordinate list.

    ``seg`` is read as interleaved coordinates ``[x0, y0, x1, y1, ...]`` and
    the area is computed with the shoelace formula.  If ``seg`` has fewer than
    two entries, an empty string is returned instead of a number.
    """
    # Guard clause: nothing to measure.
    if len(seg) < 2:
        return ''

    xs, ys = seg[0::2], seg[1::2]
    # Shoelace formula: pair every vertex with its predecessor (np.roll by 1).
    cross_sum = np.dot(xs, np.roll(ys, 1)) - np.dot(ys, np.roll(xs, 1))
    return 0.5 * np.abs(cross_sum)

def Mask2Seg(mask, shift):
    """Convert a binary instance mask into a flat polygon outline.

    Parameters
    ----------
    mask : numpy array
        Mask of one instance inside a scan window; it is cast to ``uint8``
        before contour extraction.
    shift : sequence of two numbers
        ``[x_offset, y_offset]`` added to every contour point, so that
        window-local coordinates become coordinates in the full image.

    Returns
    -------
    list
        ``[x0, y0, x1, y1, ...]`` for the outline of the mask, or ``[]`` when
        no contour is found.
    """
    mask = mask.astype(np.uint8)

    # Indexing with [-2] picks the contour list whether findContours returns
    # (contours, hierarchy) or (image, contours, hierarchy) (OpenCV 3.x).
    contours = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)[-2]
    if not len(contours):
        return []

    # A mask can yield several contours (stray pixels, holes).  Use the one
    # enclosing the largest area instead of whichever happens to come first.
    outline = max(contours, key=cv2.contourArea)

    # Move the (x, y) points from window coordinates to full-image coordinates.
    points = outline.reshape(-1, 2) + np.asarray(shift, dtype=outline.dtype)
    return points.reshape(-1).tolist()


In [None]:
# Detect boulders from Ortho images

# Load modules
from decimal import Decimal, ROUND_HALF_UP
from detectron2.data.datasets import register_coco_instances
import xml.etree.ElementTree as ET
from detectron2.utils.visualizer import ColorMode
import glob, json
from tqdm.notebook import tqdm
from tifffile import TiffFile

# Make up output directory (shared by every processed image)
out_dir_name = 'outputs'
SAVE_DIR = os.path.join(TARGET_PATH, out_dir_name)
os.makedirs(SAVE_DIR, exist_ok=True)

# Set up detection settings
pix_step_rate     = [0.25, 0.25]  # scan step as a fraction of the window size [rows, cols]
pix_window        = [2000, 2000]  # scan window size in pixels [rows, cols]
apply_sharp       = False         # sharpen each window before detection (optional)
save_annotated_im = False         # also save every window with the detections drawn on it

# 3x3 sharpening kernel, only used when apply_sharp is True
sharp_kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])

# Calc scan settings
pix_step = np.round(np.multiply(pix_step_rate, pix_window)).astype(int).tolist()
if min(pix_step) < 1:
    # A zero step would make the tile-count computation divide by zero.
    raise ValueError('pix_step_rate is too small: scan step must be at least 1 pixel')
print('Scan size:' + str(pix_window))
print('Scan steps:' + str(pix_step))

# Get target image list
types = ['*.tif','*.jpg','*.JPG']
tag_list = []
for im_type in types:
    tag_list.extend(glob.glob(os.path.join(TARGET_PATH, im_type)))
# On case-insensitive file systems '*.jpg' and '*.JPG' match the same files;
# drop the duplicates while keeping the original order.
tag_list = list(dict.fromkeys(tag_list))

num_image = len(tag_list)
print("Count of images:" + str(num_image))

# Apply detections to each image
print('Total progress:')
for n_im, d in enumerate(tqdm(tag_list), start=1):
    # Initialize
    imname = os.path.splitext(os.path.basename(d))  # (stem, extension)
    jData = cocojson_initialize()

    # Show progress
    print('['+str(n_im)+ '/' + str(num_image)+']:'+imname[0])

    # Load ortho image (TIFF as a memory map; other formats through PIL,
    # because TiffFile cannot open JPEG files)
    print("Loading image as memory map...",end="")
    if imname[1].lower() in ('.tif', '.tiff'):
        with TiffFile(d) as tif:
            mmap = tif.asarray(out="memmap")
    else:
        with Image.open(d) as pil_im:
            mmap = np.asarray(pil_im.convert("RGB"))
    print("Done")

    # Calc split size; always scan at least one window so that images smaller
    # than the window are not silently skipped
    pix_im  = mmap.shape
    num_col = max(1, math.ceil((pix_im[1]-pix_window[1])/pix_step[1])+1)
    num_row = max(1, math.ceil((pix_im[0]-pix_window[0])/pix_step[0])+1)

    # Get split locations (column-major order), computed once and reused for
    # both the JSON grid description and the detection loop
    tiles = []
    for c in range(num_col):
        for r in range(num_row):
            row_range = [pix_step[0]*r, pix_step[0]*r+pix_window[0]]
            col_range = [pix_step[1]*c, pix_step[1]*c+pix_window[1]]
            tiles.append((r, c, row_range, col_range))
    imageGridData = [[rows[0], rows[1], cols[0], cols[1]] for _, _, rows, cols in tiles]

    # Get output image info
    im_data  = {
                "id":1,
                "width":pix_im [1],
                "height":pix_im[0],
                "file_name":imname[0]+imname[1],
                "license":0,
                "flickr_url":"",
                "coco_url":"",
                "date_capture":"",
                "window_size":pix_window,
                "window_step_rate":pix_step_rate,
                "image_grid":imageGridData
                }

    # Add image info into the output jData
    jData.jData["images"].append(im_data)

    # Apply detections
    an_id = 0
    for r, c, row_range, col_range in tqdm(tiles, leave=False):
        # Split ortho image (slices past the image border are clipped by numpy)
        im_temp = np.array(mmap[row_range[0]:row_range[1], col_range[0]:col_range[1]])

        # Expand single-channel images to three identical channels
        if im_temp.ndim == 2:
            im_temp = np.repeat(im_temp[:, :, np.newaxis], 3, axis=2)

        # Remove alpha layer from tiff
        if im_temp.shape[2]==4:
            im_temp = im_temp[:,:, 0:3]

        # Apply sharpness to the split image (optional)
        if apply_sharp:
            im_temp = cv2.filter2D(im_temp, -1, sharp_kernel)

        # Convert RGB to BGR (copied so the array is contiguous in memory)
        im_temp = np.ascontiguousarray(im_temp[:, :, ::-1])
        pix_im_temp = im_temp.shape

        # Detect
        outputs = predictor(im_temp)

        # Extract data from the detected raw result
        instances  = outputs["instances"].to("cpu")
        bboxes     = instances.pred_boxes.tensor.numpy()
        scores     = instances.scores.numpy()
        classes    = instances.pred_classes.numpy()
        mask_array = instances.pred_masks.numpy()

        for box, score, cls, mask in zip(bboxes, scores, classes, mask_array):
            an_id += 1

            # Get bounding box as [x, y, width, height] in full-image coordinates.
            # Values are converted to built-in floats because json cannot
            # serialize numpy float32 scalars.
            x_min, y_min, x_max, y_max = (float(v) for v in box)
            bbox = [x_min + col_range[0], y_min + row_range[0], x_max - x_min + 1, y_max - y_min + 1]

            # Convert mask from BW image to mask lines (xy) in full-image coordinates
            boundary = Mask2Seg(mask, [col_range[0], row_range[0]])

            # Make annotation info (category ids are 1-based)
            an_data    = {"id":an_id,
                          "image_id":1,
                          "category_id":int(cls)+1,
                          "segmentation":[boundary],
                          "area":PolyArea(boundary),
                          "bbox":bbox,
                          "iscrowd":0,
                          "attributes":{"occluded":0},
                          "score":str(score),
                          }

            # Add annotation info into output jData
            jData.jData["annotations"].append(an_data)

        # Save annotated split image (optional)
        if save_annotated_im:
            v = Visualizer(im_temp,
                           metadata=detector_metadata,
                           scale=1.0
                          )
            out = v.draw_instance_predictions(instances)
            im_labeled = cv2.resize(out.get_image(), dsize=(pix_im_temp[1], pix_im_temp[0]))
            # One file per window, tagged with its row/column index in the scan grid
            tile_name = imname[0] + '_r' + str(r) + '_c' + str(c)
            cv2.imwrite(os.path.join(SAVE_DIR, tile_name + '_labeled' + '.jpg'),im_labeled)

    # Export jData as json
    with open (os.path.join(SAVE_DIR,imname[0]+'_detect_object.json'), 'w') as fp:
        json.dump(jData.jData, fp)