In [2]:
import os
import pandas as pd
import shutil
import ast
from collections import Counter
from itertools import chain
from sklearn.model_selection import train_test_split

#### GENERAR DATASET DE SOLO UNA CLASE

In [19]:
# CONFIGURATION
BASE_IMAGE_DIR = "E:/TFM/PADCHEST"  # <-- image root containing subfolders 0, 1, ..., 50
OUTPUT_DIR = "E:/TFM/Dataset_una_clase"  # <-- folder where the new dataset is written
CSV_PATH = "E:/TFM/PADCHEST_chest_x_ray_images_labels_160K_01.02.19.csv"

# Finding used as the positive class
target_label = "descendent aortic elongation"

# Load the CSV and keep only the rows whose labels were assigned by a physician
df = pd.read_csv(CSV_PATH, low_memory=False)
df = df[df["MethodLabel"] == "Physician"].copy()

# 'Labels' holds the text of a Python list; parse it into real lists
df["Labels"] = df["Labels"].apply(ast.literal_eval)

# Binary label: positive when the finding is listed, negative otherwise.
# NOTE(review): this is an exact, case-sensitive match — confirm the labels
# in the CSV carry no stray whitespace or case variants.
df["binary_label"] = df["Labels"].apply(
    lambda labels: "positive" if target_label in labels else "negative"
)

# Keep every image with the finding plus a similar number without it
df_pos = df[df["binary_label"] == "positive"]
df_neg = df[df["binary_label"] == "negative"]

# Fail here with a clear message instead of an obscure error in the split cell
if df_pos.empty:
    raise ValueError(
        f"No hay imágenes con la etiqueta '{target_label}' entre las anotaciones manuales."
    )

# Match the number of negatives to the number of positives. sample() raises
# when asked for more rows than exist, so cap the request at the population.
n_negatives = min(len(df_pos), len(df_neg))
if n_negatives < len(df_pos):
    print(
        f"⚠️ Solo hay {len(df_neg)} negativos para {len(df_pos)} positivos; "
        "el dataset no quedará balanceado."
    )
df_neg_sampled = df_neg.sample(n=n_negatives, random_state=42)

df_binary = pd.concat([df_pos, df_neg_sampled])

In [20]:
# Stratified split: 60% train, then the remaining 40% halved into val and test
train, temp = train_test_split(df_binary, test_size=0.4, stratify=df_binary["binary_label"], random_state=42)
val, test = train_test_split(temp, test_size=0.5, stratify=temp["binary_label"], random_state=42)

# Tag every row with its split. assign() returns a new frame, which avoids the
# SettingWithCopyWarning raised when a column is set on the frames returned by
# train_test_split.
train = train.assign(split="train")
val = val.assign(split="val")
test = test.assign(split="test")

# Merge the three splits back into a single table
final_df = pd.concat([train, val, test])

# Every row of the balanced table must end up in exactly one split
assert len(final_df) == len(df_binary)

# Full path to the image
def build_path(row):
    """Return the on-disk path of the image described by ``row``.

    The path is BASE_IMAGE_DIR joined with the row's ``ImageDir`` (converted
    to text) and its ``ImageID``.
    """
    return os.path.join(BASE_IMAGE_DIR, str(row["ImageDir"]), row["ImageID"])

# Resolve the location of every image in the split table
final_df["image_path"] = final_df.apply(build_path, axis=1)

# Create the destination folders: <OUTPUT_DIR>/<split>/<label>
for split in ["train", "val", "test"]:
    for label in ["positive", "negative"]:
        os.makedirs(os.path.join(OUTPUT_DIR, split, label), exist_ok=True)

# Copy the files, keeping track of the sources that could not be found so
# that a partially populated dataset does not go unnoticed
copied = []
missing = []
for _, row in final_df.iterrows():
    src = row["image_path"]
    label = row["binary_label"]
    split = row["split"]
    dst = os.path.join(OUTPUT_DIR, split, label, os.path.basename(src))
    # isfile (rather than exists) so a directory with that name is not copied
    if os.path.isfile(src):
        shutil.copy2(src, dst)
        copied.append((src, dst, label, split, row["ImageID"], row["ImageDir"]))
    else:
        missing.append(src)

if missing:
    print(
        f"⚠️ {len(missing)} de {len(final_df)} imágenes no se encontraron en "
        f"{BASE_IMAGE_DIR} y se omitieron."
    )

# Persist a per-file log of what was copied
summary_columns = ["source_path", "dest_path", "label", "split", "ImageID", "ImageDir"]
summary_df = pd.DataFrame(copied, columns=summary_columns)
summary_path = os.path.join(OUTPUT_DIR, f"fewshot_summary_{target_label}.csv")
summary_df.to_csv(summary_path, index=False)

# Write a CSV shaped like the original one, restricted to the copied images.
# The file is read again because `df` was filtered and had columns changed.
df_images_original = pd.read_csv(CSV_PATH, low_memory=False)
ids_copiados = set(summary_df["ImageID"].unique())
was_copied = df_images_original["ImageID"].isin(ids_copiados)
df_images_fewshot = df_images_original.loc[was_copied].copy()

images_fewshot_path = os.path.join(
    OUTPUT_DIR,
    f"PADCHEST_chest_x_ray_images_fewshot_{target_label}.csv",
)
df_images_fewshot.to_csv(images_fewshot_path, index=False)

print(f"✅ Dataset few-shot binario para '{target_label}' preparado con éxito.")

✅ Dataset few-shot binario para 'descendent aortic elongation' preparado con éxito.


#### GENERAR DATASET MULTICLASE

In [None]:
# CONFIGURATION
BASE_IMAGE_DIR = "E:/TFM/PADCHEST"  # <-- image root containing subfolders 0, 1, ..., 50
OUTPUT_DIR = "E:/TFM/Dataset_multiclase"  # <-- folder where the new dataset is written
CSV_PATH = "E:/TFM/PADCHEST_chest_x_ray_images_labels_160K_01.02.19.csv"
N_RARE_LABELS = 15  # how many of the rarest labels to keep
MIN_IMAGES_PER_LABEL = 20  # labels seen in fewer images than this are ignored

# Load the CSV and keep only the rows whose labels were assigned by a physician
df = pd.read_csv(CSV_PATH, low_memory=False)
df = df[df["MethodLabel"] == "Physician"].copy()

# 'Labels' holds the text of a Python list; parse it into real lists
df["Labels"] = df["Labels"].apply(ast.literal_eval)

# Count in how many images each label appears. dict.fromkeys() drops repeats
# inside one image while keeping order, so a label listed twice for the same
# image is counted once.
all_labels = list(chain.from_iterable(dict.fromkeys(labels) for labels in df["Labels"]))
label_counts = Counter(all_labels)

# Pick the rarest labels that still appear in enough images
rare_labels = [
    label
    for label, count in sorted(label_counts.items(), key=lambda item: item[1])
    if count >= MIN_IMAGES_PER_LABEL
][:N_RARE_LABELS]

# Keep, per image, the rare labels it carries, in annotation order. Going
# through a set here would make the order depend on string hashing, which can
# differ between interpreter runs; that would reorder df_exploded and change
# the outcome of the seeded split.
rare_label_set = set(rare_labels)  # built once instead of once per row
df["matched_rare_labels"] = df["Labels"].apply(
    lambda labels: [label for label in dict.fromkeys(labels) if label in rare_label_set]
)

# Drop the images that carry none of the rare labels
df = df[df["matched_rare_labels"].map(len) > 0].copy()

# One row per (image, rare label) pair
df_exploded = df.explode("matched_rare_labels")

In [4]:
# Split per IMAGE, not per exploded row. df_exploded holds one row per
# (image, rare label) pair, so splitting its rows directly can place the same
# image in train under one label and in val/test under another (data leakage).

# Give every image a single stratification label: the rarest of its labels
# (ties broken alphabetically), so the scarcest classes are spread first.
label_freq = df_exploded["matched_rare_labels"].value_counts()
images = (
    df_exploded
    .assign(_label_freq=df_exploded["matched_rare_labels"].map(label_freq))
    .sort_values(["_label_freq", "matched_rare_labels"])
    .drop_duplicates(subset="ImageID")
)


def _split_images(frame, test_size, seed=42):
    """Split ``frame`` stratified by label, or randomly if that is impossible.

    train_test_split raises ValueError when a class is too small to stratify;
    in that case the split is retried without stratification and a warning is
    printed.
    """
    try:
        return train_test_split(
            frame,
            test_size=test_size,
            stratify=frame["matched_rare_labels"],
            random_state=seed,
        )
    except ValueError as err:
        print(f"⚠️ No se pudo estratificar ({err}); se usa un split aleatorio.")
        return train_test_split(frame, test_size=test_size, random_state=seed)


# 60% train, then the remaining 40% halved into val and test
train_images, temp_images = _split_images(images, 0.4)
val_images, test_images = _split_images(temp_images, 0.5)

# Propagate each image's split to all of its (image, label) rows. Because
# stratification uses one label per image, per-label proportions are only
# approximately 60/20/20.
split_by_image = {
    **dict.fromkeys(train_images["ImageID"], "train"),
    **dict.fromkeys(val_images["ImageID"], "val"),
    **dict.fromkeys(test_images["ImageID"], "test"),
}
labelled = df_exploded.assign(split=df_exploded["ImageID"].map(split_by_image))

train = labelled[labelled["split"] == "train"]
val = labelled[labelled["split"] == "val"]
test = labelled[labelled["split"] == "test"]

# Merge the three splits back into a single table
final_df = pd.concat([train, val, test])

# Every row is assigned, and no image appears in more than one split
assert len(final_df) == len(df_exploded)
assert final_df.groupby("ImageID")["split"].nunique().le(1).all()

# Full path to the image
def build_path(row):
    """Return the on-disk path of the image described by ``row``.

    The path is BASE_IMAGE_DIR joined with the row's ``ImageDir`` (converted
    to text) and its ``ImageID``.
    """
    return os.path.join(BASE_IMAGE_DIR, str(row["ImageDir"]), row["ImageID"])

# Resolve the location of every image in the split table
final_df["image_path"] = final_df.apply(build_path, axis=1)

# The class of each row is the rare label it was exploded on
final_df["label"] = final_df["matched_rare_labels"]

# Create the destination folders: <OUTPUT_DIR>/<split>/<label>
# NOTE(review): each label is used verbatim as a folder name — confirm none
# is empty or contains characters that are invalid in a path.
for split in ["train", "val", "test"]:
    for label in rare_labels:
        os.makedirs(os.path.join(OUTPUT_DIR, split, label), exist_ok=True)

# Copy the files, keeping track of the sources that could not be found so
# that a partially populated dataset does not go unnoticed
copied = []
missing = []
for _, row in final_df.iterrows():
    src = row["image_path"]
    label = row["label"]
    split = row["split"]
    dst = os.path.join(OUTPUT_DIR, split, label, os.path.basename(src))
    # isfile (rather than exists) so a directory with that name is not copied
    if os.path.isfile(src):
        shutil.copy2(src, dst)
        copied.append((src, dst, label, split, row["ImageID"], row["ImageDir"]))
    else:
        missing.append(src)

if missing:
    print(
        f"⚠️ {len(missing)} de {len(final_df)} imágenes no se encontraron en "
        f"{BASE_IMAGE_DIR} y se omitieron."
    )

# Persist a per-file log of what was copied (includes ImageID and ImageDir)
summary_df = pd.DataFrame(copied, columns=["source_path", "dest_path", "label", "split", "ImageID", "ImageDir"])
summary_df.to_csv(os.path.join(OUTPUT_DIR, "fewshot_summary.csv"), index=False)

# Write a CSV shaped like the original one, restricted to the copied images.
# Reuse CSV_PATH: the path previously hard-coded here differed from it in
# letter case ("160k" vs "160K"), which fails on case-sensitive filesystems.
original_images_csv = CSV_PATH
df_images_original = pd.read_csv(original_images_csv, low_memory=False)

# Keep only the rows of the images that were copied
ids_copiados = set(summary_df["ImageID"].unique())
df_images_fewshot = df_images_original[df_images_original["ImageID"].isin(ids_copiados)].copy()

# Save the filtered CSV
images_fewshot_path = os.path.join(OUTPUT_DIR, "PADCHEST_chest_x_ray_images_fewshot.csv")
df_images_fewshot.to_csv(images_fewshot_path, index=False)

print("✅ Dataset few-shot preparado con éxito.")

✅ Dataset few-shot preparado con éxito.
