This notebook sets up a Faster R-CNN model to detect signatures.
It is written entirely in PyTorch, including the dataset.
It uses a custom anchor generator to focus on horizontal bounding boxes.
All documents are transformed with a Canny edge detector and resized to 400 x 400.
The dataset has some errors, which are corrected before training.

In [None]:
import kagglehub
import shutil
import os
import pandas as pd
import torch
%matplotlib inline

# When True the dataset is not shuffled at random, which keeps a consistent
# division between the training and the test dataset across runs.
reproducibility = True

In [None]:

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def download_dataset():
    """Copy the dataset into ``data/`` and write a repaired, merged copy of its CSVs.

    The Kaggle input folder is copied to ``data/`` unless that folder already
    exists. The id columns of ``train.csv``, ``test.csv`` and ``image_ids.csv``
    are then shifted from a given row onwards, following the notebook linked
    below, and the train and test annotations are merged.

    Side effects:
        Writes ``data/raw/fixed_dataset/full_data.csv`` (merged annotations,
        exact duplicate rows dropped, sorted by ``id``) and
        ``data/raw/fixed_dataset/updated_image_ids.csv``.
    """
    # path of the dataset (this notebook was made for kaggle)
    download_path = "../input/signverod"
    # Copy to target path
    dataPath = "data/"
    print("Path = ", os.path.realpath("data"))
    print(download_path)
    if not os.path.exists(dataPath):
        shutil.copytree(download_path, dataPath)

    print(f"Dataset files moved to: {dataPath}")

    # Fix and merge dataset. See: https://www.kaggle.com/code/alexhorduz/fixing-signverod-dataset
    trainDF = pd.read_csv(dataPath + 'train.csv')
    testDF = pd.read_csv(dataPath + 'test.csv')
    mapping = pd.read_csv(dataPath + 'image_ids.csv')

    # Rows past these positions get their ids shifted by a constant offset.
    # NOTE(review): presumably the ids restart from zero there; see the linked notebook.
    train_tail = trainDF.index > 4309
    trainDF.loc[train_tail, 'image_id'] += 2133
    trainDF.loc[train_tail, 'id'] += 4737

    test_tail = testDF.index > 809
    testDF.loc[test_tail, 'image_id'] += 2133
    testDF.loc[test_tail, 'id'] += 4737

    mapping.loc[mapping.index > 2132, 'id'] += 2133

    # Rows that appear in both files are kept once.
    data = pd.concat([trainDF, testDF]).drop_duplicates().sort_values(['id'])

    # Save the fixed version of the dataset
    save_path = "data/raw/fixed_dataset/"
    os.makedirs(save_path, exist_ok=True)
    data.to_csv(save_path + "full_data.csv", index=False)
    mapping.to_csv(save_path + "updated_image_ids.csv", index=False)


# Copy the dataset and write the repaired CSV files read by the dataset class.
download_dataset()

Path =  /kaggle/working/data
../input/signverod
Dataset files moved to: data/


In [None]:
!mkdir data/resized #folder for canny - resized images

categories.csv	images	      raw      test.csv   train.csv
image_ids.csv	labelmap.txt  resized  tfrecords


In [7]:
import numpy as np
import os
from tqdm import tqdm
from PIL import Image
import cv2
# When False, images are only converted to greyscale and resized.
canny = True

source_dir = "data/images"
target_dir = "data/resized"
# Make sure the output folder exists so the cell also works on a fresh kernel.
os.makedirs(target_dir, exist_ok=True)

for filename in tqdm(os.listdir(source_dir)):
    target_path = os.path.join(target_dir, filename)
    # Skip images already processed by a previous run.
    if os.path.exists(target_path):
        continue

    # Close the source file as soon as the greyscale copy has been made.
    with Image.open(os.path.join(source_dir, filename)) as source_image:
        img = source_image.convert('L')

    if canny:
        image_np = np.array(img)
        # Blur before edge detection, then keep only the edge map.
        blurred = cv2.GaussianBlur(image_np, (5, 5), 1.4)
        edges = cv2.Canny(blurred, threshold1=100, threshold2=200)
        img = Image.fromarray(edges)

    # Every image is stored at 400 x 400, regardless of its original aspect ratio.
    img.resize((400, 400)).save(target_path)


100%|██████████| 2765/2765 [08:17<00:00,  5.56it/s]


In [None]:
import pandas as pd
import torch
from torchvision.transforms import functional as F
from PIL import Image
import os
import ast
from tqdm import tqdm
import matplotlib.pyplot as plt

class SignDataset(torch.utils.data.Dataset):
    """Signature-detection dataset built from the resized images and the repaired CSVs.

    Each item is ``(image, target)``: the image as a tensor and a dictionary
    with the signature boxes (pixel coordinates of the 400 x 400 resized image,
    ``(x_min, y_min, x_max, y_max)``) and a label of 1 for every box.
    """

    # Size of the images stored in ``data/resized/``.
    IMG_WIDTH = 400
    IMG_HEIGHT = 400

    def __init__(self, transforms=None):
        """Read the annotations and group the usable signature boxes by image.

        Args:
            transforms: optional callable applied to the PIL image in
                ``__getitem__`` before it is converted to a tensor. It receives
                the image only, not the boxes.
        """
        image_dir = "data/resized/"
        image_info_path = "data/raw/fixed_dataset/updated_image_ids.csv"
        annotations_path = "data/raw/fixed_dataset/full_data.csv"
        image_info = pd.read_csv(image_info_path)
        annotations = pd.read_csv(annotations_path)
        signature_annotations = annotations[annotations['category_id'] == 1]

        # Build the id -> file name lookup once instead of filtering the frame
        # for every annotation; setdefault keeps the first row of a repeated id.
        file_names = {}
        for img_id, file_name in zip(image_info['id'], image_info['file_name']):
            file_names.setdefault(img_id, file_name)

        records = (
            (row['image_id'], ast.literal_eval(row['bbox']))
            for _, row in tqdm(signature_annotations.iterrows(), total=len(signature_annotations))
        )
        boxes_by_image, malformed = self._group_boxes(records, self.IMG_WIDTH, self.IMG_HEIGHT)

        # Images left without any usable box are not part of the dataset.
        self.dataset = [
            {"name": image_dir + file_names[image_id], "boxes": boxes}
            for image_id, boxes in sorted(boxes_by_image.items(), key=lambda item: item[0])
            if len(boxes) != 0
        ]
        # Rotates the list so that it starts at entry 1602; entries 1598-1601 are left out.
        # NOTE(review): the reason for these indices is not documented — confirm.
        self.dataset = self.dataset[1602:] + self.dataset[:1598]

        # Other sanity check: every box lies inside the image.
        for sample in self.dataset:
            for x_min, y_min, x_max, y_max in sample["boxes"]:
                assert 0 <= x_min <= x_max <= self.IMG_WIDTH
                assert 0 <= y_min <= y_max <= self.IMG_HEIGHT
        self.transforms = transforms

        # Guard the percentage against an empty dataset.
        # NOTE(review): `malformed` counts annotations while len(self.dataset) counts images.
        total = len(self.dataset) + malformed
        share = malformed / total * 100 if total else 0.0
        print(f"malformed bounding boxes : {malformed} ({share}%)")

    @staticmethod
    def _group_boxes(records, img_width=400, img_height=400):
        """Convert normalised boxes to pixel boxes and group them by image.

        Args:
            records: iterable of ``(image_id, bbox)`` pairs, where ``bbox`` is
                ``(x, y, width, height)`` expressed as fractions of the image size.
            img_width: width in pixels of the target image.
            img_height: height in pixels of the target image.

        Returns:
            ``(boxes_by_image, malformed)``: a dict mapping every retained image
            id to its list of ``(x_min, y_min, x_max, y_max)`` tuples (the list
            may be empty), and the number of malformed annotations found.
        """
        boxes_by_image = {}
        rejected = set()
        malformed = 0
        for image_id, bbox in records:
            x, y, width, height = bbox[0], bbox[1], bbox[2], bbox[3]

            # Sanity check on bounding boxes: they must end inside the image.
            if not x + width < 1 or not y + height < 1:
                malformed += 1
                # Drop the whole image and remember it, so that later rows of
                # the same image cannot bring it back with partial labels.
                rejected.add(image_id)
                boxes_by_image.pop(image_id, None)
                continue
            if image_id in rejected:
                continue

            # Calculate bounding box in pixel coordinates
            x_min = round(x * img_width)
            y_min = round(y * img_height)
            x_max = round((x + width) * img_width)
            y_max = round((y + height) * img_height)
            assert (type(x_min) == type(x_max) == type(y_min) == type(y_max) == int)

            boxes = boxes_by_image.setdefault(image_id, [])
            # Both sides must span more than one pixel: a box that is flat in
            # either direction is degenerate and is rejected by the detector.
            if x_min < x_max - 1 and y_min < y_max - 1:
                boxes.append((x_min, y_min, x_max, y_max))
        return boxes_by_image, malformed

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Returns a tuple (Image, target).
        Image is the image as a Tensor, target is a dictionary containing
        the labels and the bounding boxes
        """
        sample = self.dataset[idx]
        boxes = sample['boxes']

        # convert() returns a loaded copy, so the file can be closed right away.
        with Image.open(sample['name']) as raw_image:
            image = raw_image.convert("RGB")
        # reshape keeps the (N, 4) layout even for an empty list of boxes.
        boxes_t = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.ones(len(boxes), dtype=torch.int64)

        target = {"boxes": boxes_t, "labels": labels}
        if self.transforms:
            image = self.transforms(image)

        return F.to_tensor(image), target


In [None]:
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torch.utils.data import Subset


# Build the training and test subsets.
if not reproducibility:
    # Random membership: the split changes from run to run.
    # NOTE(review): this branch holds out 5% for testing while the positional
    # branch below holds out 10% — confirm the difference is intended.
    train_dataset, test_dataset = torch.utils.data.random_split(SignDataset(), (0.95, 0.05))
else:
    # Positional split: the first 90% of the samples train, the rest test.
    dataset = SignDataset()
    train_len = int(0.9 * len(dataset))
    test_len = len(dataset) - train_len
    train_dataset = Subset(dataset, range(0, train_len))
    test_dataset = Subset(dataset, range(train_len, train_len + test_len))

def train(num_epochs=5, accumulation_steps=8, batch_size=4, lr=0.001):
    """Build a Faster R-CNN with custom anchors and train it on ``train_dataset``.

    Gradients are accumulated over ``accumulation_steps`` mini-batches before
    each optimizer step, so the effective batch size is
    ``batch_size * accumulation_steps`` (32 with the defaults) while only
    ``batch_size`` images are on the device at a time.

    Args:
        num_epochs: number of passes over the training data.
        accumulation_steps: mini-batches accumulated per optimizer step.
        batch_size: images per mini-batch.
        lr: learning rate of the Adam optimizer.

    Returns:
        The trained model, on ``device`` and still in training mode.

    Raises:
        ValueError: if ``accumulation_steps`` is smaller than 1.
    """
    if accumulation_steps < 1:
        raise ValueError("accumulation_steps must be at least 1")

    # Same aspect ratios for all the feature maps
    aspect_ratios = ((1, 0.75, 0.5),) * 5
    anchor_generator = AnchorGenerator(
        sizes=((16, 32),     # P2: small objects
               (32, 64),     # P3
               (64, 128),    # P4
               (128, 256),   # P5
               (256, 350)),  # P6: large objects, close to the image size
        aspect_ratios=aspect_ratios
    )
    model = fasterrcnn_resnet50_fpn(rpn_anchor_generator=anchor_generator)
    # Replace the box head with a two-class one (the dataset labels every box as 1).
    num_classes = 2
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            # Images can carry a different number of boxes, so keep them as tuples.
            collate_fn=lambda x: tuple(zip(*x)))

    print(f"Using device: {device}")
    model = model.to(device)
    model.train()

    num_batches = len(train_loader)
    for epoch in range(num_epochs):
        running_loss = 0.0
        total = 0
        for step, (images, targets) in enumerate(tqdm(train_loader, total=num_batches), start=1):
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            # Weight each batch loss by its size so that running_loss / total
            # is an average per image rather than a batch mean divided by images.
            running_loss += losses.item() * len(images)
            total += len(images)
            if total % (batch_size * 20) == 0:
                print(running_loss / total)

            losses.backward()
            # if the accumulated batch is full or it's the end of the epoch, optimize
            if step % accumulation_steps == 0 or step == num_batches:
                optimizer.step()
                optimizer.zero_grad()
        epoch_loss = running_loss / total if total else float("nan")
        print(f"Epoch {epoch}, Loss: {epoch_loss}")
    return model

# Train the detector; the returned model is saved by the next cell.
model = train()

In [None]:
# Start from an empty "model" directory.
!rm -rf model
!mkdir model
!ls model
PATH = "model/saved"
# Saves the whole model object, not only its state_dict.
torch.save(model, PATH)
# weights_only=False unpickles arbitrary objects: only load files written by this notebook.
the_model = torch.load(PATH, weights_only=False)

In [None]:
# Set up the train and test datasets without training the model, which is
# loaded from the file instead. The split is the same positional 90% / 10%
# split used when `reproducibility` is True.
from torch.utils.data import Subset
dataset = SignDataset()
train_len = int(0.9 * len(dataset))
test_len = len(dataset) - train_len
train_dataset = Subset(dataset, range(train_len))
test_dataset = Subset(dataset, range(train_len, len(dataset)))
# weights_only=False unpickles arbitrary objects: only load files written by this notebook.
the_model = torch.load("model/saved", weights_only=False)

In [None]:
# test phase
test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=4,
        shuffle=False,  # order does not matter for an average, keep it deterministic
        collate_fn=lambda x: tuple(zip(*x)))

# torchvision detection models return their loss dictionary only in training
# mode; in eval mode they return the detections instead. The losses are
# therefore computed in training mode with gradients disabled, and the model is
# switched back to eval mode afterwards.
# NOTE(review): training mode updates BatchNorm statistics for models that use
# regular BatchNorm layers — confirm this model uses frozen batch norm.
# `the_model` is used because it exists whether the model was just trained and
# saved or loaded from the file.
the_model.to(device)
the_model.train()
with torch.no_grad():
    total = 0
    running_loss = 0.0
    for images, targets in tqdm(test_loader, total=len(test_loader)):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        loss_dict = the_model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        # Weight each batch loss by its size to report an average per image.
        running_loss += losses.item() * len(images)
        total += len(images)
        if total%(4*20) == 0:
            print(running_loss/total)
the_model.eval()
test_loss = running_loss / total if total else float("nan")
print(f"Test loss: {test_loss}")


In [None]:
# Optimizer for the model trained above. In the visible cells it is only
# referenced by the commented-out training loop that follows.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:

# steps = 8 
# running_loss = 0.0
# total = 0
# i = 0
# train_loader = torch.utils.data.DataLoader(
#             train_dataset,
#             batch_size=4,
#             shuffle=True,
#             collate_fn=lambda x: tuple(zip(*x)))

# for images, targets in tqdm(train_loader, total=len(train_loader)):
#     images = list(image.to(device) for image in images)
#     total += len(images)
#     # print("targets ",targets)
#     targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
#     # print('\n'.join(map(str, [x["boxes"] for x in targets])))
    
#     loss_dict = model(images, targets)
#     losses = sum(loss for loss in loss_dict.values())
#     running_loss += losses.item()
#     if total%(4*20) == 0:
#         print(running_loss/total)
    
#     losses.backward()
#     steps -= 1
#     if steps == 0 or (i + 1) == len(train_loader):
#         steps = 8
#         optimizer.step()
#         optimizer.zero_grad()
#     i += 1    
# # print(f"Epoch {epoch}, Loss: {running_loss/total}")    


In [None]:
def rect_inter(a, b):
    """Return the intersection of two boxes, or ``()`` when they do not overlap.

    Boxes are ``(x_min, y_min, x_max, y_max)`` sequences. Boxes that only touch
    yield a zero-area intersection rather than ``()``.
    """
    left = max(a[0], b[0])
    top = max(a[1], b[1])
    right = min(a[2], b[2])
    bottom = min(a[3], b[3])
    # An inverted extent on either axis means the boxes are disjoint.
    if left > right:
        return ()
    if top > bottom:
        return ()
    return (left, top, right, bottom)

def area(a):
    """Return the area of an ``(x_min, y_min, x_max, y_max)`` box.

    Anything that does not have exactly four elements, such as the empty
    intersection ``()``, has area 0.
    """
    if len(a) != 4:
        return 0
    width = a[2] - a[0]
    height = a[3] - a[1]
    return width * height
def filter_result(result, thresh=0.75, overlap_thres = 0.5):
    """Filter detections by minimum score and by mutual overlap.

    Detections are visited from the highest score down. Visiting stops at the
    first score below ``thresh``. A box is kept only if its overlap with every
    box already kept is at most ``overlap_thres``, where the overlap is the
    overlap coefficient of the two boxes, i.e.
    ``area(intersection(a, b)) / min(area(a), area(b))``.

    Args:
        result: mapping with the entries ``"boxes"`` and ``"scores"``.
        thresh: minimum score of a kept detection.
        overlap_thres: maximum overlap coefficient between two kept boxes.

    Returns:
        ``(chosen_boxes, chosen_scores)``, two lists in decreasing score order.
    """
    ranked = list(zip(result["boxes"], result["scores"]))
    ranked.sort(key=lambda pair: -pair[1])

    kept_boxes, kept_scores = [], []
    for candidate, confidence in ranked:
        # Scores are sorted, so nothing after this point can pass the threshold.
        if confidence < thresh:
            break
        clashes = any(
            area(rect_inter(candidate, kept)) / min(area(candidate), area(kept)) > overlap_thres
            for kept in kept_boxes
        )
        if not clashes:
            kept_boxes.append(candidate)
            kept_scores.append(confidence)
    return kept_boxes, kept_scores



Show the expected boxes in blue and the detected ones in red.

In [None]:
from PIL import ImageDraw, Image
import numpy as np
import random
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

the_model.eval()

# randrange picks a valid index in [0, len). randint(1, len) is inclusive on
# both ends: it could return len (out of range) and never the first sample.
i = random.randrange(len(test_dataset))
print(i)

# Load the sample once and reuse it for the picture, the prediction and the boxes.
image_tensor, target = test_dataset[i]
# The tensor is channel-first with values in [0, 1]; PIL wants channel-last bytes.
nparr = image_tensor.permute(1, 2, 0).numpy()
img = Image.fromarray(np.uint8(nparr*255))

# Inference only: no gradients are needed.
with torch.no_grad():
    predict = the_model([image_tensor.to(device)])[0]


img1 = ImageDraw.Draw(img, "RGBA")
boxes, scores = filter_result(predict, thresh=0.5)
# Detected boxes in red; the opacity of the outline grows with the score.
for box, score in zip(boxes, scores):
    xa, ya, xb, yb = map(int, box)
    img1.rectangle((xa, ya, xb, yb), outline=(255, 0, 0, int(90*score)))

# Expected boxes in blue.
for box in target["boxes"]:
    xa, ya, xb, yb = map(int, box)
    img1.rectangle((xa, ya, xb, yb), outline="blue")


fig, ax = plt.subplots(nrows = 1, ncols=1, figsize=(13, 13))
ax.imshow(img)

print((target,))