In [1]:
import os
import re
import numpy as np
import cv2
from pycocotools.coco import COCO
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
# Define la ruta de las imágenes y el archivo de anotaciones
image_directory = '/media/manuel/Robotica 4T/bd_unificada/images'
annotation_file = '/media/manuel/Robotica 4T/bd_unificada/combined.json'
output_directory = '/media/manuel/Robotica 4T/DS_Confinamiento'


In [3]:
# Instancia COCO
coco = COCO(annotation_file)

loading annotations into memory...
Done (t=21.89s)
creating index...
index created!


In [4]:
# Crea las carpetas de salida si no existen
os.makedirs(output_directory, exist_ok=True)
for category in coco.loadCats(coco.getCatIds()):
    class_name = category['name']
    os.makedirs(os.path.join(output_directory, class_name), exist_ok=True)

In [5]:
def extract_coordinates_and_dates(filename):
    # Utiliza expresiones regulares para extraer las coordenadas y fechas
    pattern = r"\[([^\]]+)\] - \('([^']+)', '([^']+)'\)"
    match = re.search(pattern, filename)
    if match:
        coords = match.group(1)
        date_range = match.group(2), match.group(3)
        return coords, date_range
    return None, None

def calculate_step(coords, img_width, img_height):
    # Convierte las coordenadas en una lista de floats
    lat_min, lon_min, lat_max, lon_max = map(float, coords.split(', '))
    
    # Calcula el paso en grados por pixel
    lat_step = (lat_max - lat_min) / img_height
    lon_step = (lon_max - lon_min) / img_width
    
    return lat_step, lon_step, lat_min, lon_min

def calculate_bbox_coordinates(lat_step, lon_step, lat_min, lon_min, bbox):
    # Extrae las coordenadas del bbox
    x, y, w, h = bbox
    
    # Calcula las coordenadas del bbox en grados
    new_lat_min = lat_min + y * lat_step
    new_lon_min = lon_min + x * lon_step
    new_lat_max = new_lat_min + h * lat_step
    new_lon_max = new_lon_min + w * lon_step
    
    return f"[{new_lat_min:.5f}, {new_lon_min:.5f}, {new_lat_max:.5f}, {new_lon_max:.5f}]"

# Lista para almacenar los nombres de los archivos generados
exported_filenames = []

def process_image(image_id):
    # Cargar la imagen
    img_info = coco.loadImgs(image_id)[0]
    img_path = os.path.join(image_directory, img_info['file_name'])
    img = cv2.imread(img_path)
    
    # Extraer coordenadas y rango de fechas del nombre del archivo original
    coords, date_range = extract_coordinates_and_dates(img_info['file_name'])
    if not coords or not date_range:
        print(f"Error al extraer datos del nombre del archivo: {img_info['file_name']}")
        return
    
    # Calcular el paso en grados por pixel
    lat_step, lon_step, lat_min, lon_min = calculate_step(coords, img.shape[1], img.shape[0])
    
    # Obtener las anotaciones de la imagen
    ann_ids = coco.getAnnIds(imgIds=image_id)
    anns = coco.loadAnns(ann_ids)

    for ann in anns:
        bbox = ann['bbox']
        class_id = ann['category_id']
        class_name = coco.loadCats(class_id)[0]['name']
        
        # Calcular las nuevas coordenadas del bbox
        new_coords = calculate_bbox_coordinates(lat_step, lon_step, lat_min, lon_min, bbox)
        
        # Extraer la bbox de la imagen
        x, y, w, h = map(int, bbox)
        bbox_img = img[y:y+h, x:x+w]
        
        # Generar el nombre del archivo con el formato especificado
        unique_id = f"{class_name}_{new_coords} - ('{date_range[0]}', '{date_range[1]}').png"
        output_path = os.path.join(output_directory, class_name, unique_id)
        
        # Guardar la imagen bbox
        cv2.imwrite(output_path, bbox_img)
        
        # Almacenar el nombre del archivo generado
        exported_filenames.append(unique_id)

# Obtener todos los IDs de las imágenes
image_ids = coco.getImgIds()




In [6]:
# Usar ThreadPoolExecutor para gestionar la concurrencia
num_threads = 4  # Ajusta este número según las capacidades de tu sistema
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Enviar todas las tareas al ejecutor
    futures = {executor.submit(process_image, image_id): image_id for image_id in image_ids}
    
    # Esperar a que todas las tareas terminen
    for future in as_completed(futures):
        image_id = futures[future]
        try:
            future.result()  # Aquí podrías manejar el resultado si es necesario
        except Exception as e:
            print(f"Error procesando la imagen ID {image_id}: {e}")

print("Procesamiento completado.")

Error procesando la imagen ID 20693: OpenCV(4.6.0) /io/opencv/modules/imgcodecs/src/loadsave.cpp:801: error: (-215:Assertion failed) !_img.empty() in function 'imwrite'

Procesamiento completado.


In [None]:
# Crear threads para procesar las imágenes en paralelo
threads = []
for image_id in image_ids:
    thread = threading.Thread(target=process_image, args=(image_id,))
    threads.append(thread)
    thread.start()

# Esperar a que todos los threads terminen
for thread in threads:
    thread.join()

# Imprimir los nombres de los archivos generados
for filename in exported_filenames:
    print(filename)

print("Procesamiento completado.")