# Notebook to compare and explore raw images, model masks and manual masks

In this notebook, we display a grid of all uploaded images. Each image group consists of:
* **raw image**: the original art object's image
    * Expected: image with an image extension (i.e. `.png`, `.jpeg` or `.jpg`)  (inside `./data/raw_images/`)
* **model mask**: the model mask output by the Deeplab v3+ model
    * Expected: `.npy` file with the same base name as the raw image, stored per model (inside `./data/model_masks/<model name>/`)
* **manual mask**: the manual mask drawn by one of our team members
    * Expected: `.json` file with the same base name as the raw image (inside `./data/manual_masks/`)
* **metrics' object**: the dict object containing the metrics.
    * Expected: *.JSON* file with the same name as the raw image containing only top-level keys. The key is the metric's name and the value is the performance value of the metric.

If an image, mask, metrics' object, etc. is missing, a "missing file" default image will be shown. Therefore, you don't need to provide everything for each raw image to start experimenting!

There is a set of 101 images already prepared by Matt, available in the GitHub repo as a release. The cells below download, unzip and save this dataset inside `./data/raw_images/`, which serves as the common basis for everything else.

You can also skip the cells downloading the data and place your own set of images inside `./data/raw_images/` and experiment from there.

**When a session ends, the uploaded files are deleted, so this is not persistent storage!**

Good luck!

### Imports

In [None]:
%pylab inline
import math
import os
import json
import shutil
from typing import Tuple, List
import requests, zipfile, io

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
from PIL import Image, ImageDraw

### Constants and paths

In [None]:
# Directories (relative to the working directory) that hold each kind of artefact.
RAW_IMAGES_PATH = './data/raw_images/'
MODEL_MASKS_PATH = './data/model_masks/'
MANUAL_MASKS_PATH = './data/manual_masks/'
METRICS_PATH = './data/metrics/'
ENSEMBLE_MASKS_PATH = './data/ensemble_masks/'

# File-name suffixes accepted as raw images (used as a tuple argument to `str.endswith`).
IMAGE_FILE_EXTENSIONS = ('png', 'jpeg', 'jpg')

# Release archives that the download cells below fetch and unpack.
RAW_IMAGES_ZIP_URL = "https://github.com/rijks-g4/rijksdraw/releases/download/v0.0.1/initial_dataset.zip"
MANUAL_MASKS_ZIP_URL = "https://github.com/rijks-g4/rijksdraw/releases/download/v0.0.1/manual_masks.zip"
MODEL_MASKS_ZIP_URL = "https://github.com/rijks-g4/rijksdraw/releases/download/v0.0.1/masks.zip"

# Placeholder picture for missing files.
# NOTE(review): only referenced from commented-out code in the visible cells.
MISSING_FILE_URL = 'https://cdn1.iconfinder.com/data/icons/leto-files/64/leto_files-58-512.png'

### Create required directories

In [None]:
# Make sure every directory the notebook reads from or writes to exists.
os.makedirs(RAW_IMAGES_PATH, exist_ok=True)
os.makedirs(MODEL_MASKS_PATH, exist_ok=True)
os.makedirs(MANUAL_MASKS_PATH, exist_ok=True)
os.makedirs(METRICS_PATH, exist_ok=True)
# The ensemble cell saves `.npy` files into this directory, so it has to exist too.
os.makedirs(ENSEMBLE_MASKS_PATH, exist_ok=True)

### Download, unzip and save the raw images, manual masks and model masks

In [None]:
def download_and_extract_nested_zip(zip_url: str, output_directory: str, nest_directory_name: str):
    """Download a zip archive and copy the files that sit directly in one of its folders.

    Only members whose archive path is exactly ``<nest_directory_name>/<file name>``
    are extracted. Directory entries and anything located deeper or elsewhere in
    the archive are skipped. Each file is written into ``output_directory`` under
    its base name.

    Args:
        zip_url: URL of the zip archive.
        output_directory: Existing directory that receives the extracted files.
        nest_directory_name: Name of the folder inside the archive to read from.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    r = requests.get(zip_url)
    # Fail with a clear HTTP error instead of an obscure BadZipFile further down.
    r.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(r.content)) as zip_file:
        for member in zip_file.namelist():
            filename = os.path.basename(member)
            # skip directories and everything that is not directly inside the nest folder
            if not filename or member != f'{nest_directory_name}/{filename}':
                continue

            target_path = os.path.join(output_directory, filename)
            # Both handles are opened by the `with`, so neither leaks if the other fails to open.
            with zip_file.open(member) as source, open(target_path, "wb") as target:
                shutil.copyfileobj(source, target)

def download_and_extract_masks(zip_url: str, output_directory: str, nest_directory_name: str):
    """Download a zip archive and unpack it, keeping its sub-folder layout.

    The first occurrence of ``<nest_directory_name>/`` is removed from every
    member path and the remainder is recreated below ``output_directory``.
    Member paths are used as-is, so the archive is assumed to be trusted.

    Args:
        zip_url: URL of the zip archive.
        output_directory: Directory that receives the extracted tree.
        nest_directory_name: Folder name to strip from the member paths.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    r = requests.get(zip_url)
    # Fail with a clear HTTP error instead of an obscure BadZipFile further down.
    r.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(r.content)) as zip_file:
        for member in zip_file.namelist():
            filename = os.path.basename(member)
            relative_path = member.replace(f'{nest_directory_name}/', "", 1)
            output_path = os.path.join(output_directory, relative_path)

            # directory entry: recreate the folder and move on
            if not filename:
                if output_path:
                    os.makedirs(output_path, exist_ok=True)
                continue

            # Archives do not always list their folders explicitly, so make sure
            # the parent folder exists before writing the file.
            parent_directory = os.path.dirname(output_path)
            if parent_directory:
                os.makedirs(parent_directory, exist_ok=True)

            # Both handles are opened by the `with`, so neither leaks if the other fails to open.
            with zip_file.open(member) as source, open(output_path, "wb") as target:
                shutil.copyfileobj(source, target)

In [None]:
# Fetch the raw images: files directly under `initial_dataset/` in the archive go to RAW_IMAGES_PATH.
download_and_extract_nested_zip(RAW_IMAGES_ZIP_URL, RAW_IMAGES_PATH, 'initial_dataset')

In [None]:
# Fetch the manual masks: files directly under `manual_masks/` in the archive go to MANUAL_MASKS_PATH.
download_and_extract_nested_zip(MANUAL_MASKS_ZIP_URL, MANUAL_MASKS_PATH, 'manual_masks')

In [None]:
# Fetch the model masks, keeping the archive's sub-folders below `masks/` (later cells treat each sub-folder as one model).
download_and_extract_masks(MODEL_MASKS_ZIP_URL, MODEL_MASKS_PATH, 'masks')

# Before continuing, please:
* Check whether the stats below (you need to run them) show the correct number of objects for each directory.

### Show stats of read images and metrics' objects

In [None]:
def get_all_files(directory: str):
    """Return the names of the files located directly inside `directory`."""
    file_names = []
    for entry_name in os.listdir(directory):
        if os.path.isfile(os.path.join(directory, entry_name)):
            file_names.append(entry_name)
    return file_names

def get_all_dirs(directory: str):
    """Return the names of the sub-directories located directly inside `directory`."""
    dir_names = []
    for entry_name in os.listdir(directory):
        if os.path.isdir(os.path.join(directory, entry_name)):
            dir_names.append(entry_name)
    return dir_names

In [None]:
# Every sub-directory of MODEL_MASKS_PATH is treated as one model; the directory name is used as the model name.
MODEL_NAMES = get_all_dirs(MODEL_MASKS_PATH)

In [None]:
# Raw images: files directly inside RAW_IMAGES_PATH whose name ends with one of IMAGE_FILE_EXTENSIONS.
# NOTE(review): the suffix test is case-sensitive and has no leading dot, so `x.JPG` is
# skipped while a name such as `scanpng` would be accepted.
raw_images = [f for f in get_all_files(RAW_IMAGES_PATH) if f.endswith(IMAGE_FILE_EXTENSIONS)]
print(f"Found {len(raw_images)} raw images.")

# One sub-directory per model; report how many files each one holds.
for model in get_all_dirs(MODEL_MASKS_PATH):
    model_masks = get_all_files(os.path.join(MODEL_MASKS_PATH, model))
    print(f"Found {len(model_masks)} model masks for model: {model}.")

# Every file in MANUAL_MASKS_PATH is counted, whatever its extension.
# `manual_masks` is also the list that `manual_mask_exists` searches.
manual_masks = get_all_files(MANUAL_MASKS_PATH)
print(f"Found {len(manual_masks)} manual masks.")

metrics_objects = get_all_files(METRICS_PATH)
print(f"Found {len(metrics_objects)} metrics' objects.")

### Functions to read masks

In [None]:
def manual_mask_exists(img_filename: str) -> bool:
    """Tell whether a manual mask was found for the given raw image.

    The mask name is the image name up to its first dot plus ``.json``; it is
    looked up in the module-level ``manual_masks`` listing.
    """
    stem, _, _ = img_filename.partition('.')
    return f"{stem}.json" in manual_masks

def read_manual_mask(mask_directory: str, img_filename: str):
    """Rasterise the polygon annotation that belongs to a raw image.

    The annotation file is named after the image (text before the first dot)
    with a ``.json`` extension. It must provide ``imageWidth``, ``imageHeight``
    and ``shapes``, where every shape carries a list of ``points``.

    Args:
        mask_directory: Directory containing the annotation files.
        img_filename: File name of the raw image.

    Returns:
        ``(annotation file name, mask)``. The mask is an array in which every
        polygon is filled with 1 on a background of 0, or ``None`` when the
        annotation file does not exist.
    """
    mask_path = f"{img_filename.partition('.')[0]}.json"

    try:
        json_file = open(os.path.join(mask_directory, mask_path))
    except FileNotFoundError:
        return (mask_path, None)

    with json_file:
        annotation = json.load(json_file)

    canvas = Image.new('L', (annotation['imageWidth'], annotation['imageHeight']), 0)
    painter = ImageDraw.Draw(canvas)
    for shape in annotation['shapes']:
        corners = [(point[0], point[1]) for point in shape['points']]
        painter.polygon(corners, outline=1, fill=1)

    return (mask_path, np.array(canvas))

def read_model_mask(model_name: str, image_name: str):
    """Load the stored ``.npy`` mask a model produced for an image.

    The file is looked up at ``MODEL_MASKS_PATH/<model_name>/<image stem>.npy``,
    where the stem is the image name up to its first dot.

    Args:
        model_name: Name of the model's sub-directory.
        image_name: File name of the raw image.

    Returns:
        ``(mask file name, mask array)`` with every 255 replaced by 1.
    """
    mask_filename = image_name.partition('.')[0] + ".npy"
    mask = np.load(os.path.join(MODEL_MASKS_PATH, model_name, mask_filename))

    if model_name == 'mask_rcnn_coco':
        # These masks carry a third axis, which is collapsed with a logical OR.
        # presumably one layer per detected instance - TODO confirm
        mask = np.logical_or.reduce(mask, axis=2).astype(int)

    # normalise masks that mark the object with 255 instead of 1
    mask[mask == 255] = 1

    return mask_filename, mask

def read_image(img_path: str):
    """Read an image from disk.

    Returns:
        ``(file name, pixel data)``; the pixel data is ``None`` when the file
        does not exist.
    """
    filename = os.path.basename(img_path)
    try:
        pixels = mpimg.imread(img_path)
    except FileNotFoundError:
        return (filename, None)
    return (filename, pixels)

### Read manual masks

In [None]:
# Keep only the raw images for which a manual mask file was found.
raw_images_with_manual_masks = [
    raw_image for raw_image in raw_images if manual_mask_exists(raw_image)
]

print(raw_images_with_manual_masks)

### Functions for evaluations

In [None]:
from collections import defaultdict

def iou_score(target, prediction):
    """Return the intersection-over-union of two masks.

    Any non-zero value counts as foreground in both masks.

    Args:
        target: Ground-truth mask.
        prediction: Predicted mask of the same shape.

    Returns:
        Intersection size divided by union size. Two empty masks agree
        completely and score 1.0 (instead of the NaN a 0/0 division yields).
    """
    target = np.asarray(target).astype(bool)
    prediction = np.asarray(prediction).astype(bool)

    union = np.sum(np.logical_or(target, prediction))
    if union == 0:
        return 1.0

    intersection = np.sum(np.logical_and(target, prediction))
    return intersection / union

def pixel_accuracy_score(target, prediction):
    """Return the share of pixels on which both masks agree.

    A pixel counts as correct when it is foreground (non-zero) in both masks
    or background in both. The count is divided by the number of pixels in
    `target`.
    """
    both_set = np.logical_and(target, prediction)
    # De Morgan: "neither is set" is the negation of "at least one is set"
    both_clear = np.logical_not(np.logical_or(target, prediction))
    n_correct = np.sum(np.logical_or(both_set, both_clear).astype(int))

    # every pixel is either set or not set, so this counts all pixels of target
    every_pixel = np.logical_or(target, np.logical_not(target))
    n_total = np.sum(every_pixel.astype(int))

    return n_correct / n_total

def dice_coefficient_score(target, prediction):
    """Return the Dice coefficient of two masks.

    Both masks are binarised first (any non-zero value is foreground). Without
    this, masks holding values other than 0/1 - such as the summed ensemble
    masks or 255-valued masks - inflate the denominator and give a wrong score.

    Args:
        target: Ground-truth mask.
        prediction: Predicted mask of the same shape.

    Returns:
        ``2 * |intersection| / (|target| + |prediction|)``. Two empty masks
        agree completely and score 1.0 (instead of the NaN a 0/0 division yields).
    """
    target = np.asarray(target).astype(bool)
    prediction = np.asarray(prediction).astype(bool)

    total = np.sum(target) + np.sum(prediction)
    if total == 0:
        return 1.0

    intersection = np.sum(np.logical_and(target, prediction))
    return 2 * intersection / total

def calculate_metrics(manual_mask, model_mask):
    """Score a model mask against the manual mask.

    Returns:
        Dict with the keys ``'Dice Coefficient'``, ``'IoU Score'`` and
        ``'PA Score'``.
    """
    return {
        'Dice Coefficient': dice_coefficient_score(manual_mask, model_mask),
        'IoU Score': iou_score(manual_mask, model_mask),
        'PA Score': pixel_accuracy_score(manual_mask, model_mask),
    }

def metrics_average(metrics):
    """Summarise per-image metrics as mean, min and max.

    Args:
        metrics: List with one ``{model: {metric name: value}}`` dict per
            image. The models and metric names are taken from the first entry
            and must be present in all others.

    Returns:
        ``defaultdict`` mapping ``model -> metric name -> {'mean', 'min', 'max'}``.
    """
    summary = defaultdict(dict)

    for model_name, model_metrics in metrics[0].items():
        for metric_name in model_metrics:
            scores = [entry[model_name][metric_name] for entry in metrics]
            summary[model_name][metric_name] = {
                'mean': sum(scores) / len(scores),
                'min': min(scores),
                'max': max(scores),
            }

    return summary

def print_metrics_overview(metrics):
    """Print min, mean and max (rounded to two decimals) of every model/metric pair."""
    print("\nAverage Metrics (Min, Mean, Max):\n")
    for key, per_metric in metrics_average(metrics).items():
        for metric, value in per_metric.items():
            lowest = round(value['min'], 2)
            middle = round(value['mean'], 2)
            highest = round(value['max'], 2)
            print(f"{key}, {metric}: {lowest}, {middle}, {highest}")
    print("")

### Ensemble

In [None]:
def ensemble_sum_mask(masks: list, threshold: float = None):
    """Add up a list of masks element-wise.

    Args:
        masks: Non-empty list of equally shaped masks.
        threshold: When given, sums below this value are replaced by 0. Sums
            at or above it are kept as they are (they are not turned into 1).

    Returns:
        The summed mask. With a single mask and no threshold this is that very mask.
    """
    total = masks[0]
    for extra in masks[1:]:
        total = np.add(total, extra)

    if threshold is None:
        return total
    return np.where(total >= threshold, total, 0)

### Show a grid of raw images, model masks, ground truth (i.e. manually drawn masks) and metrics

In [None]:
from typing import List, Tuple

def show_groups_galery(
    data: List[Tuple],
    fig_size: Tuple[int, int],
    col_headers: List[str] = None,
    row_headers: List[str] = None,
    metrics: List[dict] = None) -> None:
    """Draw a grid with one row per image group and, optionally, a metrics column.

    Args:
        data: One entry per row; each entry is a sequence of ``(title, image)``
            pairs. An image of ``None`` leaves the cell empty (title only).
        fig_size: ``(column width, row height)`` of a single grid cell.
        col_headers: Labels written to the left of the first column, one per
            row (despite the name).
        row_headers: Labels written above the first row, one per column
            (despite the name).
        metrics: Optional list with one ``{model: {metric name: value}}`` dict
            per row. When given, an extra right-most column lists the values;
            when omitted, no metrics column is drawn.
    """
    if not data:
        print("No image groups to show.")
        return

    # Basic stats
    n_rows = len(data)
    n_cols = len(data[0]) if metrics is None else len(data[0]) + 1
    print(f"Showing {n_rows * n_cols} images...")
    print(f"Columns: {n_cols}")
    print(f"Rows : {n_rows}")

    # Create galery figure. `squeeze=False` always yields a 2-D array of axes,
    # so single-row and single-column grids are indexed like any other grid.
    fig, axs = plt.subplots(
        n_rows,
        n_cols,
        figsize=(fig_size[0] * n_cols, fig_size[1] * n_rows),
        squeeze=False,
    )
    fig.tight_layout()

    if col_headers:
        # Labels to the left of the first column, one per row
        for ax, row in zip(axs[:, 0], col_headers):
            ax.annotate(row, xy=(0, 0.5), xytext=(-ax.yaxis.labelpad - 5, 0),
                        xycoords=ax.yaxis.label, textcoords='offset points',
                        size='xx-large', ha='right', va='center')

    if row_headers:
        # Labels above the first row, one per column
        for ax, col in zip(axs[0], row_headers):
            ax.annotate(col, xy=(0.5, 1), xytext=(0, 30),
                        xycoords='axes fraction', textcoords='offset points',
                        size='large', ha='center', va='baseline')

    # Show each group of images
    for row_id, data_group in enumerate(data):
        for col_id, (image_name, image) in enumerate(data_group):
            ax = axs[row_id, col_id]
            ax.set_title(image_name)
            if image is not None:
                ax.imshow(image)

        if metrics is None:
            continue

        # Write the row's metrics as text lines into the last column
        metrics_object = metrics[row_id]
        ax = axs[row_id, n_cols - 1]
        ax.axis([0, 10, 0, 10])
        for idx, model in enumerate(metrics_object):
            for idx2, (metric_name, value) in enumerate(metrics_object[model].items()):
                # each model takes three text lines; lines are 0.3 units apart, starting at y = 9
                ax.text(1, 10 - ((((idx * 3) + idx2) * 0.3) + 1),
                        f"{model} - {metric_name}: {round(value, 2)}",
                        fontsize=15, style='italic')

def show_galery_from_paths(
    data: List[List[str]],
    fig_size: Tuple[int, int],
    col_headers: List[str] = None,
    row_headers: List[str] = None,
    metrics_paths: str = None) -> None:
    """Load the images behind each path and show them with `show_groups_galery`.

    Args:
        data: One list of image paths per grid row.
        fig_size: Passed through to `show_groups_galery`.
        col_headers: Passed through to `show_groups_galery`.
        row_headers: Passed through to `show_groups_galery`.
        metrics_paths: Passed through unchanged as the `metrics` argument.
            NOTE(review): `show_groups_galery` indexes and iterates this value
            per row - confirm that callers pass what it expects.
    """

    def _load(image_path):
        # A path without a file yields (file name, None)
        filename = os.path.basename(image_path)
        try:
            return (filename, mpimg.imread(image_path))
        except FileNotFoundError:
            return (filename, None)

    images = [[_load(image_path) for image_path in data_group] for data_group in data]

    show_groups_galery(images, fig_size, col_headers, row_headers, metrics_paths)

In [None]:
# Evaluate every model mask and four sum-ensembles against the manual mask of each image.
# The result is `metrics`: one {name: {metric name: value}} dict per image, in iteration order.

# Set the galery size. First value is the column width, second value is the row height.
FIG_SIZE = (9, 8)

image_sets = list()
metrics = list()

# Swap the two lines below if you want to experiment with only the first X images.
for raw_image_name in raw_images_with_manual_masks:
# for raw_image_name in raw_images:
    raw_image_object = read_image(os.path.join(RAW_IMAGES_PATH, raw_image_name))
    manual_mask_object = read_manual_mask(MANUAL_MASKS_PATH, raw_image_name)

    # models = list()
    metrics_object = defaultdict(dict)
    model_mask_objects = list()

    for model in MODEL_NAMES:
        # This model is excluded from the evaluation and from the ensembles.
        if model == 'fcn_resnet101_masks':
            continue

        model_mask_object = read_model_mask(model, raw_image_name)

        # models.append(model)
        model_mask_objects.append(model_mask_object)
        metrics_object[model] = calculate_metrics(manual_mask_object[1], model_mask_object[1])

    # ensemble_models = list()
    # ensemble_mask_objects = list()

    # Sum the masks of all evaluated models and score the result for thresholds 1 to 4.
    # NOTE(review): the upper bound is fixed - confirm it matches the number of models that are summed.
    for threshold in range(1, 5):
        ensemble_name = f'Sum ensemble: {threshold}'
        sum_ensemble = (
            ensemble_name,
            ensemble_sum_mask([m[1] for m in model_mask_objects], threshold = threshold)
        )

        # ensemble_models.append(ensemble_name)
        # ensemble_mask_objects.append(sum_ensemble)
        metrics_object[ensemble_name] = calculate_metrics(manual_mask_object[1], sum_ensemble[1])

    # models.extend(ensemble_models)
    # model_mask_objects.extend(ensemble_mask_objects)

    # image_sets.append(
    #     (
    #         raw_image_object,
    #         manual_mask_object,
    #         *model_mask_objects,
    #     )
    # )
    metrics.append(metrics_object)
    # metrics.append(os.path.join(METRICS_PATH, f"{raw_image_name.split('.')[0]}.json"))

# NOTE(review): nothing is appended to `image_sets` in this cell (the append above is
# commented out), so this line reports 0.
print(f"Created {len(image_sets)} image sets.")

# print_metrics_overview(metrics)

# show_groups_galery(image_sets, FIG_SIZE, None, ['Raw', 'Manual Mask', *models, 'Metrics'], metrics)

In [None]:
# m_average = metrics_average(metrics)

# with open('./data/average_metrics.json', 'w') as f:
#     json.dump(m_average, f)

# Collect the per-image metrics, keyed by the image name up to its first dot.
# The pairing is positional: `metrics` must have been built by iterating
# `raw_images_with_manual_masks` in the same order.
metrics_res = dict()

for (raw_image_name, metric) in zip(raw_images_with_manual_masks, metrics):
    object_number = raw_image_name.split('.')[0]
    metrics_res[object_number] = metric

# Written next to the other data folders. A relative path (like every other
# path in this notebook) keeps the cell working where `/content` does not exist.
with open('./data/metrics.json', 'w') as f:
    json.dump(metrics_res, f)

In [None]:
# Print the summary and draw one horizontal box per model/metric pair.
print_metrics_overview(metrics)

fig = plt.figure(figsize =(10, 7))
ax = fig.add_subplot(111)

# `metrics` holds one {model: {metric name: value}} dict per image. A box plot
# needs plain number series, so collect one series per "model - metric" pair.
metrics_keys = []
metrics_list = []
for model_name, model_metrics in metrics[0].items():
    for metric_name in model_metrics:
        metrics_keys.append(f"{model_name} - {metric_name}")
        metrics_list.append(
            [metric_object[model_name][metric_name] for metric_object in metrics]
        )

# Creating plot
plt.boxplot(metrics_list, vert = False )

ax.set(xlim=(0, 1))
ax.set_yticklabels(metrics_keys)

# show plot
plt.show()

In [None]:
# Ensemble - Matt & Rutger
# NOTE(review): this cell looks older than the helpers above and will not run as written:
#   * `read_model_mask` as defined in this notebook takes (model_name, image_name) but is called with three arguments here.
#   * `read_segformer_mask`, `read_unet_mask`, `SEGFORMER_MASKS_PATH` and `UNET_MASKS_PATH` are not defined in the visible cells.
#   * `read_ensemble_mask` is only defined further down, under "OLD EXPERIMENTS".
#   * the flat metrics dicts built below do not match the nested {model: {metric name: value}}
#     layout that `print_metrics_overview` and `show_groups_galery` iterate over.
FIG_SIZE = (9, 8)

image_sets = list()
metrics = list()

for raw_image_name in ["unnamed-22.jpg","unnamed-25.jpg", "unnamed-26.jpg", "unnamed-27.jpg", "unnamed-28.jpg"]: #smaller sample for testing purposes 
    raw_image_object = read_image(os.path.join(RAW_IMAGES_PATH, raw_image_name))
    manual_mask_object = read_manual_mask(MANUAL_MASKS_PATH, raw_image_name)
    model_mask_object = read_model_mask(MODEL_MASKS_PATH, raw_image_name, os.path.join(RAW_IMAGES_PATH, raw_image_name))
    segformer_mask_object = read_segformer_mask(SEGFORMER_MASKS_PATH, raw_image_name)
    unet_mask_object = read_unet_mask(UNET_MASKS_PATH, raw_image_name)
    # Two ensembles of the same three masks: at least 2 votes, and at least 3 votes.
    ensemble_mask_object = read_ensemble_mask(model_mask_object[1], segformer_mask_object[1], unet_mask_object[1], threshold=2, noise=False)
    ensemble_mask_object_2 = read_ensemble_mask(model_mask_object[1], segformer_mask_object[1], unet_mask_object[1], threshold=3, noise=False)

    # Bare expression: has no effect inside the loop.
    ensemble_mask_object_2

    # Save the 3-vote ensemble; `[:-4]` drops the last four characters of the file name (".jpg" for the names above).
    with open(os.path.join(ENSEMBLE_MASKS_PATH, f'{raw_image_name[:-4]}.npy'), 'wb') as f:
        np.save(f, ensemble_mask_object_2)

    image_sets.append(
        (
            raw_image_object,
            (raw_image_name, ensemble_mask_object),
            (raw_image_name, ensemble_mask_object_2),
        )
    )
    metrics.append({
        'Ens-1 - IoU Score': iou_score(manual_mask_object[1], ensemble_mask_object),
        'Ens-1 - PA Score': pixel_accuracy_score(manual_mask_object[1], ensemble_mask_object),
        'Ens-1 - Dice Coefficient': dice_coefficient_score(manual_mask_object[1], ensemble_mask_object),
        'Ens-2 - IoU Score': iou_score(manual_mask_object[1], ensemble_mask_object_2),
        'Ens-2 - PA Score': pixel_accuracy_score(manual_mask_object[1], ensemble_mask_object_2),
        'Ens-2 - Dice Coefficient': dice_coefficient_score(manual_mask_object[1], ensemble_mask_object_2),
    })
    # metrics.append(os.path.join(METRICS_PATH, f"{raw_image_name.split('.')[0]}.json"))

print(f"Created {len(image_sets)} image sets.")

print_metrics_overview(metrics)

show_groups_galery(image_sets, FIG_SIZE, None, ['Raw','Ens-1', 'Ens-2', 'Metrics'], metrics)

## OLD EXPERIMENTS

In [None]:
# RUTGER TEST BLOCK
# Tiny hand-made masks to try out `weighted_mask` and `vote_mask`.
# Both helpers are defined in a later cell, which has to be run first.
m1 = np.array([[0,0,255],
               [255,255,0],
               [0,255,255]])

m2 = np.array([[2,2,2],
               [2,2,0],
               [0,0,0]])

masks = [m1, m2]

# Count per pixel how many masks are non-zero, then keep the pixels with at least 2 votes.
wm = weighted_mask(masks)
vote_mask(wm, 2)

In [None]:
def read_ensemble_mask(model_mask_object, segformer_mask_object, unet_mask_object, noise, threshold):
    """Combine three masks by summing them and, unless `noise` is set, voting.

    Args:
        model_mask_object: Mask with 1 for something and 0 for nothing.
        segformer_mask_object: Mask with 255 for something and 0 for nothing;
            255 is counted as one vote. The caller's array is left untouched.
        unet_mask_object: Mask with 1 for something and 0 for nothing.
        noise: When falsy, the summed votes are reduced to a 0/1 mask using
            `threshold`. When truthy, the raw per-pixel sum is returned.
        threshold: Minimum sum a pixel needs in order to be set to 1.

    Returns:
        A new array of the same shape as the inputs.
    """
    # Work on a copy so the caller's segformer mask keeps its 255 values.
    segformer_votes = np.array(segformer_mask_object)
    segformer_votes[segformer_votes == 255] = 1

    votes = np.add(np.add(segformer_votes, model_mask_object), unet_mask_object)

    if not noise:
        # vectorised replacement for the per-pixel loop: 1 where enough votes, else 0
        votes = (votes >= threshold).astype(votes.dtype)

    return votes

In [None]:
def scale(im, nR, nC):
    """Resize a 2-D grid to `nR` rows and `nC` columns by picking source cells.

    Every target cell takes the value of the source cell at the proportional
    position, rounded down. The result is a list of lists.
    """
    nR0 = len(im)     # source number of rows
    nC0 = len(im[0])  # source number of columns

    # source index for every target row / column
    source_rows = [int(nR0 * r / nR) for r in range(nR)]
    source_cols = [int(nC0 * c / nC) for c in range(nC)]

    return [[im[i][j] for j in source_cols] for i in source_rows]

def weighted_mask(masks):
    """Count, per pixel, how many of the masks are set.

    A pixel counts as set when its value is non-zero (and finite); the actual
    value (1, 2, 255, ...) does not matter.

    Args:
        masks: Non-empty list of equally shaped masks.

    Returns:
        Float array of the masks' shape holding the number of votes per pixel.
    """
    weighted = np.zeros(np.shape(masks[0]))
    for mask in masks:
        mask = np.asarray(mask)
        # Same result as `mask / mask` with NaN replaced by 0, but without
        # dividing by zero (and the RuntimeWarning that comes with it).
        weighted += np.isfinite(mask) & (mask != 0)
    return weighted

def vote_mask(weighted_mask, threshold):
    """Turn a vote-count mask into a 0/1 mask, in place.

    Values below `threshold` become 0 first; afterwards every value that is at
    least `threshold` becomes 1. The array passed in is modified and returned.
    """
    # The two steps run one after the other, each on the current content.
    np.putmask(weighted_mask, weighted_mask < threshold, 0)
    np.putmask(weighted_mask, weighted_mask >= threshold, 1)
    return weighted_mask

In [None]:
# ensemble_mask_object = read_ensemble_mask(model_mask_object[1], segformer_mask_object[1], unet_mask_object[1], threshold=2, noise=False)
# ensemble_mask_object_2 = read_ensemble_mask(model_mask_object[1], segformer_mask_object[1], unet_mask_object[1], threshold=3, noise=False)

# weighted_mask_object = weighted_mask([m[1] for m in model_mask_objects]) 
# vote_mask_object = vote_mask(weighted_mask_object, threshold=3)

In [None]:
# Load the manual mask and three model masks of the first annotated image and score one of them.
# NOTE(review): all `plt.imshow` calls share this cell, so presumably only the last image stays visible - confirm.

# manually drawn mask
sample_manual_mask = read_manual_mask(MANUAL_MASKS_PATH, raw_images_with_manual_masks[0])
plt.imshow(sample_manual_mask[1])

# mask output from model-1 (the 'segformer_masks' directory)
sample_segformer_mask_name, sample_segformer_mask = read_model_mask(
    'segformer_masks',
    raw_images_with_manual_masks[0]
)
plt.imshow(sample_segformer_mask)

# mask output from model-2 (the 'mask_rcnn_coco' directory); kept as a (name, mask) tuple
sample_model_mask = read_model_mask('mask_rcnn_coco', raw_images_with_manual_masks[0])
plt.imshow(sample_model_mask[1])

# mask output from model-3 (the 'unet' directory)
sample_unet_mask_name, sample_unet_mask = read_model_mask('unet', raw_images_with_manual_masks[0])
plt.imshow(sample_unet_mask)

# Only the 'mask_rcnn_coco' mask is scored against the manual mask here.
sample_metrics = {
    'IoU Score': iou_score(sample_manual_mask[1], sample_model_mask[1]),
    'PA Score': pixel_accuracy_score(sample_manual_mask[1], sample_model_mask[1]),
    'Dice Coefficient': dice_coefficient_score(sample_manual_mask[1], sample_model_mask[1])
}

print('Metrics:')
print(sample_metrics)

In [None]:
# Sum the two sample masks and keep only the pixels whose sum is at least 2 (they keep their summed value).
e = ensemble_sum_mask([sample_model_mask[1], sample_segformer_mask], threshold=2)
plt.imshow(e)

In [None]:
# Show the mask of the `sum_ensemble` tuple left over from whichever cell assigned it last.
plt.imshow(sum_ensemble[1])

In [None]:
# Rebuild the sum ensemble from the `model_mask_objects` left over from an earlier cell.
# `range(1, 2)` yields only threshold 1.
wow = list()

for threshold in range(1, 2):
    ensemble_name = f'Sum ensemble: {threshold}'
    sum_ensemble = (
        ensemble_name,
        ensemble_sum_mask([m[1] for m in model_mask_objects], threshold = threshold)
    )
    wow.append(sum_ensemble)

In [None]:
# Take the mask (second tuple element) of the first ensemble built above.
e = wow[0][1]

In [None]:
# Number of pixels whose summed value is greater than 2.
len(e[e > 2])

In [None]:
# Shallow copy of the (name, mask) tuples collected for the last processed image.
ms = list(model_mask_objects)

In [None]:
# For every collected mask, print its file name and the values that are greater than 1.
for m in model_mask_objects:
    print(m[0])
    # `m` is rebound from the (name, mask) tuple to the mask array itself.
    m = m[1]
    print(m[m > 1])