### Summary
- The problem of rebar identification has reported *supervised* solutions, such as the one by *Zheng et al.* in [Appl. Sci. 2023,
13, 8233](https://www.mdpi.com/2076-3417/13/14/8233).
- Here, the Python code for an *unsupervised* approach based on a classical Laplacian-based blob-detector is presented.
  - The detection parameters are first optimized interactively. (The default values are the final 'optimal' values).
  - Further tweaking of parameters and the improvement of the workflow (not done here) are expected to improve the results further (but still underperforming supervised methods).

Install dependencies

In [None]:
%pip install requests opencv-python numpy matplotlib ipywidgets

Import dependencies

In [None]:
import csv
import cv2
import os
import requests

#%matplotlib widget
import matplotlib.pyplot as plt

from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BytesIO
from ipywidgets import interact, IntSlider, widgets
import numpy as np
from requests import Response
from typing import Any, Iterable
from zipfile import ZipFile

Define constants

In [None]:
url = 'https://drive.usercontent.google.com/download?id=1-rRbIP2ds0zSjcI8j8o1ERm3ethAAiZr&export=download&authuser=0&confirm=t&uuid=841f4a8c-ddf3-4111-aaa9-2a145f644347&at=APZUnTW0mPJm6l3LYQfxn-itAWB2:1707407066796'
extract_path = '.'
images_folder = 'images'
annotations_folder = 'annotations'
annotations_file = '100_percent_train.csv'

Define classes (data-models and logic)

In [None]:
dataclass(frozen=True)
class Annotation:
    def __init__(self, filename: str, x1:int, y1:int, x2:int, y2:int):
        self.filename = filename
        self.x1 = int(x1)
        self.y1 = int(y1)
        self.x2 = int(x2)
        self.y2 = int(y2)

    def pt1(self):
        return (self.x1, self.y1)
    
    def pt2(self):
        return (self.x2, self.y2)
    
    @staticmethod
    def parse(row: list[str]):
        filename: str = row[0].strip()
        xy: list[str] = row[1].split(' ')
        return Annotation(
            filename = filename,
            x1=int(xy[0].strip()), 
            y1=int(xy[1].strip()), 
            x2=int(xy[2].strip()), 
            y2=int(xy[3].strip()), 
        )

    @staticmethod
    def from_keypoint(keypoint: cv2.KeyPoint, filename: str = ''):
        half_width: int = keypoint.size / 2
        return Annotation(
            filename=filename,
            x1=keypoint.pt[0] - half_width,
            y1=keypoint.pt[1] - half_width,
            x2=keypoint.pt[0] + half_width,
            y2=keypoint.pt[1] + half_width
        )

    @staticmethod
    def get_image_annotations(annotations: list, filename: str) -> list:
        return list(filter(lambda annotation: annotation.filename == filename, annotations))

    @staticmethod
    def get_indices_of_images_with_annotations(annotations: list, filenames: list[str]) -> list[int]:
        has_annotation = lambda i: len(Annotation.get_image_annotations(annotations, filenames[i])) > 0
        return list(filter(has_annotation, range(0, len(filenames))))

Download images

In [None]:
# expected execution time (if dataset not downloaded yet): < 4 minutes
def download_dataset(url: str, extract_path: str, images_folder: str) -> None:
    if os.path.exists(images_folder):
        return
    response: Response = requests.get(url)
    if response.status_code == 200:
        with ZipFile(BytesIO(response.content), 'r') as zip_ref:
            zip_ref.extractall(extract_path)

download_dataset(url=url, extract_path=extract_path, images_folder=images_folder)

Load Images and file names

In [None]:
# expected execution time: < 2 minutes
filenames: list[str] = os.listdir(images_folder)
images: list[cv2.typing.MatLike] = [cv2.imread(os.path.join(images_folder, filename)) for filename in filenames]

print(f"images: {len(images)}, filenames: {len(filenames)}")
assert len(filenames) == 2125
assert len(images) == len(filenames)

Load annotations

In [None]:
def load_csv(file_path) -> list[Annotation]:
    with open(file_path, 'r') as file:
        return [Annotation.parse(row) for row in csv.reader(file)]

annotations_path: str = os.path.join(annotations_folder, annotations_file)
annotations: list[Annotation] = load_csv(annotations_path)

print(f"images: {len(images)}, annotations: {len(annotations)}")
assert len(images) == 2125
assert len(annotations) == 179713

Identify images without annotation

In [None]:
# expected execution time: 15 seconds
indices_of_images_with_annotations: list[int] = Annotation.get_indices_of_images_with_annotations(annotations, filenames)

print(f"images: {len(images)}, images with annotations: {len(indices_of_images_with_annotations)}")
assert len(images) == 2125
assert len(indices_of_images_with_annotations) == 1125

Browse images and display their *<span style='color:green'>known</span>* bounding-boxes 

In [None]:
def show_image(image_index: int):
    if (image_index < 0 or image_index > len(images)):
        return
    image_annotations: list[Annotation] = Annotation.get_image_annotations(annotations, filenames[image_index])
    img = images[image_index].copy()
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    [cv2.rectangle(img=img, pt1=annotation.pt1(), pt2=annotation.pt2(), color=(0, 255, 0), thickness=4) for annotation in image_annotations]

    plt.imshow(img)
    plt.title(f"file name: {filenames[image_index]}, annotations = {len(image_annotations)}")
    plt.axis('off')
    plt.show()

def show_image_with_widgets():
    image_index_slider = IntSlider(min=0, max=len(images) - 1, step=1, value=0)
    image_index_textbox = widgets.IntText(value=0, description='Image Index:', continuous_update=False)

    image_index_textbox.observe(lambda change: setattr(image_index_slider, 'value', change.new), names='value')
    image_index_slider.observe(lambda change: setattr(image_index_textbox, 'value', change.new), names='value')

    interact(show_image, image_index=image_index_slider)
    display(image_index_textbox)

show_image_with_widgets()

Define detectors

In [None]:
dataclass(frozen=True)
class AnnotationList:
    def __init__(self, items: list[Annotation], filename: str):
        self.items = items
        self.filename = filename

class Detector(ABC):
    # Batch processing
    def detect(self, images: list[cv2.typing.MatLike], filenames: list[str], indices: list[int], params: dict[str, Any]) -> list[AnnotationList]:
        progress_fixed_string: str = ' out of ' + str(len(indices))
        return [AnnotationList(
            self.detect_single_complete(images[j], filenames[j], params | {'filename': filenames[j]}, str(i + 1) + progress_fixed_string),
            filenames[i]
        ) for i, j in enumerate(indices)]

    # Handling the logic for the separation of single-image and batch processing
    def detect_single_complete(self, image: cv2.typing.MatLike, filename: str, params: dict[str, Any], progress: str = None) -> list[Annotation]:
        if (progress != None):
            print('\r' + progress, end='')
        image_params = Detector.image_params(filename, params)
        annotations: list[Annotation] = self.detect_single(image, image_params)
        return [Annotation(filename, ann.x1, ann.y1, ann.x2, ann.y2) for ann in annotations]

    # Single-image processing
    @abstractmethod
    def detect_single(self, image: cv2.typing.MatLike, image_params: dict[str, Any]) -> list[Annotation]:
        pass

    @staticmethod
    def image_params(filename: str, params: dict[str, Any]) -> dict[str, Any]:
        params['rebar_size'] = int(filename[:-6][-2:]) # rebar_119_16MM.jpg -> 16
        return params


class SimpleBlobDetector(Detector):
    def detect_single(self, image: cv2.typing.MatLike, image_params: dict[str, Any]) -> list[Annotation]:
        cv_params = SimpleBlobDetector.get_cv_parameters(image_params)
        cv_detector = cv2.SimpleBlobDetector_create(cv_params)
        keypoints: cv2.KeyPoint = cv_detector.detect(image)
        return list(map(Annotation.from_keypoint, keypoints))

    @staticmethod
    def get_cv_parameters(image_params: dict[str, Any]) -> Any:
        #size: float = float(image_params['rebar_size'])
        #print(image_params)

        cv_params = cv2.SimpleBlobDetector_Params()

        cv_params.minThreshold = float(image_params['min_int'])
        cv_params.maxThreshold = float(image_params['max_int'])

        cv_params.filterByArea = True
        cv_params.minArea = float(image_params['min_size'] ** 2)
        cv_params.maxArea = float(image_params['max_size'] ** 2)

        cv_params.filterByCircularity = True
        cv_params.minCircularity = 0.001
        cv_params.maxCircularity = 1.00

        cv_params.filterByConvexity = True
        cv_params.minConvexity = 0.001
        cv_params.maxConvexity = 1.00

        cv_params.filterByInertia = True
        cv_params.minInertiaRatio = 0.001
        cv_params.maxInertiaRatio = 1.00

        return cv_params        

simple_blob_detector: Detector = SimpleBlobDetector()

Browse images and display their *<span style='color:yellow'>estimated</span>* bounding-boxes (try & error)

In [None]:
# For try and error and parameter optimization
def show_image(
        image_index: int,
        min_size: int,
        max_size: int,
        min_int: int,
        max_int: int,
        connected_component_threshold: int,
        connected_component_connectivity: int,
        gaussian_size: int,
        median_size: int,
        min_canny: int,
        max_canny: int,
        laplaceian_ksize: int,
        bilateral_d: int,
        bilateral_sigma: int,
    ):
    if (image_index < 0 or image_index > len(images)):
        return
    if (image_index not in indices_of_images_with_annotations):
        print(image_index)
        print(filenames[image_index])
        img = np.zeros(images[image_index].shape)
        plt.imshow(img)
        return

    params: dict[str, int] = {
        'min_size': min_size,
        'max_size': max_size,
        'min_int': min_int,
        'max_int': max_int
    }
    #image_annotations: list[Annotation] = Annotation.get_image_annotations(annotations, filenames[image_index])
    #img = images[image_index].copy()
    #img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    #[cv2.rectangle(img=img, pt1=annotation.pt1(), pt2=annotation.pt2(), color=(0, 255, 0), thickness=4) for annotation in image_annotations]
    
    img = images[image_index].copy()
    #img = np.float32(img)
    #img = cv2.bilateralFilter(img, bilateral_d, sigmaColor=bilateral_sigma, sigmaSpace=bilateral_sigma)
    img = cv2.GaussianBlur(img, (gaussian_size, gaussian_size), sigmaX=0, sigmaY=0)
    img = cv2.medianBlur(img, ksize=median_size)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = cv2.GaussianBlur(img, (gaussian_size, gaussian_size), sigmaX=0, sigmaY=0)
    img = cv2.medianBlur(img, ksize=median_size)
    img = cv2.Laplacian(img, ddepth=cv2.CV_16S, ksize=laplaceian_ksize)
    img = cv2.convertScaleAbs(img)
    img = cv2.medianBlur(img, ksize=median_size)
    img = cv2.convertScaleAbs(img)
    #img = cv2.Canny(img, threshold1=min_canny, threshold2=max_canny)
    #img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]
    #result = cv2.connectedComponentsWithStats(img, connected_component_connectivity, cv2.CV_32S)
    #(num_labels, labels, stats, centroids) = result
    #print(num_labels)
    #print(labels)
    #print(stats)
    #print(centroids)
    #img = abs(255 - img) 
    #img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    #img = cv2.GaussianBlur(img, (max_size, max_size), sigmaX=0, sigmaY=0)
    img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    detected_annotations: list[Annotation] = simple_blob_detector.detect_single_complete(img, filenames[image_index], params)
    [cv2.rectangle(img=img, pt1=annotation.pt1(), pt2=annotation.pt2(), color=(0, 255, 255), thickness=10) for annotation in detected_annotations]

    #img_gray = cv2.cvtColor(images[image_index], cv2.COLOR_BGR2GRAY)
    #detected_annotations: list[Annotation] = detectors[0].detect_single_complete(img_gray, filenames[image_index], params)
    #[cv2.rectangle(img=img_gray, pt1=annotation.pt1(), pt2=annotation.pt2(), color=(255, 255, 255), thickness=10) for annotation in detected_annotations]


    #plt.imshow(img, cmap='gray')
    plt.imshow(img)
    #plt.title(f"file name: {filenames[image_index]}, annotations = {len(detected_annotations)}")
    plt.axis('off')
    plt.show()

def show_image_with_widgets():
    bilateral_d_slider = IntSlider(min=1, max=200, step=1, value=1)
    bilateral_sigma_slider = IntSlider(min=1, max=200, step=1, value=5)
    connected_component_threshold_slider = IntSlider(min=1, max=255, step=1, value=5)
    connected_component_connectivity_slider = IntSlider(min=4, max=8, step=4, value=4)
    gaussian_size_slider = IntSlider(min=1, max=19, step=2, value=17)
    median_size_slider = IntSlider(min=1, max=19, step=2, value=19)
    min_canny_slider = IntSlider(min=1, max=13, step=2, value=7)
    max_canny_slider = IntSlider(min=1, max=255, step=1, value=200)
    laplaceian_ksize_slider = IntSlider(min=1, max=15, step=1, value=7)
    min_size_slider = IntSlider(min=1, max=300, step=1, value=72)
    max_size_slider = IntSlider(min=1, max=300, step=1, value=100)
    min_int_slider = IntSlider(min=1, max=100, step=1, value=1)
    max_int_slider = IntSlider(min=1, max=255, step=1, value=189)

    image_index_slider = IntSlider(min=0, max=len(images) - 1, step=1, value=0)
    image_index_textbox = widgets.IntText(value=0, description='Image Index:', continuous_update=False)

    image_index_textbox.observe(lambda change: setattr(image_index_slider, 'value', change.new), names='value')
    image_index_slider.observe(lambda change: setattr(image_index_textbox, 'value', change.new), names='value')

    interact(
        show_image,
        image_index=image_index_slider,
        min_size=min_size_slider,
        max_size=max_size_slider,
        min_int=min_int_slider,
        max_int=max_int_slider,
        connected_component_threshold=connected_component_threshold_slider,
        connected_component_connectivity=connected_component_connectivity_slider,
        gaussian_size=gaussian_size_slider,
        median_size=median_size_slider,
        min_canny=min_canny_slider,
        max_canny=max_canny_slider,
        laplaceian_ksize = laplaceian_ksize_slider,
        bilateral_d=bilateral_d_slider,
        bilateral_sigma=bilateral_sigma_slider,
    )
    display(image_index_textbox)

show_image_with_widgets() 

Define optimized (Laplace) detctor

In [None]:
class LaplaceDetector(Detector):
    def __init__(self, simple_blob_detector):
        self.simple_blob_detector = simple_blob_detector

    def detect_single(self, image: cv2.typing.MatLike, image_params: dict[str, Any]) -> list[Annotation]:
        filename: str = image_params['filename']

        simple_blob_detector_params: dict[str, Any] = image_params['simple_blob_detector_params']

        gaussian_size: int = image_params['gaussian_size']
        median_size: int = image_params['median_size']
        laplaceian_ksize: int = image_params['laplaceian_ksize']

        img = image.copy()
        img = cv2.GaussianBlur(img, (gaussian_size, gaussian_size), sigmaX=0, sigmaY=0)
        img = cv2.medianBlur(img, ksize=median_size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = cv2.GaussianBlur(img, (gaussian_size, gaussian_size), sigmaX=0, sigmaY=0)
        img = cv2.medianBlur(img, ksize=median_size)
        img = cv2.Laplacian(img, ddepth=cv2.CV_16S, ksize=laplaceian_ksize)
        img = cv2.convertScaleAbs(img)
        img = cv2.medianBlur(img, ksize=median_size)
        img = cv2.convertScaleAbs(img)
        #img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        detected_annotations: list[Annotation] = self.simple_blob_detector.detect_single_complete(img, filename, simple_blob_detector_params)
        
        return detected_annotations
    
laplace_detectror: Detector = LaplaceDetector(simple_blob_detector)

Compare detected annotations with known ones

In [None]:
#expected execution time: 12 minutes
class Validator:
    @staticmethod
    def validate(
        images: list[cv2.typing.MatLike],
        filenames: list[str],
        expected_annotations: list[AnnotationList],
        indices_of_images_with_annotations: list[int],
        params: dict[str, Any]
    ) -> dict[str, float | int]:
        print('processing ' + str(len(indices_of_images_with_annotations)) + ' images started.')
        calculated_annotations: list[AnnotationList] = laplace_detectror.detect(images, filenames, indices_of_images_with_annotations, params)
        print('')
        print('processing ' + str(len(indices_of_images_with_annotations)) + ' images ended.')
        calculated_annotations_count = len(calculated_annotations)
        error_mean: float = Validator.annotation_count_error(calculated_annotations, expected_annotations)
        return {'error_mean': error_mean, 'calculated_annotations_count': calculated_annotations_count}

    @staticmethod
    def annotation_count_error(calculated: list[AnnotationList], expected: list[AnnotationList]) -> float:
        assert(len(calculated) == len(expected))
        error_list: Iterable[float] = map(lambda i : abs( 1 - len(calculated[i].items) / len(expected[i].items)), range(len(expected)))
        error_mean: float = np.array(list(error_list)).mean()
        return error_mean
    
class App:
    @staticmethod
    def run():
        simple_blob_detector_params: dict[str, Any] = {
            'min_size': 72,
            'max_size': 100,
            'min_int': 1,
            'max_int': 189
        }
        laplace_detectror_params: dict[str, Any] = {
            'simple_blob_detector_params': simple_blob_detector_params,
            'gaussian_size': 17,
            'median_size': 19,
            'laplaceian_ksize': 7,
        }
        expected_annotations: list[AnnotationList] = [
            AnnotationList(Annotation.get_image_annotations(annotations, filenames[i]), filenames[i])
            for i in indices_of_images_with_annotations
        ]
        assert (len(expected_annotations) == 1125)

        results: dict[str, float | int] = Validator.validate(images, filenames, expected_annotations, indices_of_images_with_annotations, laplace_detectror_params)
        print(results)
        assert(results['error_mean'] < 0.65) # 0.6406604616379477


App.run()