# Stratified K-Fold para Detecção de Objetos (Formato COCO)

Depois de ler o material que você me mandou, implementar a versão padrão (a mais comum) e refletir um pouco, resolvi deixar aqui uma segunda versão do split_data.py. Ainda não dei uma olhada no dataset, então essa proposta é mais baseada em suposições.

A ideia de fazer isso é que eu imagino que o motivo pelo qual você queria usar um stratified k-fold não é necessariamente por causa da quantidade de classes diferentes, mas sim pelo número de classes presentes em cada imagem. Suponho que os manômetros estejam agrupados em um mesmo local, o que pode significar pouca variação de tipos entre as imagens, mas uma grande variação na quantidade deles.

Então aqui vai uma sugestão de como você poderia implementar essa segunda abordagem. No código, deixei a versão mais simples, que considera apenas as classes presentes na imagem (sem levar em conta o número de instâncias de cada uma, só sua presença).

### Primeiro passo: Estabelecer uma função auxiliar que vai separar as imagens pelo número de instâncias da classe. (opcional)

ChatGPT e Gemini disseram que não compensa colocar um label pra cada número, mas sim fazer em intervalos pra economizar tempo e poder computacional. Como não sei quão preciso você precisaria ser, deixei aqui só a função exemplo com intervalos, mas pode fazer mais se quiser, o skfold aguenta.

Ah, e o motivo de não usar o count do próprio create_kfold_splits é só por facilidade mesmo, querendo ou não uma função separada, pelo menso pra agora que eu não sei quantos intervalos seriam necessários, é mais claro editar uma função auxiliar externa do que mexer na função que você já tinha criado (mas se quiser, dá pra kover lá pra dentro depois).

In [None]:
def get_count_bin(count: int) -> str:
    """Categoriza uma contagem de objetos em faixas"""
    count = int(count)
    if count == 0:
        return 'none' # Não deveria acontecer, mas vai que
    elif count <= 2:
        return 'low'   # 1 a 2 objetos
    elif count <= 5:
        return 'medium'# 3 a 5 objetos
    else:
        return 'high'  # 6 ou mais objetos

#Pode colocar esse método bem no começo do código se quiser, mas acho que não faz tanta diferença desde que ele seja chamada antes do uso.

### Segundo passo: mudar o create_kfold_splits_with_dataframe (Editar a chave de estraficação)

Assim como tá implementado atualmente a chave de estratificação (que é que cria essas novas labels pro kfold com as diversas classes), precisamos alterá-la para ela ser capaz de receber esse fato da quantidade. Se esse método que eu tô apresentando agora for no fim o preferido, é só copiar o que tá aqui em baixo e substituir todo o trecho da chave de estratificação atual.

In [None]:
def create_kfold_splits_with_dataframe(...):
    [...]
    #Toda a parte do class count dataframe
    [...]
    stratify_keys_list = []
    # Itera sobre cada imagem no DataFrame
    for _, row in labels_df.iterrows(): #Não sei se pode tirar o "_" e substituir por algo mais informativo, então deixei assim (Até onde compreendo, o pandas não precisa) 
        key_parts = []
        # Pega apenas as classes que estão presentes na imagem
        present_classes = row[row > 0]

        if present_classes.empty:
            stratify_keys_list.append('empty')
            continue

        # Para cada classe presente, cria uma chave 'classe_faixa', ou seja, conta e coloca naqueles intervalos pré-definidos (low, medium, high)
        for class_name, count in present_classes.items():
            bin_name = get_count_bin(count)
            key_parts.append(f"{class_name}_{bin_name}")
        
        # Ordena as partes para garantir consistência (fazer com que a ordem dos fatores (labels) não afete o produto final (deixa as labels iguais))
        key_parts.sort()
        stratify_keys_list.append('_'.join(key_parts))
    
    # Converte a lista de chaves para uma Série pandas para usar no split
    stratify_key = pd.Series(stratify_keys_list, index=labels_df.index)

    logger.info("Stratification key created.")

### Passo três: Ir pro abraço!!

O resto segue como já estava, ou seja, você rodaria o skfold como já está lá com os novos argumentos e reza pra dar certo (não precisa mexer em mais nada, a não ser naquele k no começo do código que está setado como 0 no momento).

Como ficou o código em geral então:

In [None]:
import json
import shutil
import random
import argparse
import logging
from pathlib import Path
from collections import Counter

import pandas as pd
from sklearn.model_selection import StratifiedKFold

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_count_bin(count: int) -> str:
    """
    Counts the number of objects in each image and categorizes them into bins.
    """
    count = int(count)
    if count == 0:
        return 'none' 
    elif count <= 2:
        return 'low'   
    elif count <= 5:
        return 'medium'
    else:
        return 'high'  
    
def split_data(images_dir, coco_json_path, output_dir, 
               train_ratio=0.75, val_ratio=0.1, ablation=0, k=0, pose_estimation=False,
               rename_images=False, classes=[]):
    """
    Splits a COCO dataset into training, validation, and testing sets or creates k-fold splits.
    """
    logger.info("Loading COCO annotations...")
    with open(coco_json_path, 'r') as f:
        coco_data = json.load(f)

    images_dir = Path(images_dir)
    output_dir = Path(output_dir)

    if not images_dir.exists():
        logger.error(f"Images directory does not exist: {images_dir}")
        return
    
    if classes:
        coco_data = filter_coco_by_classes(coco_data, classes)

    images = coco_data.get('images', [])
    annotations = coco_data.get('annotations', [])
    categories = coco_data.get('categories', [])

    logger.info(f"Total annotated images: {len(images)}")
    logger.info(f"Total annotated objects: {len(annotations)}")

    if k > 0:
        create_kfold_splits_with_dataframe(images, annotations, categories, images_dir, output_dir, k, rename_images, pose_estimation)
    elif ablation > 0:
        splits = generate_splits(images, train_ratio, val_ratio, ablation)
        process_splits(splits, images_dir, annotations, categories, output_dir, rename_images, pose_estimation)
    else:
        splits = generate_splits(images, train_ratio, val_ratio, ablation=0)
        process_splits(splits, images_dir, annotations, categories, output_dir, rename_images, pose_estimation)

def create_kfold_splits_with_dataframe(images, annotations, categories, images_dir, output_dir, k, rename_images, pose_estimation):
    """
    Create k-fold splits for the dataset using a pandas DataFrame of class counts per image.
    """
    logger.info("Creating class count DataFrame...")

    # Create a mapping from image IDs to their file names
    image_id_to_filename = {image['id']: image['file_name'] for image in images}

    # Create a mapping from category IDs to their names
    category_id_to_name = {category['id']: category['name'] for category in categories}

    # Initialize a DataFrame
    index = [image['file_name'] for image in images]  # Use image file names as index
    labels_df = pd.DataFrame(0, columns=category_id_to_name.values(), index=index)

    # Populate the DataFrame with object counts per class for each image
    for annotation in annotations:
        image_id = annotation['image_id']
        category_id = annotation['category_id']

        if image_id in image_id_to_filename:
            image_name = image_id_to_filename[image_id]
            class_name = category_id_to_name[category_id]
            labels_df.loc[image_name, class_name] += 1

    labels_df = labels_df.fillna(0)  # Replace NaN values with 0
    logger.info("Class count DataFrame created.")

    logger.info("Creating stratification key for k-fold splitting...")
    stratify_keys_list = []
    for _, row in labels_df.iterrows():
        key_parts = []
        # Gets only the classes that are present in the image
        present_classes = row[row > 0]

        if present_classes.empty:
            stratify_keys_list.append('empty')
            continue

        # For each class, crate a key 'classname_countbin'
        for class_name, count in present_classes.items():
            bin_name = get_count_bin(count)
            key_parts.append(f"{class_name}_{bin_name}")
        
        # Orders the parts to ensure consistency
        key_parts.sort()
        stratify_keys_list.append('_'.join(key_parts))
    
    # Convert the list of keys to a pandas Series for use in the split
    stratify_key = pd.Series(stratify_keys_list, index=labels_df.index)
    logger.info("Stratification key created.")

    # Perform stratified k-fold splitting
    skfold = StratifiedKFold(n_splits=k, shuffle=True, random_state=42) 

    for fold, (train_idx, val_idx) in enumerate(skfold.split(labels_df, stratify_key)):
        fold_name = f"fold_{fold+1}"
        logger.info(f"Processing {fold_name}...")

        # Get the training and validation file names
        train_files = labels_df.index[train_idx].tolist()
        val_files = labels_df.index[val_idx].tolist()

        # Filter images and annotations for each split
        train_images = [img for img in images if img['file_name'] in train_files]
        val_images = [img for img in images if img['file_name'] in val_files]

        train_annotations = filter_annotations(train_images, annotations, pose_estimation)
        val_annotations = filter_annotations(val_images, annotations, pose_estimation)

        # Prepare directories
        # train_images_path = output_dir / fold_name / "images" 
        # val_images_path = output_dir / fold_name / "images" 

        train_images_path = output_dir / fold_name / "images" / "train"
        train_labels_path = output_dir / fold_name / "labels" / "train"
        val_images_path = output_dir / fold_name / "images" / "val"
        val_labels_path = output_dir / fold_name / "labels" / "val"
        
        train_images_path.mkdir(parents=True, exist_ok=True)
        train_labels_path.mkdir(parents=True, exist_ok=True)
        val_images_path.mkdir(parents=True, exist_ok=True)
        val_labels_path.mkdir(parents=True, exist_ok=True)

        # Copy images and create subsets
        train_updated_images = copy_images(train_images, images_dir, train_images_path, rename_images)
        val_updated_images = copy_images(val_images, images_dir, val_images_path, rename_images)

        train_coco = create_coco_subset(train_updated_images, train_annotations, categories)
        val_coco = create_coco_subset(val_updated_images, val_annotations, categories)

        # Save JSON files
        with open(train_labels_path / "coco.json", 'w') as f:
            json.dump(train_coco, f, indent=4)
        with open(val_labels_path / "coco.json", 'w') as f:
            json.dump(val_coco, f, indent=4)

        logger.info(f"{fold_name} split completed.")

def process_splits(splits, images_dir, annotations, categories, output_dir, rename_images, pose_estimation):
    """
    Process and save splits for training, validation, and testing.
    """
    for split_name, images_set in splits.items():
        try:
            # Create directories
            images_output_path = output_dir / "images" / split_name
            labels_output_path = output_dir / "labels" / split_name
            images_output_path.mkdir(parents=True, exist_ok=True)
            labels_output_path.mkdir(parents=True, exist_ok=True)

            # Copy images and create COCO subset
            updated_images = copy_images(images_set, images_dir, images_output_path, rename_images)
            filtered_annotations = filter_annotations(updated_images, annotations, pose_estimation)
            coco_subset = create_coco_subset(updated_images, filtered_annotations, categories)

            # Save COCO JSON
            with open(labels_output_path / "coco.json", 'w') as file:
                json.dump(coco_subset, file, indent=4)

            # Save metadata
            save_metadata(output_dir, split_name, coco_subset)

            logger.info(f"Dataset for {split_name} saved successfully.")
        except Exception as e:
            logger.error(f"Failed to process data for {split_name}: {e}")

def generate_splits(images, train_ratio, val_ratio, ablation):
    """
    Generates a dictionary mapping split names to image lists based on given ratios and ablation settings.
    """
    random.shuffle(images)
    total_images = len(images)

    if ablation > 0:
        val_size = int(total_images * val_ratio)
        val_images = images[:val_size]
        ablation_images = images[val_size:]

        ablation_chunks = [int(len(ablation_images) * (i + 1) / ablation) for i in range(ablation)]
        splits = {"val": val_images}
        for i, chunk_size in enumerate(ablation_chunks):
            percentage = f"{int((chunk_size / len(images)) * 100)}%"
            splits[percentage] = ablation_images[:chunk_size]
    else:
        train_end = int(total_images * train_ratio)
        val_end = train_end + int(total_images * val_ratio)

        splits = {
            "train": images[:train_end],
            "val": images[train_end:val_end],
            "test": images[val_end:],
        }

    return splits

def copy_images(images, src_dir, dest_dir, rename_images, name_padding=5):
    """
    Copy selected images to a specified directory, and optionally rename them with new numerical IDs.
    Update the images' filenames in the dataset metadata if renamed.
    """

    id_format = f"{{:0{str(name_padding)}d}}"

    updated_images = []
    for image in images:
        try:
            image_path = Path(image['file_name'])

            if rename_images:
                if image_path.suffix in ['.jpg', '.jpeg']:
                    image_name = image_path.stem + '.jpeg'
                else:
                    image_name = image_path.name
               
                # If rename_images is True, rename files using numerical IDs
                new_file_name = id_format.format(image["id"]) + "." + image_name.split(".")[-1]

                # Update the image metadata to reflect the new filename
                updated_image = image.copy()
                updated_image['file_name'] = new_file_name
                updated_images.append(updated_image)
           
            else:
                updated_images = images
            
            src_path = Path(src_dir) / image['file_name']
            dest_path = Path(dest_dir) / new_file_name
            shutil.copy(src_path, dest_path)
            logger.debug(f"Successfully copied {src_path} to {dest_path}")
        
        except Exception as e:
            logger.error(f"Failed to copy {src_path} to {dest_path}: {e}")

    return updated_images

def filter_annotations(images_set, annotations, pose_estimation):
    """
    Filter annotations to include only those for the provided image set.
    """
    image_ids = {image['id'] for image in images_set}
    if pose_estimation:
        return [annotation for annotation in annotations if annotation['image_id'] in image_ids and 'keypoints' in annotation]
    else:
        return [annotation for annotation in annotations if annotation['image_id'] in image_ids]

def filter_coco_by_classes(coco_data, classes):
    """
    Filters the COCO data to only include specified classes.
    """
    category_name_to_id = {category['name']: category['id'] for category in coco_data['categories']}

    selected_category_ids = [category_name_to_id[name] for name in classes if name in category_name_to_id]

    if not selected_category_ids:
        logger.error(f"No matching categories found for classes: {classes}")
        raise ValueError(f"No matching categories found for classes: {classes}")

    filtered_annotations = [annotation for annotation in coco_data['annotations'] if annotation['category_id'] in selected_category_ids]

    image_ids = {annotation['image_id'] for annotation in filtered_annotations}

    filtered_images = [image for image in coco_data['images'] if image['id'] in image_ids]

    filtered_categories = [category for category in coco_data['categories'] if category['id'] in selected_category_ids]

    filtered_coco_data = {
        'images': filtered_images,
        'annotations': filtered_annotations,
        'categories': filtered_categories
    }

    return filtered_coco_data

def create_coco_subset(images, annotations, categories):
    """
    Create a COCO-formatted subset from images, annotations, and categories.
    """
    return {
        'images': images,
        'annotations': annotations,
        'categories': categories
    }

def log_object_count_per_class(coco_data):
    """
    Logs the total number of objects for each class in the COCO dataset.
    """
    category_counts = {category['name']: 0 for category in coco_data['categories']}
    for annotation in coco_data['annotations']:
        category_id = annotation['category_id']
        category_name = next((cat['name'] for cat in coco_data['categories'] if cat['id'] == category_id), None)
        if category_name:
            category_counts[category_name] += 1

    logger.info("Object counts per class:")
    for category, count in category_counts.items():
        logger.info(f"  {category}: {count}")
    return category_counts

def save_metadata(output_dir, split_name, coco_subset):
    """
    Save metadata, such as class-wise object counts, to a text file.
    """
    chunk_category_counts = log_object_count_per_class(coco_subset)
    meta_file_path = output_dir / f".{split_name}_meta.txt"
    with open(meta_file_path, 'w') as meta_file:
        meta_file.write("Class-wise Object Counts:\n")
        for category, count in chunk_category_counts.items():
            meta_file.write(f"{category}: {count}\n")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Split COCO dataset into training, validation, testing sets or k-fold splits.")
    parser.add_argument("images_dir", help="Path to the input directory containing images.")
    parser.add_argument("coco_json_path", help="Path to the COCO JSON file containing annotations.")
    parser.add_argument("output_dir", help="Path to the root output directory for splits.")
    parser.add_argument("--k", type=int, default=0, help="Number of folds for k-fold cross-validation (default: 0, i.e., no k-fold split).")
    parser.add_argument("--train_ratio", type=float, default=0.75, help="Proportion of images for training")
    parser.add_argument("--val_ratio", type=float, default=0.1, help="Proportion of images for validation")
    parser.add_argument("--ablation", type=int, default=0, help="Number of dataset chunks for ablation testing")
    parser.add_argument("--rename_images", action="store_true", default=True, help="Assign new numerical IDs to image file names")
    parser.add_argument("--classes", nargs='+', default=[], help="List of class names to process (default: all classes)")

    args = parser.parse_args()

    split_data(
        images_dir=args.images_dir,
        coco_json_path=args.coco_json_path,
        output_dir=args.output_dir,
        train_ratio=args.train_ratio,
        val_ratio=args.val_ratio,
        ablation=args.ablation,
        k=args.k,
        rename_images=args.rename_images,
        classes=args.classes
    )


### OBS: Essa implementação, como eu tinha falado no começo, não compensa se tiver muitas classes únicas
Então, se quiser verificar se isso isso é eficiente, dá pra rodar um:

In [None]:
#Logo abaixo da chave de estratificação
logger.info("Top 5 estratos mais comuns com sua contagem:")
logger.info("\n" + str(stratify_key.value_counts().head(5)))

E aí você olha se tem bastante imagens ou empty, ou com sla, manometro_digital_high. Se tiver muitos lows, quer dizer que não compensa o trabalho computacional, ou seja, podemos deixar no método padrão que ele já vai fazer a divisão proporcional, e a chave do random (no caso 42) vai provavelmente separar uma quantidade semelhante para cada fold (dado que o dataset é grande o suficiente).