# Train and benchmark models on a dataset annotated with Label Studio
This notebook is designed to run on a GPU and was created on Kaggle.

In [None]:
# Install detectron2 straight from its GitHub repository (no commit or tag is pinned).
# NOTE(review): consider pinning a commit/tag so that re-running the notebook stays reproducible.
!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'

In [None]:
import torch, detectron2
# Show the CUDA compiler version available in the environment
!nvcc --version
# Keep only the first two dot-separated components of the torch version string
TORCH_VERSION = ".".join(torch.__version__.split(".")[:2])
# Text after the "+" of the torch version string (the whole string when it contains no "+")
CUDA_VERSION = torch.__version__.split("+")[-1]
print("torch: ", TORCH_VERSION, "; cuda: ", CUDA_VERSION)
print("detectron2:", detectron2.__version__)

In [None]:
from detectron2.config import get_cfg
from detectron2.data import DatasetCatalog, MetadataCatalog, build_detection_train_loader
from detectron2.data.datasets import register_coco_instances
from detectron2.engine import DefaultTrainer
from detectron2.model_zoo import model_zoo
from detectron2.utils.visualizer import Visualizer, ColorMode
from detectron2.structures import BoxMode
from io import BytesIO
import os
import cv2
import json
import numpy as np
import matplotlib.pyplot as plt
import random
import PIL.Image
import requests
from detectron2.evaluation import COCOEvaluator, inference_on_dataset
from detectron2.data import build_detection_test_loader

In the following cell, set the path to your dataset.  
The dataset must contain a .json file with the annotations and an "image" folder containing all the images, each named after its id.

In [None]:
# Location of the .json file generated by Label Studio and of the images it describes
dataset_path = "/kaggle/input/galica"

json_file = dataset_path + "/galica/gallicaimages_set1.json"
image_dir =  dataset_path + "/image"

# Categories we want to work on, mapped to the integer ids used as category_id
category_dict = {
  'tampon': 0,
  'écriture manuscrite': 1,
  'écriture typographique': 2,
  'photographie': 3,
  'estampe': 4,
  'décoration' : 5
}

# Data preparation: this section does all the preliminary work needed to adapt the data for training

In [None]:
def update_image_sizes(json_file, image_dir, out):
    """Write a copy of the annotation JSON with the real pixel size of every image.

    Args:
        json_file: Path of the input JSON; it must contain an ``images`` list whose
            entries carry an ``id`` that is also the stem of the image file name.
        image_dir: Directory containing the ``<id>.jpg`` files.
        out: Path of the JSON file to write, with ``width``/``height`` filled in.
    """
    with open(json_file, encoding='utf-8') as f:
        data = json.load(f)

    for image in data['images']:
        # Full path of the image on disk
        image_path = image_dir + '/' + image['id'] + '.jpg'

        # Only the size is needed; the context manager releases the file handle right away
        # instead of keeping one open per image.
        with PIL.Image.open(image_path) as img:
            width, height = img.size

        # Update the size information in the JSON
        image['width'] = width
        image['height'] = height

    # Write the updated JSON to a new file
    with open(out, 'w') as outfile:
        json.dump(data, outfile)

    print("Mise à jour des tailles d'image terminée.")
    # Report the path that was actually written, not a hard-coded file name
    print("JSON mis à jour enregistré dans :", out)
    
# Write the size-corrected annotations to "gallica_dataset_file.json" (relative to the working directory)
update_image_sizes(json_file, image_dir, "gallica_dataset_file.json")

In [None]:
# Annotation file used by the rest of the notebook: the size-corrected JSON written by
# update_image_sizes in the previous cell. The relative output path resolves to
# /kaggle/working, Kaggle's working directory, so a fresh "Run All" finds the file.
gallica_json = "/kaggle/working/gallica_dataset_file.json"

In [None]:
import random

def get_my_dataset_dicts(json_file, category_dict, image_dir):
    """Convert a gallica Label Studio export into a list of detectron2-style records.

    Args:
        json_file: Path of the JSON file. It must contain an ``images`` list (entries with
            ``id``, ``width`` and ``height``) and an ``annotations`` list whose entries have
            an ``id`` equal to the image id and a ``result`` list of labelled boxes.
        category_dict: Mapping from label name to integer category id; results whose
            first label is not a key of this mapping are ignored.
        image_dir: Directory containing the ``<id>.jpg`` files.

    Returns:
        One dict per image with the keys ``file_name``, ``height``, ``width``,
        ``image_id`` and ``annotations``. Boxes are ``[x, y, width, height]`` in pixels,
        converted from the percentages stored in the export.
    """
    with open(json_file, encoding='utf-8') as f:
        data = json.load(f)

    # Group the raw results by annotation id once, so that each image is resolved with a
    # dictionary lookup instead of rescanning every annotation for every image.
    results_by_image = {}
    for ann in data["annotations"]:
        results_by_image.setdefault(ann["id"], []).extend(ann["result"])

    dataset_dicts = []
    for image in data['images']:
        img_width = image["width"]
        img_height = image["height"]

        annotations = []
        for result in results_by_image.get(image["id"], ()):
            label = result["label"]
            # Skip integer labels, empty labels and labels outside the selected categories
            if type(label) is int or not label or label[0] not in category_dict:
                continue

            bbox = result['bbox']
            annotations.append({
                # The export stores box coordinates as percentages of the image size
                "bbox": [
                    int(bbox['x'] / 100.0 * img_width),
                    int(bbox['y'] / 100.0 * img_height),
                    int(bbox['width'] / 100.0 * img_width),
                    int(bbox['height'] / 100.0 * img_height),
                ],
                "bbox_mode": BoxMode.XYWH_ABS,
                # Map the label name to its integer id
                "category_id": category_dict[label[0]],
            })

        dataset_dicts.append({
            # File path built from the local directory and the image id
            "file_name": os.path.join(image_dir, image['id'] + '.jpg'),
            "height": img_height,
            "width": img_width,
            "image_id": image['id'],
            "annotations": annotations,
        })
    return dataset_dicts

In [None]:

def remove_all_datasets():
    """Remove every name currently listed by DatasetCatalog."""
    # Take a snapshot of the names before removing them one by one
    for name in tuple(DatasetCatalog.list()):
        DatasetCatalog.remove(name)
# Empty the catalog before any dataset of this notebook is registered.
# NOTE(review): this removes every listed dataset, presumably including those detectron2
# registers on import — confirm that is intended.
remove_all_datasets()

In [None]:
# PRINT DATASET

def print_coco_dataset(dataset, num_samples=3):
    """Display randomly chosen images of a registered dataset with their annotations drawn.

    Args:
        dataset: Name under which the dataset is registered in DatasetCatalog and
            MetadataCatalog.
        num_samples: Maximum number of images to display (3 by default).
    """
    my_dataset_metadata = MetadataCatalog.get(dataset)

    # Get the dataset records in detectron2's format
    dataset_dicts = DatasetCatalog.get(dataset)

    # random.sample raises ValueError when asked for more items than the population holds
    sample_size = min(num_samples, len(dataset_dicts))
    for d in random.sample(dataset_dicts, sample_size):
        # convert("RGB") turns grayscale, palette and RGBA images into a 3-channel RGB array;
        # the context manager closes the file once the pixels have been copied.
        with PIL.Image.open(d["file_name"]) as pil_img:
            img = np.array(pil_img.convert("RGB"))

        # Visualizer takes an RGB image and get_image() returns RGB, which is also what
        # matplotlib expects, so no channel flip is needed anywhere.
        visualizer = Visualizer(img, metadata=my_dataset_metadata, scale=0.5)

        # Draw the ground-truth annotations of the record on the image
        vis = visualizer.draw_dataset_dict(d)

        # Show the image using matplotlib
        plt.imshow(vis.get_image())
        plt.axis('off')
        plt.show()


In [None]:
def split_data(json_file, category_dict, image_dir, split='train'):
    """Divide the original dataset into train, validation and test record lists.

    Records are visited in order. For each record, its annotations are tried one after
    the other: the running count of the annotation's class is compared with the total
    count of every class (from the rarest to the most frequent) and the record goes to
    the first bucket whose limit is not reached yet: train below 83 %, val below 95 %,
    test below 100 %. Records that match nothing (including records without any
    annotation) are appended to the test list.

    Args:
        json_file: Path of the annotation JSON, forwarded to get_my_dataset_dicts.
        category_dict: Mapping from label name to integer category id.
        image_dir: Directory containing the images, forwarded to get_my_dataset_dicts.
        split: Unused; kept so existing calls keep working.

    Returns:
        Tuple ``(data_train, data_val, data_test)`` of record lists.
    """
    # Convert to Detectron2 format (this is the only read of json_file that is needed)
    dataset_dicts = get_my_dataset_dicts(json_file, category_dict, image_dir)
    print(len(dataset_dicts))

    num_classes = len(category_dict)

    # Total number of annotations of each class
    classes_count = np.zeros(num_classes)
    for record in dataset_dicts:
        for annotation in record["annotations"]:
            classes_count[annotation["category_id"]] += 1

    # Per-class totals ordered from the rarest to the most frequent class
    totals_sorted = classes_count[np.argsort(classes_count)]

    data_train = []
    data_val = []
    data_test = []
    data_lost = []
    # (fraction of a class total, destination list), tried in this order
    buckets = ((0.83, data_train), (0.95, data_val), (1.0, data_test))

    def pick_bucket(current_count):
        """Return the first destination whose limit exceeds current_count, else None."""
        for total in totals_sorted:
            for fraction, bucket in buckets:
                if current_count < total * fraction:
                    return bucket
        return None

    nb_image = 0
    count = np.zeros(num_classes)
    for record in dataset_dicts:
        destination = None
        for annotation in record["annotations"]:
            destination = pick_bucket(count[annotation["category_id"]])
            if destination is not None:
                break

        if destination is None:
            data_lost.append(record)
            continue

        destination.append(record)
        # Once a record is placed, all of its annotations count towards their classes
        for annotation in record["annotations"]:
            count[annotation["category_id"]] += 1
        nb_image += 1

    # Append lost data to test data
    data_test.extend(data_lost)

    print("count: ", count)
    print("classes_count: ", classes_count)
    print("nb_image: ", nb_image)
    print("data_lost", len(data_lost))

    return data_train, data_val, data_test
    
# Build the (train, val, test) record lists once; later cells index this tuple by position
datasets = split_data(gallica_json, category_dict, image_dir)

This cell produces a .zip archive containing all the test images, so you can visually pick a few cases to try; we will see later how to run the model on these images.

In [None]:
import os
import shutil

# Ids of the images that ended up in the test split (third element of `datasets`)
test_images = [record["image_id"] for record in datasets[2]]


# Define source and destination directories
image_dir = dataset_path + "/image"  # Your source directory
destination_dir = "/kaggle/working/test_images"  # Replace with your destination directory

# Create the destination directory if it doesn't exist (safe to re-run)
os.makedirs(destination_dir, exist_ok=True)

# Iterate through the list of image ids and copy each file to the destination directory
for image_filename in test_images:
    # Add the .jpg extension to the filenames
    image_filename_with_extension = image_filename + ".jpg"

    source_path = os.path.join(image_dir, image_filename_with_extension)
    destination_path = os.path.join(destination_dir, image_filename_with_extension)

    try:
        # shutil.copy overwrites an existing destination file, so a missing source is the
        # only expected failure here.
        shutil.copy(source_path, destination_path)
        print(f"Successfully copied {image_filename} to {destination_dir}")
    except FileNotFoundError:
        print(f"File not found: {image_filename}")

In [None]:
# Name of the archive to produce (relative to the current working directory)
zip_file_name = "images.zip"

# make_archive appends ".zip" to the base name and returns the path of the file it wrote
archive_path = shutil.make_archive(destination_dir, 'zip', destination_dir)

# Move the generated archive to the desired name
os.rename(archive_path, zip_file_name)
print(f"Successfully zipped the directory as {zip_file_name}")

In [None]:
def get_dataset_dicts_split(datasets, split='train'):
    """Return the records of one split in random order.

    Args:
        datasets: Sequence ``(train, val, test)`` of record lists.
        split: Which split to return: 'train', 'val' or 'test'.

    Returns:
        A new list containing the records of the requested split, shuffled. The lists
        inside ``datasets`` are left untouched, so lists obtained from earlier calls
        (and anything indexed in parallel with them) keep their order.

    Raises:
        ValueError: If ``split`` is not one of the three known names.
    """
    split_index = {'train': 0, 'val': 1, 'test': 2}
    if split not in split_index:
        raise ValueError(
            "Unknown split {!r}; expected one of {}".format(split, sorted(split_index))
        )

    records = datasets[split_index[split]]
    # Sampling the whole population yields a shuffled copy without mutating the input
    return random.sample(records, len(records))

In [None]:
# Start from an empty catalog, then register the three splits under "my_dataset_<split>"
remove_all_datasets()

for split_name in ("train", "val", "test"):
    dataset_name = "my_dataset_" + split_name
    # The default argument freezes the current split name inside the lambda; without it
    # every registered loader would see the last value of the loop variable.
    DatasetCatalog.register(
        dataset_name,
        lambda split_name=split_name: get_dataset_dicts_split(datasets, split_name),
    )
    MetadataCatalog.get(dataset_name).set(thing_classes=list(category_dict.keys()))

# Data visualization  
Displaying a few images from the validation set

In [None]:
# Show a few randomly sampled validation images with their ground-truth annotations
print_coco_dataset("my_dataset_val")

# This is the training section with default values  
In the cell below you can set your own training parameters

In [None]:
from detectron2.engine import DefaultTrainer
from detectron2.config import get_cfg

In [None]:
# Start from the configuration returned by get_cfg()
cfg = get_cfg()


# Specify the model to use
cfg.merge_from_file("/kaggle/input/modeldata/config.yml")

# Override the dataset and solver settings
cfg.DATASETS.TRAIN = ("my_dataset_train",)
cfg.DATASETS.TEST = ("my_dataset_val",)
cfg.DATALOADER.NUM_WORKERS = 2
# Weights the training starts from
cfg.MODEL.WEIGHTS = "/kaggle/input/modeldata/model_final.pth"
cfg.SOLVER.IMS_PER_BATCH = 2
cfg.SOLVER.BASE_LR = 0.0025
cfg.SOLVER.MAX_ITER = 1000 
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128
# One class per entry of category_dict
cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(category_dict)

In [None]:
import time

# Make sure the output directory exists before the trainer is created
os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)
trainer = DefaultTrainer(cfg)
# resume=False: do not resume from a previous run of this notebook
trainer.resume_or_load(resume=False)

# Measure the training time
start_time = time.time()
trainer.train()
end_time = time.time()

# Calculate the elapsed time
elapsed_time = end_time - start_time

print("Training time: {:.4f} seconds".format(elapsed_time))

In [None]:
from detectron2.evaluation import COCOEvaluator, inference_on_dataset, COCOPanopticEvaluator
from detectron2.data import build_detection_test_loader
from detectron2.data import MetadataCatalog

import traceback

# Get the metadata for your dataset
my_dataset_metadata = MetadataCatalog.get("my_dataset_val")

# Create an evaluator for the validation set
evaluator = COCOEvaluator("my_dataset_val", output_dir="./output")

# Create a test data loader
test_loader = build_detection_test_loader(cfg, "my_dataset_val")

# Perform inference on the validation set. A failure does not stop the notebook, but the
# full traceback is shown and `results` is always defined (None when evaluation failed).
results = None
try:
    results = inference_on_dataset(trainer.model, test_loader, evaluator)
except Exception as e:
    # Print the error message
    print("Error:", str(e))
    traceback.print_exc()

# Display the evaluation results as the cell output
results

In [None]:
from detectron2.utils.visualizer import ColorMode
from detectron2.engine import DefaultPredictor


# Use the final weights generated after successful training for inference
cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")

cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.8  # set the testing threshold for this model
# Pass the validation dataset
cfg.DATASETS.TEST = ("my_dataset_val", )

# Build the predictor from the updated configuration; it is reused by the cells below
predictor = DefaultPredictor(cfg)

In [None]:
from detectron2.checkpoint import DetectionCheckpointer, Checkpointer

# Save the trained model through a checkpointer writing to cfg.OUTPUT_DIR, under the name "my_model"
checkpointer = DetectionCheckpointer(trainer.model, save_dir=cfg.OUTPUT_DIR)
checkpointer.save("my_model")

# This section allows you to visualize the performance of your model on images  
In this cell you can add the file names of the images selected earlier from the .zip archive to see how the model predicts on them

In [None]:
dataset_dicts = DatasetCatalog.get("my_dataset_test")
metadata = MetadataCatalog.get("my_dataset_test")
# Ids (file names without extension) of the test images to inspect
file_names = ["5f84a46e-a568-462f-8476-35866b41d2ff",
"6a6925ac-5ad8-4829-a88f-0109293e3cd3",
"13a8a57e-3017-497f-a3f3-0dd15abdf70f",
"67a07b78-f7cd-4768-9b12-b65dbc4a396c",
"a5aab339-5d30-405a-bc58-c396bc46002d"]

# Keep only the test records whose image id was selected above
dataset_test = [record for record in dataset_dicts if record["image_id"] in file_names]

# Create the output directory once, before the loop
predictions_dir = "/kaggle/working/predictions"
os.makedirs(predictions_dir, exist_ok=True)

for d in dataset_test:
    # cv2.imread returns a BGR array, or None when the file cannot be read
    im = cv2.imread(d["file_name"])
    if im is None:
        print("Could not read image:", d["file_name"])
        continue

    # Make predictions
    outputs = predictor(im)

    # Visualizer works on RGB images, so flip the BGR array once
    im_rgb = im[:, :, ::-1]
    visualizer_pred = Visualizer(
        im_rgb, metadata=metadata, scale=0.8, instance_mode=ColorMode.IMAGE
    )
    visualizer_gt = Visualizer(
        im_rgb, metadata=metadata, scale=0.8, instance_mode=ColorMode.IMAGE
    )

    # Draw ground truth bounding boxes
    gt_visualized = visualizer_gt.draw_dataset_dict(d)

    # Draw instance predictions
    predictions_visualized = visualizer_pred.draw_instance_predictions(outputs["instances"].to("cpu"))

    # Combine the images horizontally: ground truth on the left, predictions on the right.
    # get_image() returns RGB, which is what matplotlib displays.
    final_image = np.concatenate((gt_visualized.get_image(), predictions_visualized.get_image()), axis=1)

    # cv2.imwrite expects BGR, so flip the channels only for the saved copy
    final_image_bgr = np.ascontiguousarray(final_image[:, :, ::-1])

    # Save the image to a file named after the image id
    cv2.imwrite(os.path.join(predictions_dir, d["image_id"] + '.png'), final_image_bgr)

    # Show the combined image using matplotlib
    plt.imshow(final_image)
    plt.axis('off')
    plt.show()

In [None]:
zip_file_name = "predictions.zip"
predictions_dir = "/kaggle/working/predictions"

# Zip the predictions directory. The base name must not end with a slash: make_archive
# appends ".zip" to it, so a trailing slash would create a hidden ".zip" file inside the
# very directory that is being archived.
archive_path = shutil.make_archive(predictions_dir, 'zip', predictions_dir)

# Move the generated archive to the desired name (replacing an archive from a previous run)
os.replace(archive_path, zip_file_name)
print(f"Successfully zipped the directory as {zip_file_name}")

# This section is the evaluation of the model performances with coco panoptic

COCO Panoptic Metric Explanation

The COCO Panoptic Metric is a widely used evaluation measure in computer vision, specifically designed to assess the performance of algorithms and models in the context of panoptic segmentation. Panoptic segmentation is a task that unifies the understanding of both stuff (e.g., road, sky) and things (e.g., cars, people) in a scene, making it a comprehensive way to analyze and interpret images and videos.

The COCO Panoptic Metric takes into account the quality of segmentation, ensuring that objects are correctly delineated, and the understanding of the overall scene context, providing a holistic assessment of the model's performance in scene understanding.

The evaluation is typically performed by comparing the model's output to ground truth annotations. The metric is expressed as a combination of the average "thing" and "stuff" segmentation accuracies, along with the "panoptic quality," which summarizes the model's ability to capture the complete scene.

In [None]:
def predict_directory(path, max_images=5):
    """Run the predictor on images of a directory and display the detections.

    Files are visited in sorted name order so that repeated runs show the same images.
    Files that OpenCV cannot read as images are skipped and do not count towards the limit.

    Args:
        path: Directory containing the images.
        max_images: Maximum number of images to process (5 by default).
    """
    shown = 0
    for file_name in sorted(os.listdir(path)):
        if shown >= max_images:
            break

        # Read the image (BGR); cv2.imread returns None when the file is not a readable image
        file_path = os.path.join(path, file_name)
        im = cv2.imread(file_path)
        if im is None:
            print("Skipping unreadable file:", file_path)
            continue

        # Measure the prediction time
        start_time = time.time()
        outputs = predictor(im)
        end_time = time.time()

        # Calculate the elapsed time
        elapsed_time = end_time - start_time

        print("Inference time: {:.4f} seconds, shape of {}".format(elapsed_time, im.shape))

        # Move the detections to the CPU and print the boxes and classes
        instances = outputs["instances"].to("cpu")
        print(instances.pred_boxes.tensor.tolist())
        print(instances.pred_classes.tolist())

        # Visualize the predictions (Visualizer takes RGB, hence the flip of the BGR input)
        v = Visualizer(im[:, :, ::-1], metadata=metadata, scale=0.8, instance_mode=ColorMode.IMAGE)
        v = v.draw_instance_predictions(instances)

        # Display the image: get_image() is already RGB, which is what matplotlib expects
        plt.imshow(v.get_image())
        plt.axis('off')
        plt.show()
        shown += 1
        
# Run the predictor on a handful of images from this directory and display the detections
predict_directory("/kaggle/input/dataset2/dataset2/1418")

In [None]:
# Install the package that provides the coco_pano_ext_demo imports used further down (no version pinned)
!python -m pip install coco-pano-ext-demo

In [None]:
img_size = (400,400)

def create_mask_with_json(data, width, height):
    """Rasterize the annotated boxes of one record into one binary mask per class.

    Args:
        data: Record with an ``annotations`` list; each annotation has a ``bbox``
            ``[x, y, box_width, box_height]`` in image pixels and a ``category_id``.
        width: Width of the original image in pixels.
        height: Height of the original image in pixels.

    Returns:
        ``np.array`` stacking one ``img_size`` uint8 mask per entry of ``category_dict``;
        pixels covered by a box of the class are set to 1.
    """
    mask_rows, mask_cols = img_size[0], img_size[1]
    masks = [np.zeros(img_size, dtype=np.uint8) for _ in category_dict]

    for annotation in data["annotations"]:
        box = annotation['bbox']
        # Box origin and extent, truncated to whole pixels before scaling
        left, top = int(box[0]), int(box[1])
        box_w, box_h = int(box[2]), int(box[3])

        # Rescale from image coordinates to mask coordinates
        col_start = int(left * mask_cols / width)
        row_start = int(top * mask_rows / height)
        col_end = col_start + int(box_w * mask_cols / width)
        row_end = row_start + int(box_h * mask_rows / height)

        # Mark the pixels covered by the box in the mask of its class
        masks[annotation["category_id"]][row_start:row_end, col_start:col_end] = 1

    return np.array(masks)

def create_mask_with_prediction(boxes, classes, width, height):
    """Rasterize predicted boxes into one binary mask per class.

    Args:
        boxes: Sequence of boxes ``[x1, y1, x2, y2]`` (two opposite corners) in image pixels.
        classes: Class index of each box, in the same order as ``boxes``.
        width: Width of the original image in pixels.
        height: Height of the original image in pixels.

    Returns:
        ``np.array`` stacking one ``img_size`` uint8 mask per entry of ``category_dict``;
        pixels covered by a box of the class are set to 1.
    """
    mask_rows, mask_cols = img_size[0], img_size[1]
    masks = [np.zeros(img_size, dtype=np.uint8) for _ in category_dict]

    for index, box in enumerate(boxes):
        # Corner coordinates, truncated to whole pixels before scaling
        x1, y1 = int(box[0]), int(box[1])
        x2, y2 = int(box[2]), int(box[3])

        # Rescale both corners from image coordinates to mask coordinates
        col_start = int(x1 * mask_cols / width)
        row_start = int(y1 * mask_rows / height)
        col_end = int(x2 * mask_cols / width)
        row_end = int(y2 * mask_rows / height)

        # Mark the pixels between the two corners in the mask of the predicted class
        masks[classes[index]][row_start:row_end, col_start:col_end] = 1

    return np.array(masks)

def predict_image(path):
    """Run the predictor on one image file.

    Args:
        path: Path of the image to read with OpenCV.

    Returns:
        Tuple ``(boxes, classes)``: the predicted boxes and the predicted class indices,
        both converted to plain Python lists.
    """
    # Read the image, predict, and bring the detections back to the CPU
    detections = predictor(cv2.imread(path))["instances"].to("cpu")

    boxes = detections.pred_boxes.tensor.tolist()
    labels = detections.pred_classes.tolist()
    return (boxes, labels)
        
    
# Per-image class masks, index-aligned with dataset_dicts:
# groundtruth[k] and predictions[k] both describe dataset_dicts[k]
groundtruth, predictions = [], []
for data in dataset_dicts:
    image_width, image_height = data["width"], data["height"]
    groundtruth.append(create_mask_with_json(data, image_width, image_height))
    predicted_boxes, predicted_classes = predict_image(data["file_name"])
    predictions.append(
        create_mask_with_prediction(predicted_boxes, predicted_classes, image_width, image_height)
    )

In [None]:

def show_pred(n, class_predicted=1):
    """Show the detections and the class masks of the n-th record of ``dataset_dicts``.

    Prints the ground-truth and predicted boxes, displays the image with the predictions
    drawn on it, then displays the ground-truth mask followed by the predicted mask of
    one class.

    Args:
        n: Index of the record in ``dataset_dicts`` (and of its masks in ``groundtruth``
            and ``predictions``).
        class_predicted: Class index whose masks are displayed.
    """
    record = dataset_dicts[n]

    # Ground-truth boxes are stored as [x, y, w, h]; print them as [x1, y1, x2, y2]
    truth = []
    for annotation in record["annotations"]:
        x, y, w, h = annotation['bbox']
        truth.append([x, y, x + w, y + h])
    print("groundtruth: ", truth)

    # Read the image (BGR)
    im = cv2.imread(record["file_name"])

    # Perform prediction
    outputs = predictor(im)

    # Move the detections to the CPU and print the predicted boxes
    instances = outputs["instances"].to("cpu")
    print("prediction: ", instances.pred_boxes.tensor.tolist())

    # Visualize the predictions (Visualizer takes RGB, hence the flip of the BGR input)
    v = Visualizer(im[:, :, ::-1], metadata=metadata, scale=0.8, instance_mode=ColorMode.IMAGE)
    v = v.draw_instance_predictions(instances)

    # Display the image: get_image() is already RGB, which is what matplotlib expects
    plt.imshow(v.get_image())
    plt.axis('off')
    plt.show()

    # Ground-truth mask of the selected class
    plt.imshow(groundtruth[n][class_predicted], cmap='binary')  # 'binary' colormap for black and white
    plt.title('Binary Map')
    plt.show()

    # Predicted mask of the selected class
    plt.imshow(predictions[n][class_predicted], cmap='binary')  # 'binary' colormap for black and white
    plt.title('Binary Map')
    plt.show()

# Inspect the first record of dataset_dicts, showing the masks of class index 2
show_pred(0, 2)

In [None]:
from coco_pano_ext_demo import COCO_plot, COCO
from coco_pano_ext_demo.coco import _compute_labelmap, _compute_iou
from coco_pano_ext_demo.iou import compute_matching_scores

In [None]:
def compute_matching_weights(target_binary_image, pred_binary_image) -> tuple[list[float], list[float]]:
    """Compute the matching weights between the components of a target and a predicted mask.

    Args:
        target_binary_image: Binary ground-truth mask.
        pred_binary_image: Binary predicted mask.

    Returns:
        The two results of ``_compute_iou`` with their first (background) entry removed.
    """
    # Extract connected components
    target_labels = _compute_labelmap(target_binary_image)
    pred_labels = _compute_labelmap(pred_binary_image)

    # Choose the pair of label maps to compare.
    # NOTE(review): when one of the masks is entirely zero, the other one is compared with
    # itself; presumably a workaround for empty inputs — confirm the effect on the scores.
    if np.all(pred_binary_image == 0):
        label_pair = (target_labels, target_labels)
    elif np.all(target_binary_image == 0):
        label_pair = (pred_labels, pred_labels)
    else:
        label_pair = (target_labels, pred_labels)

    # Compute the IoUs, then remove background components
    wtp, wpt = _compute_iou(*label_pair)
    return wtp[1:], wpt[1:]

In [None]:
from typing import Iterable
import pandas as pd
def compute_pq_score_list_single_class(targets: Iterable[np.ndarray], predictions: Iterable[np.ndarray]) -> tuple[float, float, float, pd.DataFrame]:
    """Compute the COCO panoptic scores of one class over a list of images.

    Args:
        targets: Ground-truth binary masks of the class, one per image.
        predictions: Predicted binary masks of the class, in the same order.

    Returns:
        Tuple ``(COCO_PQ, COCO_RQ, COCO_SQ, df)`` where ``df`` is the table returned by
        ``compute_matching_scores``, SQ is the mean of its ``IoU`` column, RQ is the first
        value of its ``F-score`` column and PQ is ``SQ * RQ``. All three scores are 0 when
        the table is empty.
    """
    # Pairwise matching weights of every image, background excluded
    per_image = [compute_matching_weights(T0, P0) for T0, P0 in zip(targets, predictions)]

    # Flatten the per-image weights into the two global matching lists
    W_TtoP_global = [weight for wtp, _ in per_image for weight in wtp.tolist()]
    W_PtoT_global = [weight for _, wpt in per_image for weight in wpt.tolist()]

    # Report the final scores
    pairing_threshold = 0.5
    df = compute_matching_scores(np.array(W_TtoP_global), np.array(W_PtoT_global), pairing_threshold)

    if len(df) > 0:
        COCO_SQ = df["IoU"].mean()
        COCO_RQ = df["F-score"].iloc[0]
    else:
        COCO_SQ = 0
        COCO_RQ = 0

    return COCO_SQ * COCO_RQ, COCO_RQ, COCO_SQ, df

In [None]:
# Panoptic quality (PQ) of the predictions, computed independently for each class
scores_per_class = np.zeros(len(category_dict))

# Stack the per-image masks once instead of rebuilding both arrays for every class;
# the first axis indexes the images, the second one the classes
groundtruth_masks = np.array(groundtruth)
prediction_masks = np.array(predictions)

for i in range(len(category_dict)):
    T = groundtruth_masks[:, i]
    P = prediction_masks[:, i]
    scores_per_class[i], COCO_RQ, COCO_SQ, df = compute_pq_score_list_single_class(T, P)

scores_per_class

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# scores_per_class holds the panoptic quality (PQ) of each class, so label it as PQ
scores_df = pd.DataFrame({'Class': list(category_dict.keys()), 'PQ': scores_per_class})

# Remove rows with NaN scores
scores_df = scores_df.dropna(subset=['PQ'])

# Create a bar chart with one bar per class
fig, ax = plt.subplots()
ax.bar(scores_df['Class'], scores_df['PQ'])
ax.set_xlabel('Class')
ax.set_ylabel('Panoptic Quality (PQ)')
ax.set_title('Panoptic Quality per Class')
# Rotate x-axis labels for better visibility
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')

# Show the plot
fig.tight_layout()
plt.show()