In [None]:
import pandas as pd
import numpy as np
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import copy
import matplotlib.pyplot as plt
import torch.optim as optim
import torchvision.models as models
from torch.utils.data import random_split

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.width', 1000)

# Función para procesar cada CSV y retornar el DataFrame filtrado
def process_csv(file_path):
    df = pd.read_csv(file_path)
    
    new_rows = []
    # Iteramos sobre los índices (0 a 3) para generar las filas procesadas
    for idx in range(4):
        material_col = f'nep_materiality_visual_{idx}'
        biological_col = f'nep_biological_visual_{idx}'
        landscape_col = f'landscape-type_visual_{idx}'
        
        temp_df = df[['platform_id', 'nature_visual', material_col, biological_col, landscape_col]].copy()
        temp_df['index'] = idx
        
        temp_df = temp_df.rename(columns={
            material_col: 'nep_materiality_visual',
            biological_col: 'nep_biological_visual',
            landscape_col: 'landscape-type_visual'
        })
        
        temp_df['image_name'] = temp_df['platform_id'].astype(str) + '_' + temp_df['index'].astype(str)
        new_rows.append(temp_df)
    
    new_df = pd.concat(new_rows, ignore_index=True)
    new_df = new_df.dropna(subset=['nep_materiality_visual', 'nep_biological_visual'])
    new_df = new_df.sort_values(by=['platform_id', 'index']).reset_index(drop=True)
    
    # Función para procesar la columna nature_visual
    def process_nature(val):
        if pd.isna(val):
            return np.nan
        items = [item.strip() for item in str(val).split(';')]
        for item in items:
            if item.lower() == "yes":
                return "Yes"
        return "No"
    
    # Función para procesar las demás columnas
    def process_other(val):
        if pd.isna(val):
            return np.nan
        items = [item.strip() for item in str(val).split(';')]
        if len(items) == 1:
            return items[0]
        if len(set(items)) == 1:
            return items[0]
        return "; ".join(items)
    
    # Aplicamos las funciones a cada columna correspondiente
    new_df['nature_visual'] = new_df['nature_visual'].apply(process_nature)
    new_df['nep_materiality_visual'] = new_df['nep_materiality_visual'].apply(process_other)
    new_df['nep_biological_visual'] = new_df['nep_biological_visual'].apply(process_other)
    new_df['landscape-type_visual'] = new_df['landscape-type_visual'].apply(process_other)
    
    # Reordenamos las columnas según el orden deseado
    new_order = [
        'image_name', 
        'nature_visual', 
        'nep_materiality_visual', 
        'nep_biological_visual', 
        'landscape-type_visual', 
        'platform_id', 
        'index'
    ]
    new_df = new_df[new_order]
    
    # Función que verifica si una celda contiene más de un valor único
    def tiene_contradicciones(valor):
        if pd.isna(valor):
            return False
        items = [item.strip() for item in str(valor).split(';')]
        return len(set(items)) > 1
    
    cols = ['nature_visual', 'nep_materiality_visual', 'nep_biological_visual', 'landscape-type_visual']
    df_bool = new_df[cols].applymap(tiene_contradicciones)
    
    # Filtramos filas donde ninguna celda tenga contradicción
    agreed_df = new_df[~df_bool.any(axis=1)]
    
    return agreed_df

# Rutas de los dos archivos CSV
csv_path1 = '/fhome/pfeliu/tfg_feliu/data/39_20250401_0816.csv'
csv_path2 = '/fhome/pfeliu/tfg_feliu/data/43.csv'

# Procesamos cada CSV
df1 = process_csv(csv_path1)
df2 = process_csv(csv_path2)

# Concatenamos los DataFrames procesados en uno solo
agree_df = pd.concat([df1, df2], ignore_index=True)
agree_df = agree_df[agree_df["landscape-type_visual"] != "forest_and_seminatural_areas,water_bodies"]



In [None]:

agree_df.to_csv('/fhome/pfeliu/tfg_feliu/data/X_labels_agreements_0104.csv', index=False)

In [None]:

dataset = agree_df.copy()

# Clase customizada para el dataset multitarea
class MultiTaskImageDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None, label_maps=None):
        """
        Args:
            csv_file (string): Ruta al archivo CSV con las etiquetas.
            img_dir (string): Directorio con las imágenes.
            transform (callable, optional): Transformaciones a aplicar a la imagen.
            label_maps (dict, optional): Diccionario para mapear etiquetas de cada tarea a índices numéricos.
                Ejemplo:
                {
                    "nature_visual": {"Yes": 1, "No": 0},
                    "nep_materiality_visual": {"material": 0, "immaterial": 1},
                    "nep_biological_visual": {"biotic": 0, "abiotic": 1},
                    "landscape-type_visual": {"artificial_surfaces": 0, "forest_and_seminatural_areas": 1}
                }
        """
        #self.data = pd.read_csv(csv_file)
        self.data = pd.read_csv(csv_file).fillna("NaN")

        self.img_dir = img_dir
        self.transform = transform
        self.label_maps = label_maps

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Selecciona la fila del CSV
        row = self.data.iloc[idx]
        
        base_img_name = row['image_name']
        
        # Lista de extensiones posibles
        extensions = ['.jpg', '.png', '.jpeg']
        img_path = None
        # Buscar la primera imagen que exista en el directorio
        for ext in extensions:
            candidate = os.path.join(self.img_dir, base_img_name + ext).replace('\\', '/')
            if os.path.exists(candidate):
                img_path = candidate
                break
        if img_path is None:
            raise FileNotFoundError(f"No se encontró la imagen: {base_img_name} con ninguna de las extensiones {extensions}")
        
        # Abre la imagen y conviértela a RGB
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        
        # Extrae y (opcionalmente) mapea las etiquetas para cada tarea
        labels = {}
        for task in ['nature_visual', 'nep_materiality_visual', 'nep_biological_visual', 'landscape-type_visual']:
            label_value = row[task]
            # Si se definieron mapeos, los aplica
            if self.label_maps and task in self.label_maps:
                label_value = self.label_maps[task].get(label_value, -1)  # -1 o cualquier valor para etiquetas no mapeadas
            labels[task] = label_value
        
        # Retorna la imagen y las etiquetas
        return image, row['image_name'], labels

# Ejemplo de transformaciones (adecuadas para modelos preentrenados en ImageNet)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # Medias ImageNet
                         std=[0.229, 0.224, 0.225])   # Desviaciones ImageNet
])

# Ejemplo de mapeo de etiquetas
label_maps = {
    "nature_visual": {"Yes": 1, "No": 0},
    "nep_materiality_visual": {"material": 0, "immaterial": 1},
    "nep_biological_visual": {"biotic": 0, "abiotic": 1},
    "landscape-type_visual": {"artificial_surfaces": 0, "forest_and_seminatural_areas": 1, "wetlands": 2, "water_bodies": 3, "agricultural_areas": 4, "other": 5, "none": 6, "NaN": 7} 
}

for col in ['nature_visual', 'nep_materiality_visual', 'nep_biological_visual', 'landscape-type_visual']:
    print(f"Etiquetas únicas en {col}:")
    print(dataset[col].unique())
    print("------")
    print(dataset[col].value_counts())

# Instanciación del dataset
dataset = MultiTaskImageDataset(
    csv_file=''/fhome/pfeliu/tfg_feliu/data/X_labels_agreements_0104.csv',
    img_dir= ''/fhome/pfeliu/tfg_feliu/data/twitter',
    transform=transform,
    label_maps=label_maps
)

# Creación de un DataLoader (opcional, para entrenamiento)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

'''# Ejemplo de iterar sobre el dataloader
for images, img_name, labels in dataloader:
    # 'images' es un tensor de tamaño [batch_size, 3, 224, 224]
    # 'labels' es un diccionario con las 4 tareas, por ejemplo:
    #   labels['nature_visual'] -> tensor de etiquetas para la tarea nature
    print(images.shape)
    print(labels)
    break'''




In [None]:
print("Total de imágenes:", len(dataset))
print("Total de batches:", len(dataloader))

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Función para desnormalizar y mostrar la imagen con título opcional
def imshow_tensor(image_tensor, title=None):
    # image_tensor tiene forma [C, H, W]
    image = image_tensor.numpy().transpose((1, 2, 0))
    # Desnormalizamos con las medias y desviaciones utilizadas
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    image = std * image + mean
    image = np.clip(image, 0, 1)
    plt.imshow(image)
    if title:
        plt.title(title)
    plt.axis('off')
    plt.show()

for idx in range(3):

    # Ejemplo: mostramos la primera imagen del dataset con su nombre y etiquetas
    sample_image, sample_image_name, sample_labels = dataset[idx]

    print("Nombre de la imagen:", sample_image_name)
    print("Etiquetas:")
    for key, value in sample_labels.items():
        print(f"  {key}: {value}")

    # Mostramos la imagen y ponemos el nombre de la imagen como título
    imshow_tensor(sample_image, title=sample_image_name)


In [None]:
# Usamos ResNet50 preentrenado en ImageNet y retiramos la capa fc final
resnet = models.resnet50(pretrained=True)
resnet.fc = nn.Identity()  # Retiramos la capa fc final

# Extraemos todas las capas excepto la última
#backbone = nn.Sequential(*list(resnet.children())[:-1])
backbone = resnet

# Definimos un wrapper para aplanar la salida (la salida de ResNet50 es [batch, 2048, 1, 1])
class ResNet50Backbone(nn.Module):
    def __init__(self, backbone):
        super(ResNet50Backbone, self).__init__()
        self.backbone = backbone
    def forward(self, x):
        x = self.backbone(x)  # [batch, 2048, 1, 1]
        x = torch.flatten(x, 1)  # [batch, 2048]
        return x

backbone_model = ResNet50Backbone(backbone)

# Modelo MultiTarea con 4 cabezas de salida
class MultiTaskModel(nn.Module):
    def __init__(self, backbone, feature_dim=2048):
        super(MultiTaskModel, self).__init__()
        self.backbone = backbone
        # Cabeza para nature_visual (2 clases)
        self.fc_nature = nn.Linear(feature_dim, 2)
        # Cabeza para nep_materiality_visual (2 clases)
        self.fc_materiality = nn.Linear(feature_dim, 2)
        # Cabeza para nep_biological_visual (2 clases)
        self.fc_biological = nn.Linear(feature_dim, 2)
        # Cabeza para landscape-type_visual (7 clases)
        self.fc_landscape = nn.Linear(feature_dim, 8)
        
    def forward(self, x):
        features = self.backbone(x)
        out_nature = self.fc_nature(features)
        out_materiality = self.fc_materiality(features)
        out_biological = self.fc_biological(features)
        out_landscape = self.fc_landscape(features)
        return out_nature, out_materiality, out_biological, out_landscape

model = MultiTaskModel(backbone_model)

# Mover modelo al dispositivo (GPU si está disponible)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


# ================================
# Dataset train y test
# ================================


dataset_size = len(dataset)
train_size = int(0.9 * dataset_size)
test_size = dataset_size - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Crear DataLoaders para cada split
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# ================================
# Entrenamiento
# ================================
# Definimos las funciones de pérdida para cada tarea (usando CrossEntropyLoss para clasificación)
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 5  # Ajusta el número de épocas según tu necesidad

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, image_names, labels in dataloader:
        images = images.to(device)
        labels_nature = labels['nature_visual'].to(device)
        labels_materiality = labels['nep_materiality_visual'].to(device)
        labels_biological = labels['nep_biological_visual'].to(device)
        labels_landscape = labels['landscape-type_visual'].to(device)
        
        optimizer.zero_grad()
        out_nature, out_materiality, out_biological, out_landscape = model(images)
        
        loss_nature = criterion(out_nature, labels_nature)
        loss_materiality = criterion(out_materiality, labels_materiality)
        loss_biological = criterion(out_biological, labels_biological)
        loss_landscape = criterion(out_landscape, labels_landscape)
        
        loss = loss_nature + loss_materiality + loss_biological + loss_landscape
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(dataloader):.4f}")

# ================================
# Ejemplo de Predicción
# ================================

model.eval()
test_loss = 0.0
correct_nature = 0
correct_materiality = 0
correct_biological = 0
correct_landscape = 0
total = 0

with torch.no_grad():
        for images, image_names, labels in test_loader:
            images = images.to(device)
            labels_nature = labels['nature_visual'].to(device)
            labels_materiality = labels['nep_materiality_visual'].to(device)
            labels_biological = labels['nep_biological_visual'].to(device)
            labels_landscape = labels['landscape-type_visual'].to(device)
            
            out_nature, out_materiality, out_biological, out_landscape = model(images)
            
            loss_nature = criterion(out_nature, labels_nature)
            loss_materiality = criterion(out_materiality, labels_materiality)
            loss_biological = criterion(out_biological, labels_biological)
            loss_landscape = criterion(out_landscape, labels_landscape)
            
            loss = loss_nature + loss_materiality + loss_biological + loss_landscape
            test_loss += loss.item()
            
            # Para calcular exactitud, obtenemos el índice con mayor puntuación en cada tarea
            preds_nature = out_nature.argmax(dim=1)
            preds_materiality = out_materiality.argmax(dim=1)
            preds_biological = out_biological.argmax(dim=1)
            preds_landscape = out_landscape.argmax(dim=1)
            
            correct_nature += (preds_nature == labels_nature).sum().item()
            correct_materiality += (preds_materiality == labels_materiality).sum().item()
            correct_biological += (preds_biological == labels_biological).sum().item()
            correct_landscape += (preds_landscape == labels_landscape).sum().item()
            total += images.size(0)
    
avg_test_loss = test_loss / len(test_loader)
print(f"Epoch {epoch+1}/{num_epochs}, Test Loss: {avg_test_loss:.4f}")
print("Accuracy por tarea:")
print(f"  Nature: {correct_nature/total*100:.2f}%")
print(f"  Materiality: {correct_materiality/total*100:.2f}%")
print(f"  Biological: {correct_biological/total*100:.2f}%")
print(f"  Landscape: {correct_landscape/total*100:.2f}%")
print("----------------------------------------------------")

'''# Poniendo el modelo en modo evaluación
model.eval()
with torch.no_grad():
    images, image_names, labels = next(iter(dataloader))
    images = images.to(device)
    outputs = model(images)
    # outputs es una tupla de 4 salidas
    print("Predicciones de la primera batch:")
    print("Nature:", outputs[0].argmax(dim=1))
    print("Materiality:", outputs[1].argmax(dim=1))
    print("Biological:", outputs[2].argmax(dim=1))
    print("Landscape:", outputs[3].argmax(dim=1))'''

In [None]:
# Helper function to desnormalize and display an image with a title
def imshow_tensor(image_tensor, title=None):
    # Convert the tensor (C, H, W) to a NumPy array (H, W, C)
    image = image_tensor.cpu().numpy().transpose((1, 2, 0))
    # Desnormalize using ImageNet's mean and std
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    image = std * image + mean
    image = np.clip(image, 0, 1)
    plt.imshow(image)
    if title:
        plt.title(title, fontsize=8)
    plt.axis('off')
    plt.show()

# Evaluación: iterate over the test_loader and display images with labels and predictions
model.eval()
with torch.no_grad():
    for images, image_names, labels in test_loader:
        images = images.to(device)
        # Compute model outputs
        out_nature, out_materiality, out_biological, out_landscape = model(images)
        
        # Get predictions by taking the argmax for each task
        preds_nature = out_nature.argmax(dim=1)
        preds_materiality = out_materiality.argmax(dim=1)
        preds_biological = out_biological.argmax(dim=1)
        preds_landscape = out_landscape.argmax(dim=1)
        
        # Since the DataLoader's default collate returns a dictionary of lists for labels,
        # we assume labels['task'] is a list of integers. If not, adjust accordingly.
        # Iterate over each image in the batch
        for i in range(len(images)):
            # Ground truth labels for this sample
            gt_nature = labels['nature_visual'][i]
            gt_materiality = labels['nep_materiality_visual'][i]
            gt_biological = labels['nep_biological_visual'][i]
            gt_landscape = labels['landscape-type_visual'][i]
            
            # Predictions for this sample (convert tensor to int)
            pred_nature = preds_nature[i].item()
            pred_materiality = preds_materiality[i].item()
            pred_biological = preds_biological[i].item()
            pred_landscape = preds_landscape[i].item()
            
            # Create a title string with image name, ground truth and predictions
            title = (f"Name: {image_names[i]}\n"
                     f"GT - Nature: {gt_nature}, Materiality: {gt_materiality}, "
                     f"Biological: {gt_biological}, Landscape: {gt_landscape}\n"
                     f"Pred - Nature: {pred_nature}, Materiality: {pred_materiality}, "
                     f"Biological: {pred_biological}, Landscape: {pred_landscape}")
            
            # Display the image with the title
            imshow_tensor(images[i], title=title)
        # Break after one batch (or remove the break to process all batches)
        break