## Verificator

PoC for tattoo verificator.

---

### Load dependencies

In [2]:
import time
start_time = time.time()

from pathlib import Path
import re
import os
import csv
import hashlib
from PIL import Image
import cv2
import shutil
import numpy as np
import pandas as pd
import glob

from sklearn.model_selection import train_test_split
from sklearn.metrics.pairwise import cosine_similarity
import torch
import torch.nn as nn
from torchvision import models, transforms
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights

## Global variables and common methods

In [3]:
base_path = Path("../datasets/tattoos")
bound_box_path = Path(f"{base_path}/bounding_boxes")
images_path = Path(f"{base_path}/images")

image_index_csv = f"{base_path}/image_index.csv"
duplicates_file = f"{base_path}/duplicates.txt"
corrupt_files_file = f"{base_path}/corrupt_files.txt"

resized_images_folder = f"{base_path}/resized_color"
processed_images_folder = f"{base_path}/processed_images_grayscale"
image_index_with_labels_csv = f"{base_path}/image_index_with_labels.csv"

dataset_split_folder = f"{base_path}/dataset_split"
train_csv = f"{dataset_split_folder}/train.csv"
val_csv = f"{dataset_split_folder}/val.csv"
test_csv = f"{dataset_split_folder}/test.csv"

def remove_single_file(file_path):
    try:
        os.remove(file_path)
    except OSError:
        pass

def remove_folder(folder_path):
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    else:
        shutil.rmtree(folder_path)
        os.makedirs(folder_path)

## Data preparation

### Load dataset

In [4]:
pattern = r'^\d+_\d+\.JPG'
total_bound_boxes = [file.name for file in bound_box_path.iterdir() if file.is_file()]
all_images = [file.name for file in images_path.iterdir() if file.is_file()]
base_images = [file.name for file in images_path.iterdir() if file.is_file() and re.match(pattern, file.name)]

print ("Base images in data folder: ")
print("     Total of bounding boxes: ", len(total_bound_boxes))
print("     Total of images: ", len(all_images))
print("     Total of base images: ", len(base_images))
print('')
print("Base images and their variants")

base_image_variant_counts = {base_image: 0 for base_image in base_images}

for image in all_images:
    for base_image in base_images:
        if image.startswith(base_image[:-4]):
            base_image_variant_counts[base_image] += 1

for base_image, count in base_image_variant_counts.items():
    print(f"    Base image '{base_image}' has {count} variants.")

Base images in data folder: 
     Total of bounding boxes:  4411
     Total of images:  4411
     Total of base images:  161

Base images and their variants
    Base image '118_1.JPG' has 21 variants.
    Base image '103_2.JPG' has 21 variants.
    Base image '77_1.JPG' has 21 variants.
    Base image '32_1.JPG' has 21 variants.
    Base image '14_2.JPG' has 21 variants.
    Base image '53_1.JPG' has 21 variants.
    Base image '16_1.JPG' has 21 variants.
    Base image '144_1.JPG' has 21 variants.
    Base image '93_1.JPG' has 21 variants.
    Base image '91_1.JPG' has 21 variants.
    Base image '146_1.JPG' has 21 variants.
    Base image '103_1.JPG' has 21 variants.
    Base image '51_1.JPG' has 21 variants.
    Base image '14_1.JPG' has 21 variants.
    Base image '88_1.JPG' has 21 variants.
    Base image '75_1.JPG' has 21 variants.
    Base image '16_2.JPG' has 21 variants.
    Base image '127_1.JPG' has 21 variants.
    Base image '48_1.JPG' has 21 variants.
    Base image '10_4

### CSV Index

file_path | label | transformation

In [5]:
remove_single_file(image_index_csv)

with open(image_index_csv, mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(["file_path", "label", "transformation"])
    
    for filename in os.listdir(images_path):
        if filename.endswith(('.jpg', '.jpeg', '.png', 'JPG', '.PNG', '.JPEG')):
            parts = filename.split("_")
            label = parts[0]
            transformation = parts[-1].split(".")[0] if len(parts) > 2 else "original"
            file_path = os.path.join(images_path, filename)
            writer.writerow([file_path, label, transformation])

print(f"CSV Index file created at: {image_index_csv}")

CSV Index file created at: ../datasets/tattoos/image_index.csv


### Check

Check for corrupted images or duplicates

In [6]:
remove_single_file(duplicates_file)
remove_single_file(corrupt_files_file)

# Detect duplicates
hashes = {}
duplicates = []

# Corrupted files
corrupt_files = []

for filename in os.listdir(images_path):
    file_path = os.path.join(images_path, filename)
    try:
        # Check for corrupted data
        with Image.open(file_path) as img:
            img.verify()
        
        # Check hash
        with open(file_path, "rb") as f:
            file_hash = hashlib.md5(f.read()).hexdigest()
        
        # Check for duplicates
        if file_hash in hashes:
            duplicates.append((file_path, hashes[file_hash]))
        else:
            hashes[file_hash] = file_path
    except Exception as e:
        # Save corrupted file
        corrupt_files.append(file_path)

# Save duplicates
with open(duplicates_file, "w") as f:
    for dup, original in duplicates:
        f.write(f"{dup} is duplicate of {original}\n")

# Save corrupted
with open(corrupt_files_file, "w") as f:
    for corrupt in corrupt_files:
        f.write(f"{corrupt} is corrupted\n")

print(f"Duplicates found: {len(duplicates)} (details on {duplicates_file})")
print(f"Corrupted files found: {len(corrupt_files)} (details on {corrupt_files_file})")

Duplicates found: 5 (details on ../datasets/tattoos/duplicates.txt)
Corrupted files found: 0 (details on ../datasets/tattoos/corrupt_files.txt)


### Preprocessing

#### Resize

In [7]:
remove_folder(resized_images_folder)

for filename in os.listdir(images_path):
    img = cv2.imread(os.path.join(images_path, filename))
    resized_img = cv2.resize(img, (224, 224))
    cv2.imwrite(os.path.join(resized_images_folder, filename), resized_img)

print(f"Resized images found at: {resized_images_folder}")

Resized images found at: ../datasets/tattoos/resized_color


#### Normalization and consistency

In [8]:
remove_folder(processed_images_folder)

# Processing parameters
target_size = (224, 224)  # Fixed size
normalize_range = (0, 1)  # Range [0, 1] o [-1, 1]
color_mode = "grayscale"  # Options: "rgb" o "grayscale"

def normalize_image(image, range_min, range_max):
    image = image.astype("float32")  # Convertir a flotante
    if range_min == 0 and range_max == 1:
        return image / 255.0  # Normalizar a [0, 1]
    elif range_min == -1 and range_max == 1:
        return (image / 127.5) - 1.0  # Normalizar a [-1, 1]
    else:
        raise ValueError("Invalid normalization range.")

# Image processing
for filename in os.listdir(images_path):
    input_path = os.path.join(images_path, filename)
    output_path = os.path.join(processed_images_folder, filename)

    # Read image
    image = cv2.imread(input_path)
    if image is None:
        print(f"Warning: could not load image {input_path}. Skipped.")
        continue

    # Format conversion
    if color_mode == "rgb":
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    elif color_mode == "grayscale":
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        raise ValueError("Invalid color mode. Choose either 'rgb' or 'grayscale'.")

    # Resize
    image = cv2.resize(image, target_size)

    # Pixel normalization
    normalized_image = normalize_image(image, *normalize_range)

    # Save processed images
    if color_mode == "grayscale":
        cv2.imwrite(output_path, (normalized_image * 255).astype("uint8"))
    else:
        cv2.imwrite(output_path, (normalized_image * 255).astype("uint8")[:, :, ::-1])  # Convert RGB to BGR to save

print(f"Processed images found at: {processed_images_folder}")

Processed images found at: ../datasets/tattoos/processed_images_grayscale


#### Additional labeling

In [9]:
remove_single_file(image_index_with_labels_csv)

difficulty_mapping = {
    "original": "easy",
    "a1": "medium", "a2": "medium", "a3": "medium", "a4": "medium",
    "ar1": "medium", "ar2": "medium", "ar3": "medium", "ar4": "medium",
    "b1": "hard", "b2": "hard",
    "c1": "medium", "c2": "medium", "c3": "medium", "c4": "medium",
    "i1": "medium", "i2": "medium",
    "r1": "hard", "r2": "hard", "r3": "hard", "r4": "hard"
}

image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.JPG', '*.PNG', '*.JPEG']

original_image_paths = []
for ext in image_extensions:
    original_image_paths.extend(glob.glob(os.path.join(processed_images_folder, ext)))

csv_data = []

def extract_info_from_filename(filename):
    basename = os.path.basename(filename)
    parts = basename.split("_")
    
    label = parts[0]
    number = parts[1]
    transformation = parts[2].replace(".jpg", "").replace(".jpeg", "").replace(".png", "").replace(".JPG", "").replace(".PNG", "").replace(".JPEG", "") if len(parts) > 2 else "original"    
    return label, number, transformation

for image_path in original_image_paths:
    label, number, transformation = extract_info_from_filename(image_path)
    
    difficulty = difficulty_mapping.get(transformation, "unknown")
    
    if transformation == "original":
        group = "original"
    else:
        group = "transformed"
     
    csv_data.append({
        "file_path": image_path,
        "label": label,
        "transformation": transformation,
        "difficulty": difficulty,
        "group": group
    })

df = pd.DataFrame(csv_data)
df.to_csv(image_index_with_labels_csv, index=False)

print(f"CSV with additional labels at: {image_index_with_labels_csv}")

CSV with additional labels at: ../datasets/tattoos/image_index_with_labels.csv


#### Training dataset

In [10]:
remove_folder(dataset_split_folder)

train_size = 0.7
val_size = 0.15
test_size = 0.15

data = pd.read_csv(image_index_with_labels_csv)
data = data[data["label"] != "temp"]
# print(data["label"].value_counts())

train_data, temp_data = train_test_split(data, test_size=(1 - train_size), stratify=data["label"], random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=(test_size / (val_size + test_size)), stratify=temp_data["label"], random_state=42)

train_data.to_csv(train_csv, index=False)
val_data.to_csv(val_csv, index=False)
test_data.to_csv(test_csv, index=False)

print(f"Split dataset:")
print(f"    - Train:    {len(train_data)} images (found at {train_csv})")
print(f"    - Validate: {len(val_data)} images (found at {val_csv})")
print(f"    - Test:     {len(test_data)} images (found at {test_csv})")

Split dataset:
    - Train:    3086 images (found at ../datasets/tattoos/dataset_split/train.csv)
    - Validate: 662 images (found at ../datasets/tattoos/dataset_split/val.csv)
    - Test:     662 images (found at ../datasets/tattoos/dataset_split/test.csv)


## Model training

### Load ResNet model

Configure ResNet for characteristic extraction

In [11]:
# Configurar dispositivo (GPU, MPS o CPU)
device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")

# Cargar modelo ResNet preentrenado
model = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
# Quitar la última capa para obtener vectores de características
model = nn.Sequential(*list(model.children())[:-1])
model = model.to(device)
model.eval()  # Configurar en modo evaluación

Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /Users/administrator/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth
100%|██████████| 20.5M/20.5M [00:03<00:00, 6.61MB/s]


Sequential(
  (0): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2): Conv2dNormActivation(
    

### Apply transformations

In [12]:
# Transformaciones necesarias para ResNet
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Tamaño esperado por ResNet
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalización estándar
])

# Función para procesar una imagen y obtener su vector de características
def extract_features(image_path, model, device):
    image = Image.open(image_path).convert("RGB")  # Asegurarse de que sea RGB
    input_tensor = transform(image).unsqueeze(0).to(device)  # Transformar y agregar dimensión batch
    with torch.no_grad():
        features = model(input_tensor).squeeze()  # Pasar por el modelo y aplanar
    return features.cpu().numpy()  # Convertir a numpy array

### Matching images

In [13]:
# Ruta de la imagen a comparar
query_image_path = f"{images_path}/1_1_ar1.JPG"

# Extraer características de la imagen de consulta
query_features = extract_features(query_image_path, model, device)

#### Extract and match against original dataset

In [14]:
# Extraer características de todas las imágenes del dataset
original_dataset_features = []
original_image_paths = []
for file_name in os.listdir(images_path):
    if file_name.lower().endswith(('.jpg', '.jpeg', '.png')):
        image_path = os.path.join(images_path, file_name)
        features = extract_features(image_path, model, device)
        original_dataset_features.append(features)
        original_image_paths.append(image_path)

# Convertir a numpy array para facilitar cálculos
original_dataset_features = np.array(original_dataset_features)

# Calcular similitudes (usando distancia coseno)
similarities = cosine_similarity(query_features.reshape(1, -1), original_dataset_features)[0]

# Ordenar resultados por similitud
sorted_indices = np.argsort(similarities)[::-1]  # De mayor a menor
top_k = 5  # Número de resultados más similares a mostrar

print(f"Imagen de consulta: {query_image_path}")
print("Imágenes más similares:")
for idx in sorted_indices[:top_k]:
    print(f"Imagen: {original_image_paths[idx]}, Similitud: {similarities[idx]:.4f}")

Imagen de consulta: ../datasets/tattoos/images/1_1_ar1.JPG
Imágenes más similares:
Imagen: ../datasets/tattoos/images/1_1_ar1.JPG, Similitud: 1.0000
Imagen: ../datasets/tattoos/images/1_1_ar4.JPG, Similitud: 0.9997
Imagen: ../datasets/tattoos/images/1_1_ar3.JPG, Similitud: 0.9997
Imagen: ../datasets/tattoos/images/1_1.JPG, Similitud: 0.9994
Imagen: ../datasets/tattoos/images/1_1_ar2.JPG, Similitud: 0.9988


#### Extract and match against resized dataset

In [15]:
# Extraer características de todas las imágenes del dataset
resized_dataset_features = []
resized_image_paths = []
for file_name in os.listdir(resized_images_folder):
    if file_name.lower().endswith(('.jpg', '.jpeg', '.png')):
        image_path = os.path.join(images_path, file_name)
        features = extract_features(image_path, model, device)
        resized_dataset_features.append(features)
        resized_image_paths.append(image_path)

# Convertir a numpy array para facilitar cálculos
resized_dataset_features = np.array(resized_dataset_features)

# Calcular similitudes (usando distancia coseno)
similarities = cosine_similarity(query_features.reshape(1, -1), resized_dataset_features)[0]

# Ordenar resultados por similitud
sorted_indices = np.argsort(similarities)[::-1]  # De mayor a menor
top_k = 5  # Número de resultados más similares a mostrar

print(f"Imagen de consulta: {query_image_path}")
print("Imágenes más similares:")
for idx in sorted_indices[:top_k]:
    print(f"Imagen: {resized_image_paths[idx]}, Similitud: {similarities[idx]:.4f}")

Imagen de consulta: ../datasets/tattoos/images/1_1_ar1.JPG
Imágenes más similares:
Imagen: ../datasets/tattoos/images/1_1_ar1.JPG, Similitud: 1.0000
Imagen: ../datasets/tattoos/images/1_1_ar4.JPG, Similitud: 0.9997
Imagen: ../datasets/tattoos/images/1_1_ar3.JPG, Similitud: 0.9997
Imagen: ../datasets/tattoos/images/1_1.JPG, Similitud: 0.9994
Imagen: ../datasets/tattoos/images/1_1_ar2.JPG, Similitud: 0.9988


#### Extract and match against processed dataset

In [16]:
# Extraer características de todas las imágenes del dataset
processed_dataset_features = []
processed_image_paths = []
for file_name in os.listdir(processed_images_folder):
    if file_name.lower().endswith(('.jpg', '.jpeg', '.png')):
        image_path = os.path.join(images_path, file_name)
        features = extract_features(image_path, model, device)
        processed_dataset_features.append(features)
        processed_image_paths.append(image_path)

# Convertir a numpy array para facilitar cálculos
processed_dataset_features = np.array(processed_dataset_features)

# Calcular similitudes (usando distancia coseno)
similarities = cosine_similarity(query_features.reshape(1, -1), processed_dataset_features)[0]

# Ordenar resultados por similitud
sorted_indices = np.argsort(similarities)[::-1]  # De mayor a menor
top_k = 5  # Número de resultados más similares a mostrar

print(f"Imagen de consulta: {query_image_path}")
print("Imágenes más similares:")
for idx in sorted_indices[:top_k]:
    print(f"Imagen: {processed_image_paths[idx]}, Similitud: {similarities[idx]:.4f}")

Imagen de consulta: ../datasets/tattoos/images/1_1_ar1.JPG
Imágenes más similares:
Imagen: ../datasets/tattoos/images/1_1_ar1.JPG, Similitud: 1.0000
Imagen: ../datasets/tattoos/images/1_1_ar4.JPG, Similitud: 0.9997
Imagen: ../datasets/tattoos/images/1_1_ar3.JPG, Similitud: 0.9997
Imagen: ../datasets/tattoos/images/1_1.JPG, Similitud: 0.9994
Imagen: ../datasets/tattoos/images/1_1_ar2.JPG, Similitud: 0.9988


---

### Total Time

This show the total time of execution

In [17]:
# Sets the total time of execution
end_time = time.time()
execution_time = end_time - start_time

# Calculate minutes and seconds
minutes = execution_time // 60  # Integer division for whole minutes
seconds = execution_time % 60   # Remainder for leftover seconds

print(f"Total execution time: {minutes} minutes and {seconds:.2f} seconds")

Total execution time: 4.0 minutes and 49.24 seconds
