# Semantic Segmentation - Automatic segmentation model testing

## Setup - Colab

In [None]:
from google.colab import drive
from google.colab import files as colab_files
# Mount Google Drive at /content/gdrive so the notebook can read and write files there.
drive.mount("/content/gdrive")

## Setup - Python

In [None]:
import cv2
import glob
import os
import numpy as np
import random
import math
import time
import shutil
from matplotlib.pyplot import imshow
from keras.preprocessing.image import ImageDataGenerator
import threading

%tensorflow_version 2.x
%matplotlib inline

In [None]:
SIZE = 256  # side length (pixels) of the square tiles cut from every image
STRIDE = 64  # step (pixels) between consecutive tiles when building the training set
REMOVE_NOTHING_CHANCE = 75 # threshold (0-100) for randomly dropping training tiles whose label is all zeros
# Dotted path under keras_segmentation.models, i.e. "<module>.<factory>".
# Factory names mentioned by the author: segnet | vgg_segnet | resnet50_segnet
CURRENT_MODEL = "segnet.resnet50_segnet"

In [None]:
# Placeholder: folder holding dataset.zip / dataset-gen.zip, the "models" folder
# and the per-model "resultados_*" result folders. Set it before running.
WORK_DIR = "/PATH/TO/DATA"

## Setup - Dependencies

In [None]:
!git clone https://github.com/divamgupta/image-segmentation-keras
%cd image-segmentation-keras
!python setup.py install

In [None]:
import keras_segmentation
from keras_segmentation.models.model_utils import transfer_weights
from keras_segmentation.pretrained import model_from_checkpoint_path

## Setup - Functions

### Dataset related

In [None]:
# creates a new dataset folder and a dataset-gen.zip file
def develop_dataset():
    """Rebuild the tiled dataset from the raw archive and publish it as dataset-gen.zip.

    Everything runs relative to the current working directory. The raw
    ``dataset.zip`` is read from WORK_DIR and the generated archive is moved
    back there. The steps shell out, so ``rm``, ``unzip``, ``mkdir``, ``zip``
    and ``mv`` must be available.
    """
    # start from scratch: drop any previous dataset folder
    os.system("rm -rf dataset")
    # presumably unpacks into ./dataset (the later steps read from there) - confirm archive layout
    os.system("unzip \"%s/dataset.zip\"" % (WORK_DIR))
    # folders for the cut tiles, the binarised labels and the test outputs
    os.system("mkdir \
            dataset/train-x dataset/train-y \
            dataset/train-processed \
            out \
            tmp")
    convert_png()
    cut_images(STRIDE) # clear_dataset() is already called in here
    verify_data()
    # -0 stores without compression; the archive is then moved into WORK_DIR
    os.system("zip -0 -r dataset-gen.zip dataset/ && \
                mv dataset-gen.zip \"%s/\"" % (WORK_DIR))

def create_dataset():
    """Unpack WORK_DIR/dataset-gen.zip into the working directory and verify it.

    The archive is extracted with the ``unzip`` shell command; afterwards
    ``verify_data`` prints the tile counts.
    """
    unzip_command = "unzip \"%s/dataset-gen.zip\"" % (WORK_DIR)
    os.system(unzip_command)
    verify_data()

def reset_dataset():
    """Discard the local ``dataset`` folder and restore it from dataset-gen.zip.

    The folder is removed with ``shutil.rmtree`` (as ``reset_out`` does)
    instead of shelling out to ``rm -rf``. A missing folder is not an error.
    """
    shutil.rmtree("dataset", ignore_errors=True)
    create_dataset()

def verify_data():
    """Print how many input and processed-label tiles exist and flag a mismatch.

    Only ``*.png`` files directly inside ``dataset/train-x`` and
    ``dataset/train-processed`` are counted. Nothing is returned or raised.
    """
    train_x = len(glob.glob("dataset/train-x/*.png"))
    train_p = len(glob.glob("dataset/train-processed/*.png"))

    print("[Verify data] train_x", train_x, ", train_p", train_p)
    if train_x == train_p:
        return
    print("[ERROR] Dataset input and output quantity isn't synced")

def move_out(folder):
    """Move the local ``out`` folder to ``WORK_DIR/<folder>``.

    ``shutil.move`` is used instead of ``os.rename`` because a plain rename
    fails when source and destination are on different filesystems (for
    example local disk to a mounted drive); ``shutil.move`` falls back to
    copy-and-delete in that case.
    """
    shutil.move("out", "%s/%s" % (WORK_DIR, folder))

def convert_png():
    """Write a grayscale ``.png`` copy next to every ``.tif`` in the dataset subfolders.

    The ``.tif`` originals are left in place. The target name is obtained by
    replacing ``.tif`` with ``.png`` in the path.
    """
    tif_paths = glob.iglob("dataset/*/**.tif")
    for tif_path in tif_paths:
        png_path = tif_path.replace(".tif", ".png")
        gray = cv2.imread(tif_path, 0)  # flag 0: load as single-channel grayscale
        cv2.imwrite(png_path, gray)

def clear_dataset():
    """Randomly drop training tiles whose label contains no marked pixel.

    For every all-zero label in ``dataset/train-y`` a number in 0..100 is
    drawn; when it is <= REMOVE_NOTHING_CHANCE both the label and the input
    tile of the same name in ``dataset/train-x`` are deleted.

    Files are removed with ``os.remove`` rather than a shell ``rm`` command
    built from the file names, so names with quotes or shell metacharacters
    are handled. Labels that cannot be read are skipped.
    """
    for file_y in glob.iglob("dataset/train-y/*.png"):
        img = cv2.imread(file_y, 0)
        if img is None or img.any():
            continue
        # randint is inclusive on both ends (101 possible values)
        if random.randint(0, 100) > REMOVE_NOTHING_CHANCE:
            continue

        file_x = os.path.join("dataset/train-x", os.path.basename(file_y))
        for path in (file_x, file_y):
            # like `rm`, a missing partner file must not stop the other removal
            if os.path.exists(path):
                os.remove(path)

def reset_out():
    """Recreate empty ``out`` (with ``src``, ``split``, ``over``) and ``tmp`` folders.

    Existing ``out`` and ``tmp`` trees in the working directory are deleted first.
    """
    for stale in ("out", "tmp"):
        if os.path.exists(stale):
            shutil.rmtree(stale)
    for fresh in ("tmp", "out", "out/src", "out/split", "out/over"):
        os.mkdir(fresh)

In [None]:
def extend_image(file):
    """Pad an image in place so both dimensions become multiples of SIZE.

    The image is read as grayscale, extended to the right and to the bottom
    by repeating its last column / last row (edge replication), and written
    back to the same path.

    Args:
        file: path of the image to extend; it is overwritten.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``file``.
    """
    img = cv2.imread(file, 0)
    if img is None:
        raise FileNotFoundError("extend_image: could not read image '%s'" % file)

    height, width = img.shape[:2]
    pad_bottom = math.ceil(height / SIZE) * SIZE - height
    pad_right = math.ceil(width / SIZE) * SIZE - width

    # mode="edge" repeats the border pixels, which is what the former
    # per-pixel loops did for the right, bottom and bottom-right areas
    img_out = np.pad(img, ((0, pad_bottom), (0, pad_right)), mode="edge")

    cv2.imwrite(file, img_out)

In [None]:
def download(folder, download_as=None):
    """Copy ``folder`` into the results area of the current model.

    Despite the name nothing is downloaded: the tree is copied to
    ``WORK_DIR/resultados_<model>/<download_as>``, replacing any previous
    copy with that name.

    Args:
        folder: local directory to copy.
        download_as: name of the copy; defaults to ``folder``. Only the last
            path component is used.
    """
    if download_as is None:
        download_as = folder
    download_as = download_as.split("/")[-1]

    model_path_name = CURRENT_MODEL.replace(".", "_")
    results_dir = "%s/resultados_%s" % (WORK_DIR, model_path_name)
    os.makedirs(results_dir, exist_ok=True)

    dest = "%s/%s" % (results_dir, download_as)

    # A local directory named like the copy is removed, as before, but never
    # when it IS the source folder (the default download_as used to delete
    # the source right before copying it).
    same_as_source = os.path.abspath(download_as) == os.path.abspath(folder)
    if os.path.exists(download_as) and not same_as_source:
        shutil.rmtree(download_as)
    if os.path.exists(dest):
        shutil.rmtree(dest)

    shutil.copytree(folder, dest)

### Image utils

In [None]:
def cut_image(file, outfolder, stride=STRIDE):
    """Cut one grayscale image into SIZE x SIZE tiles written to ``outfolder``.

    Tiles start every ``stride`` pixels in both directions and are produced
    row by row. Parts of a tile that fall outside the image are left black.
    Each tile is saved as ``<index:06d>_<original file name>``; the
    zero-padded index keeps a sorted listing in cutting order.

    Args:
        file: path of the source image.
        outfolder: existing directory receiving the tiles.
        stride: distance in pixels between the origins of consecutive tiles.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``file``.
    """
    filename = os.path.basename(file)
    img = cv2.imread(file, 0)
    if img is None:
        raise FileNotFoundError("cut_image: could not read image '%s'" % file)

    height, width = img.shape[:2]
    index = 0
    for y in range(0, height, stride):
        for x in range(0, width, stride):
            tile = np.zeros((SIZE, SIZE), dtype=img.dtype)
            # slicing clips at the image border, so the patch may be smaller
            patch = img[y:y + SIZE, x:x + SIZE]
            tile[:patch.shape[0], :patch.shape[1]] = patch
            cv2.imwrite("%s/%06d_%s" % (outfolder, index, filename), tile)
            index += 1

def noise(image, var):
    """Return a copy of ``image`` with zero-mean Gaussian noise, rescaled to 0..255.

    Args:
        image: array with three dimensions (rows, columns, channels).
        var: passed to ``np.random.normal`` as its scale argument, i.e. it
            acts as the standard deviation of the noise.

    Returns:
        The noisy image, min-max normalised to [0, 255] and cast to uint8.
    """
    rows, cols, channels = image.shape
    gaussian = np.random.normal(0, var, (rows, cols, channels))
    noisy = image + gaussian

    cv2.normalize(noisy, noisy, 0, 255, cv2.NORM_MINMAX, dtype=-1)
    return noisy.astype(np.uint8)

### Image core

In [None]:
def cut_images(stride):
    """Pad every dataset image and cut the training pairs into tiles.

    Training inputs (``dataset/input``) and their labels (``dataset/label``)
    are padded to a multiple of SIZE and cut into ``dataset/train-x`` and
    ``dataset/train-y``. Test inputs and labels are only padded. Afterwards
    empty tiles are thinned out and the labels are binarised.

    Args:
        stride: distance in pixels between consecutive training tiles. It is
            now forwarded to ``cut_image``; previously it was ignored and the
            module default was always used.
    """
    for file_x in glob.iglob("dataset/input/*.png"):
        filename = os.path.basename(file_x)
        file_y = "dataset/label/%s" % (filename)

        extend_image(file_x)
        extend_image(file_y)

        cut_image(file_x, "dataset/train-x", stride=stride)
        cut_image(file_y, "dataset/train-y", stride=stride)

    for file_x in glob.iglob("dataset/test-input/*.png"):
        filename = os.path.basename(file_x)
        file_y = "dataset/test-label/%s" % (filename)

        extend_image(file_x)
        extend_image(file_y)

    clear_dataset()
    process_label()

def process_label():
    """Convert every label tile into a 0/1 class map.

    Each ``dataset/train-y/*.png`` is read as grayscale; pixels greater than
    zero become 1, all others 0. The result is written under the same file
    name to ``dataset/train-processed``.
    """
    for file in glob.iglob("dataset/train-y/*.png"):
        filename = os.path.basename(file)
        img = cv2.imread(file, 0)
        # one vectorised comparison instead of a Python loop over every pixel
        class_map = (img > 0).astype(np.uint8)

        cv2.imwrite("dataset/train-processed/" + filename, class_map)

def get_iou(filename):
    """Return the intersection-over-union score of one predicted test image.

    The prediction ``out/src/out_<filename>`` is compared with the target
    ``dataset/test-label/<filename>``; in both, a pixel counts as foreground
    when its grayscale value is exactly 255.

    Args:
        filename: file name (without folder) shared by prediction and target.

    Returns:
        intersection / union as a float, 1.0 when neither image has any
        foreground pixel, or -1 when a file is missing or unreadable.
    """
    file_out = "out/src/out_%s" % (filename)
    file_target = "dataset/test-label/%s" % (filename)

    if not os.path.isfile(file_out):
        print("[Error/IOU Score] Out file not found")
        return -1
    if not os.path.isfile(file_target):
        print("[Error/IOU Score] Target file not found")
        return -1

    out = cv2.imread(file_out, 0)
    target = cv2.imread(file_target, 0)
    if out is None or target is None:
        print("[Error/IOU Score] Could not read image data")
        return -1

    # only the area covered by the target is scored
    out_fg = out[:target.shape[0], :target.shape[1]] == 255
    target_fg = target == 255

    intersect = int(np.count_nonzero(out_fg & target_fg))
    union = int(np.count_nonzero(out_fg | target_fg))

    if union == 0:
        # nothing to find and nothing predicted counts as a perfect match
        return 1.0

    return intersect / union

### Data Augmentation core

In [None]:
def niblack_bin(img, window_size):
    """Binarise a grayscale image window by window (Niblack-style threshold).

    The image is split into non-overlapping ``window_size`` x ``window_size``
    windows. Each window gets the threshold ``m + k * s`` with k = -0.2,
    s the standard deviation of the window and m the midpoint between its
    largest and smallest value. Pixels above the threshold become 255, the
    rest 0.

    NOTE(review): m is the (max + min) / 2 midpoint, not the window mean used
    in the usual statement of Niblack's method - confirm this is intended.

    The window ranges now include the last full window in each direction;
    previously the final row and column of windows were skipped whenever the
    image size was an exact multiple of ``window_size``. Pixels beyond the
    last full window are left unchanged.

    Args:
        img: 2-D array, modified in place.
        window_size: side length of the square windows.

    Returns:
        The same ``img`` object.
    """
    k = -0.2
    height = img.shape[0]
    width = img.shape[1]
    for y in range(0, height - window_size + 1, window_size):
        for x in range(0, width - window_size + 1, window_size):
            window = img[y:y + window_size, x:x + window_size]  # view into img
            midpoint = (int(window.max()) + int(window.min())) / 2
            local_thresh = midpoint + k * np.std(window)
            window[...] = np.where(window > local_thresh, 255, 0)
    return img

def auto_contrast(img):
    """Stretch the values of ``img`` linearly so they span 0..255.

    Every value v becomes ``(v - lowest) / (highest - lowest) * 255``,
    truncated when stored back into the array. A constant image maps to all
    zeros and an empty image is returned untouched.

    Args:
        img: array of pixel values, modified in place.

    Returns:
        The same ``img`` object.
    """
    if img.size == 0:
        return img

    lowest = int(img.min())
    span = int(img.max()) - lowest
    if span == 0:
        span = 1  # constant image: avoid dividing by zero

    # computed in float64; the cast back truncates the fractional part
    stretched = (img.astype(np.float64) - lowest) / span * 255
    img[...] = stretched.astype(img.dtype)
    return img

In [None]:
def invert_images(folder):
    """Replace every file in ``folder`` by its inverted grayscale version."""
    pattern = "%s/*" % (folder)
    for path in glob.iglob(pattern):
        inverted = cv2.bitwise_not(cv2.imread(path, 0))
        cv2.imwrite(path, inverted)

def normalize_images(folder):
    """Min-max normalise every file in ``folder`` to the range 0..255, in place on disk.

    Images are read with OpenCV's default flags and overwritten.
    """
    pattern = "%s/*" % (folder)
    for path in glob.iglob(pattern):
        picture = cv2.imread(path)
        # source and destination are the same array: normalised in place
        cv2.normalize(picture, picture, 0, 255, cv2.NORM_MINMAX, dtype=-1)
        cv2.imwrite(path, picture)

def whiten_images(folder):
    """Brighten every file in ``folder`` by 15 gray levels, clamped at 255.

    Images are read as grayscale and overwritten. The addition is done in a
    wider integer type and clipped: adding directly on uint8 wraps around
    (250 + 15 became 9), so the former ``> 255`` clamp could never trigger.
    """
    for file in glob.iglob("%s/*" % (folder)):
        img = cv2.imread(file, 0)
        brighter = np.clip(img.astype(np.int16) + 15, 0, 255).astype(np.uint8)
        cv2.imwrite(file, brighter)

def local_bin(folder, window_size):
    """Binarise every file in ``folder`` in place with ``niblack_bin``.

    Files are processed by worker threads, at most two at a time. All
    workers are joined before returning; previously the last one or two
    threads were left running, so callers could read files still being
    rewritten.

    Args:
        folder: directory whose files are overwritten.
        window_size: window side length forwarded to ``niblack_bin``.
    """
    def run(file, window_size):
        img = cv2.imread(file, 0)
        img = niblack_bin(img, window_size)
        cv2.imwrite(file, img)

    workers = []
    for file in glob.iglob("%s/*" % (folder)):
        if len(workers) == 2:
            # wait for the current pair before starting the next one
            for worker in workers:
                worker.join()
            workers = []
        worker = threading.Thread(target=run, args=(file, window_size))
        worker.start()
        workers.append(worker)

    # wait for the final, possibly incomplete, batch
    for worker in workers:
        worker.join()

def contrast_images(folder):
    """Apply ``auto_contrast`` to every file in ``folder``, overwriting each one."""
    pattern = "%s/*" % (folder)
    for path in glob.iglob(pattern):
        stretched = auto_contrast(cv2.imread(path, 0))
        cv2.imwrite(path, stretched)

def merge_folder(folder_1, folder_2):
    """Blend each ``.png`` of ``folder_1`` with its namesake from ``folder_2``.

    The result is ``1.0 * image_1 + 0.2 * image_2`` and overwrites the file in
    ``folder_1``. The partner path is found by replacing ``folder_1`` with
    ``folder_2`` in the path. Paths and shapes are printed for each pair.
    """
    for path_1 in glob.iglob("%s/*.png" % (folder_1)):
        path_2 = path_1.replace(folder_1, folder_2)
        print(path_1, path_2)
        img_1 = cv2.imread(path_1)
        img_2 = cv2.imread(path_2)
        print(img_1.shape, img_2.shape)
        blended = cv2.addWeighted(img_1, 1.0, img_2, 0.2, 0)

        cv2.imwrite(path_1, blended)

def histogram_eq(folder):
    """Overwrite every file in ``folder`` with its histogram-equalised grayscale version."""
    pattern = "%s/*" % (folder)
    for path in glob.iglob(pattern):
        gray = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2GRAY)
        equalised = cv2.equalizeHist(gray)
        cv2.imwrite(path, equalised)

def noise_images(folder, chance, var=5):
    """Overwrite a random subset of the files in ``folder`` with noisy versions.

    For each file a number in 0..100 is drawn; the file is replaced by
    ``noise(image, var)`` when the number is <= ``chance``.
    """
    pattern = "%s/*" % (folder)
    for path in glob.iglob(pattern):
        if random.randint(0, 100) <= chance:
            noisy = noise(cv2.imread(path), var)
            cv2.imwrite(path, noisy)

def noise_aug_images(folder, chance, var=5):
    """Add noisy copies of a random subset of the training tiles in ``folder``.

    For each ``.png`` in ``folder`` a number in 0..100 is drawn; when it is
    <= ``chance`` a noisy copy is written next to the tile with the suffix
    ``noise.png`` and the matching label gets an unmodified copy with the
    same suffix.

    Args:
        folder: directory with the input tiles. It is now honoured;
            previously ``dataset/train-x`` was hard-coded. The label path is
            derived by replacing ``train-x`` with ``train-processed``, so the
            folder path is expected to contain ``train-x``.
        chance: threshold in 0..100.
        var: noise strength forwarded to ``noise``.
    """
    # listing taken up front so copies written below are never revisited
    for file_x in glob.glob("%s/*.png" % (folder)):
        if random.randint(0, 100) > chance:
            continue
        file_y = file_x.replace("train-x", "train-processed")

        noisy = noise(cv2.imread(file_x), var)
        cv2.imwrite(file_x.replace(".png", "noise.png"), noisy)

        label = cv2.imread(file_y)
        cv2.imwrite(file_y.replace(".png", "noise.png"), label)

def flip_images(chance):
    """Add vertically flipped copies of a random subset of the training pairs.

    For each tile in ``dataset/train-x`` a number in 0..100 is drawn; when it
    is <= ``chance`` the tile and its ``train-processed`` label are both
    flipped with flip code 0 and saved with the suffix ``fp.png``.
    """
    for file_x in glob.iglob("dataset/train-x/*.png"):
        if random.randint(0, 100) <= chance:
            file_y = file_x.replace("train-x", "train-processed")
            for source in (file_x, file_y):
                flipped = cv2.flip(cv2.imread(source), 0)
                cv2.imwrite(source.replace(".png", "fp.png"), flipped)
        
def rotate_images(chance):
    """Add 180-degree rotated copies of a random subset of the training pairs.

    For each tile in ``dataset/train-x`` a number in 0..100 is drawn; when it
    is <= ``chance`` the tile and its ``train-processed`` label are rotated
    by 180 degrees and saved with the suffix ``rot.png``.

    The suffix used to be ``fp.png``, the same one ``flip_images`` writes, so
    using both augmentations together made them overwrite each other.
    """
    # listing taken up front so copies written below are never revisited
    for file_x in glob.glob("dataset/train-x/*.png"):
        if random.randint(0, 100) > chance:
            continue
        file_y = file_x.replace("train-x", "train-processed")

        for source in (file_x, file_y):
            # two quarter turns == one half turn
            rotated = np.rot90(cv2.imread(source), 2)
            cv2.imwrite(source.replace(".png", "rot.png"), rotated)

## Setup - Model

### Model functions

In [None]:
def test_model():
    """Run the global ``model`` on every test image and store the results.

    For each ``dataset/test-input/*.png`` the image is cut into
    non-overlapping SIZE x SIZE tiles in ``tmp``. Each tile is segmented by
    the model and the tiles are stitched back row by row. The stitched
    prediction is thresholded (> 200 becomes 255, otherwise 0) and written,
    together with the stitched input, to ``out/src``. ``process_out`` then
    builds the score entry and the visualisations.

    ``out`` and ``tmp`` are reset at the start, and ``tmp`` is emptied after
    every image.
    """
    reset_out()

    for file in glob.iglob("dataset/test-input/*.png"):
        cut_image(file, "tmp", stride=SIZE)
        filename = os.path.basename(file)
        img_original = cv2.imread(file, 0)
        # tiles per row; ceil because the last tile of a row may be padded
        break_width = math.ceil(img_original.shape[1] / SIZE)

        rows_in, rows_out = [], []
        strip_in, strip_out = [], []
        # tile names start with a zero-padded index, so sorting restores
        # the cutting order
        for file_in in sorted(glob.iglob("tmp/*")):
            model.predict_segmentation(
                inp=file_in,
                out_fname="tmp_out.png"
            )
            strip_in.append(cv2.imread(file_in, 0))
            strip_out.append(cv2.imread("tmp_out.png", 0))

            if len(strip_out) == break_width:
                # a full row of tiles: join horizontally and start the next
                rows_in.append(np.concatenate(strip_in, axis=1))
                rows_out.append(np.concatenate(strip_out, axis=1))
                strip_in, strip_out = [], []

        img_in = np.concatenate(rows_in, axis=0)
        img_out = np.concatenate(rows_out, axis=0)

        # vectorised threshold instead of a Python loop over every pixel
        img_out = np.where(img_out > 200, 255, 0).astype(np.uint8)

        for leftover in glob.glob("tmp/*"):
            os.remove(leftover)

        cv2.imwrite("out/src/out_%s" % (filename), img_out)
        cv2.imwrite("out/src/in_%s" % (filename), img_in)

        process_out(filename)

In [None]:
def process_out(file):
    """Score one stitched prediction and write its visualisations.

    Appends ``<file>,<iou>`` to ``out/description.csv`` (writing the header
    first if the file is new) and creates three images:

    * ``out/split/<file>``: input and prediction side by side,
    * ``out/<file>``: input with the prediction overlaid in red,
    * ``out/over/<file>``: input with the prediction in red and the target
      in blue.

    Args:
        file: file name shared by ``out/src/in_*``, ``out/src/out_*`` and
            ``dataset/test-label``.

    If ``get_iou`` reports an error (-1) a message is printed and nothing is
    written.
    """
    iou_score = get_iou(file)
    if iou_score == -1:
        print("Error while checking iou_score")
        return

    img_in = cv2.imread("out/src/in_%s" % (file), 1)
    img_out = cv2.imread("out/src/out_%s" % (file), 1)
    img_target = cv2.imread("dataset/test-label/%s" % (file), 1)

    # Creates/appends the description file
    create_header = not os.path.isfile("out/description.csv")
    with open("out/description.csv", "a") as desc:
        if create_header:
            desc.write("name,iou\n")
        desc.write("%s,%f\n" % (file, iou_score))

    # "Splitscreen" image: input on the left, prediction on the right
    cv2.imwrite("out/split/%s" % (file), np.append(img_in, img_out, axis=1))

    # Overlay of input and prediction; predicted pixels are painted red
    img_out[img_out[:, :, 0] == 255] = (0, 0, 255)  # B,G,R
    img = cv2.addWeighted(img_in, 1.0, img_out, 0.2, 0)
    cv2.imwrite("out/%s" % (file), img)

    # Overlay of input, prediction and target; target pixels are painted blue
    img_target[img_target[:, :, 0] == 255] = (255, 0, 0)  # B,G,R
    img = cv2.addWeighted(img_in, 1, img_out, 0.5, 0)
    img = cv2.addWeighted(img, 1, img_target, 0.3, 0)
    cv2.imwrite("out/over/%s" % (file), img)

In [None]:
# Build the architecture named by CURRENT_MODEL (looked up under
# keras_segmentation.models) with two classes and SIZE x SIZE inputs.
# eval() is applied to a constant defined in this notebook; never feed it external text.
model = eval("keras_segmentation.models." + CURRENT_MODEL)(n_classes=2, input_height=SIZE, input_width=SIZE)

In [None]:
def reset_all():
    """Restore the default model and a fresh dataset before an experiment.

    ``model`` is declared global so the module-level model used by ``train``
    and ``test_model`` is really replaced. Without the declaration the fresh
    model was bound to a local name and discarded, so each experiment kept
    training the model left over from the previous one.
    """
    global model
    model = get_default_model()
    reset_dataset()

In [None]:
def log_out(texts):
    """Write ``texts`` to ``out/description.txt``, one entry per line.

    The file is overwritten; no newline follows the last entry.
    """
    with open("out/description.txt", "w") as handle:
        content = "\n".join(texts)
        handle.write(content)

In [None]:
def train():
    """Train the global ``model`` on the tiles under ``dataset``.

    Runs 15 epochs of 1024 steps each, with checkpoints written under
    ``/tmp/model-ckpt.h5``.
    """
    settings = {
        "train_images": "dataset/train-x/",
        "train_annotations": "dataset/train-processed/",
        "checkpoints_path": "/tmp/model-ckpt.h5",
        "epochs": 15,
        "steps_per_epoch": 1024,
    }
    model.train(**settings)

In [None]:
def fetch_model_results():
    """Collect the per-experiment scores of the current model into model.csv.

    Every ``description.csv`` found below ``WORK_DIR/resultados_<model>`` is
    read. Its rows (header dropped) get the contents of the sibling
    ``description.txt`` appended as an extra column, lines joined with "/",
    and are appended to ``model.csv`` in the results folder.

    Rows are appended on every call, so running this twice duplicates them.
    A missing results folder, a missing ``description.txt`` and an empty
    ``description.csv`` are tolerated.
    """
    results_folder = "%s/resultados_%s" % (WORK_DIR, CURRENT_MODEL.replace(".", "_"))
    model_file = "%s/model.csv" % (results_folder)

    os.makedirs(results_folder, exist_ok=True)
    if not os.path.isfile(model_file):
        with open(model_file, "w") as file:
            file.write("name,iou,augmentations\n")

    def get_file_lines(file_name):
        # lines of the file with their newline characters removed
        with open(file_name, "r") as file:
            return [line.replace("\n", "") for line in file]

    for root, _dirs, files in os.walk(results_folder, topdown=False):
        if "description.csv" not in files:
            continue

        csv_file = os.path.join(root, "description.csv")
        txt_file = os.path.join(root, "description.txt")

        # get the augmentations; runs without a description.txt are still
        # reported, with an empty augmentations column
        augmentations = ""
        if os.path.isfile(txt_file):
            augmentations = "/".join(get_file_lines(txt_file))

        # slicing drops the header and is safe on an empty file
        rows = get_file_lines(csv_file)[1:]

        # write the results to the main model csv file
        with open(model_file, "a") as file:
            file.writelines("%s,%s\n" % (row, augmentations) for row in rows)
                

In [None]:
def get_default_model():
    """Return a new CURRENT_MODEL instance loaded with the stored default weights.

    The weights are read from ``WORK_DIR/models/model-<model>.h5.0``.
    """
    factory = eval("keras_segmentation.models." + CURRENT_MODEL)
    fresh_model = factory(n_classes=2, input_height=SIZE, input_width=SIZE)
    weights_path = "%s/models/model-%s.h5.0" % (WORK_DIR, CURRENT_MODEL.replace(".","_"))
    fresh_model.load_weights(weights_path)
    return fresh_model


In [None]:
def create_default_model():
    """Create the default weights file for CURRENT_MODEL.

    A new model is trained for a single step on one all-black SIZE x SIZE
    image, with the checkpoint path set to
    ``WORK_DIR/models/model-<model>.h5``. Saving is left to the training call.
    """
    # Create default model
    factory = eval("keras_segmentation.models." + CURRENT_MODEL)
    fresh_model = factory(n_classes=2, input_height=SIZE, input_width=SIZE)

    # Create blank dataset
    blank = np.zeros(shape=(SIZE,SIZE,1))
    for probe, directory in (("blank_data_x/", "blank_data_x"),
                             ("blank_data_y/", "blank_data_y")):
        if not os.path.exists(probe):
            os.mkdir(directory)
    for target in ("blank_data_x/img0.png", "blank_data_y/img0.png"):
        cv2.imwrite(target, blank)

    # Train default model
    checkpoint = "%s/models/model-%s.h5" % (WORK_DIR, CURRENT_MODEL.replace(".","_"))
    fresh_model.train(
        train_images="blank_data_x/",
        train_annotations="blank_data_y/",
        checkpoints_path=checkpoint,
        epochs=1,
        steps_per_epoch=1,
    )
    # Model is already saved

## Functions

### Base

In [None]:
# Unpack the pre-generated dataset (dataset-gen.zip) into the working directory.
create_dataset()

In [None]:
# One-off step: produce the default weights that get_default_model() loads.
create_default_model()

### Processes

In [None]:
# Bind the notebook-level model used by train() and test_model().
model = get_default_model()

#### Standard

In [None]:
# Baseline experiment: no preprocessing and no augmentation.
reset_all()
train()
test_model()
log_out(["limpo"])
download("out", "limpo")

In [None]:
# Min-max normalisation applied to both the training tiles and the test inputs.
reset_all()
normalize_images("dataset/train-x")
normalize_images("dataset/test-input")
train()
test_model()
log_out(["normalizado"])
download("out", "normalizado")

In [None]:
# Min-max normalisation applied to the test inputs only; training tiles stay untouched.
reset_all()
normalize_images("dataset/test-input")
train()
test_model()
log_out(["teste_normalizado"])
download("out", "teste normalizado")

In [None]:
# Local binarisation (16-pixel windows) of the training tiles and the test inputs.
reset_all()
local_bin("dataset/train-x", window_size=16)
local_bin("dataset/test-input", window_size=16)
train()
test_model()
log_out(["niblack bin k=-0.2"])
download("out", "niblack k=0.2")

In [None]:
# Blend experiment: binarised copies of the images are merged (weight 0.2)
# onto the normalised originals, for both training tiles and test inputs.
reset_all()
# work on copies so the originals can still be normalised
os.system("cp -r dataset/train-x dataset/train-x-bin")
os.system("cp -r dataset/test-input dataset/test-input-bin")
local_bin("dataset/train-x-bin", window_size=16)
local_bin("dataset/test-input-bin", window_size=16)
normalize_images("dataset/train-x")
normalize_images("dataset/test-input")
merge_folder("dataset/train-x", "dataset/train-x-bin")
merge_folder("dataset/test-input", "dataset/test-input-bin")
train()
test_model()
log_out(["merge","norm","niblack"])
download("out", "merge norm niblack k=0.2")

In [None]:
# Auto-contrast (linear stretch to 0..255) on the training tiles and the test inputs.
reset_all()
contrast_images("dataset/train-x")
contrast_images("dataset/test-input")
train()
test_model()
log_out(["auto contrast"])
download("out", "contrast")

In [None]:
# Histogram equalisation on the training tiles and the test inputs.
reset_all()
histogram_eq("dataset/train-x")
histogram_eq("dataset/test-input")
train()
test_model()
log_out(["hist. eq."])
download("out", "hist eq")

In [None]:
# Normalisation plus noise applied IN PLACE to a random subset of the training tiles.
reset_all()
normalize_images("dataset/train-x")
normalize_images("dataset/test-input")
noise_images("dataset/train-x", 50, var=5)
train()
test_model()
# NOTE(review): this label is identical to the one logged by the noise
# augmentation experiment, although noise_images() modifies tiles rather than
# adding copies - confirm whether "(aug)" is intended here.
log_out(["normalizado", "ruido (aug) 50% σ²=5"])
download("out", "normalizado ruido-50-5")

In [None]:
# Normalisation plus noise augmentation: noisy COPIES of a random subset of the tiles are added.
reset_all()
normalize_images("dataset/train-x")
normalize_images("dataset/test-input")
noise_aug_images("dataset/train-x", 50, var=5)
train()
test_model()
log_out(["normalizado", "ruido (aug) 50% σ²=5"])
download("out", "normalizado ruidoaug-50-5")

In [None]:
# Normalisation plus flip augmentation: flipped copies of a random subset of the tiles are added.
reset_all()
normalize_images("dataset/train-x")
normalize_images("dataset/test-input")
flip_images(50)
train()
test_model()
log_out(["normalizado", "espelhado 50%"])
download("out", "normalizado espelhado-50")

In [None]:
# Normalisation plus rotation augmentation: rotated copies of a random subset of the tiles are added.
reset_all()
normalize_images("dataset/train-x")
normalize_images("dataset/test-input")
rotate_images(50)
train()
test_model()
log_out(["normalizado", "rotacionado 50%"])
download("out", "normalizado rotacionado-50")

In [None]:
# Gather the scores of all stored experiments into the model-level model.csv.
fetch_model_results()

# Data Analysis

###

In [None]:
# Regenerate dataset-gen.zip from the raw dataset.zip (only needed when the raw data changes).
develop_dataset()