In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
from PIL import Image
import pandas as pd
import numpy as np
import optuna
from optuna.visualization import plot_param_importances, plot_optimization_history
import matplotlib.pyplot as plt

In [16]:
# Device configuration: run on the GPU when CUDA is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [18]:
# File locations (relative to the notebook's working directory)
images_dir = "Proyecto/data_main/data_main"  # directory joined with each row's image path
csv_file_path = "Proyecto/poi_dataset.csv"   # metadata table read with pandas

In [None]:
# Load and preprocess the metadata table
metadata = pd.read_csv(csv_file_path)

# Binary target: 1 when the likes-per-visit ratio exceeds 0.1, else 0.
# The small epsilon keeps the division defined when Visits is 0.
likes_per_visit = metadata['Likes'] / (metadata['Visits'] + 1e-5)
metadata['engagement'] = (likes_per_visit > 0.1).astype(int)

# Standardize the two coordinate columns in place.
# NOTE(review): the scaler is fit on the full table, i.e. before the
# train/val/test split performed later — confirm this is acceptable.
coordinate_columns = ['locationLat', 'locationLon']
scaler = StandardScaler()
metadata[coordinate_columns] = scaler.fit_transform(metadata[coordinate_columns])


In [None]:
# Data split: hold out 20% for testing, then take 25% of the remainder for
# validation (roughly 60/20/20 overall). Both splits are stratified on the
# target and use the same fixed seed.
train_val_data, test_data = train_test_split(
    metadata,
    test_size=0.2,
    stratify=metadata['engagement'],
    random_state=42,
)
train_data, val_data = train_test_split(
    train_val_data,
    test_size=0.25,
    stratify=train_val_data['engagement'],
    random_state=42,
)


In [None]:
# Image transformations
# Per-channel normalization statistics shared by both pipelines. These match the
# commonly used ImageNet values — presumably chosen to suit the pretrained
# ResNet18 backbone; confirm if the backbone changes.
_normalize_kwargs = dict(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Training: random crop + horizontal flip as augmentation
train_transform = transforms.Compose(
    [
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(**_normalize_kwargs),
    ]
)

# Validation / test: deterministic resize + center crop
val_test_transform = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(**_normalize_kwargs),
    ]
)

In [None]:
# Custom dataset
class POIDataset(Dataset):
    """Dataset yielding ``(image, metadata_features, label)`` for each metadata row.

    Args:
        metadata: DataFrame indexed positionally via ``iloc``; each row must
            provide ``locationLat``, ``locationLon``, ``tier``, ``xps``,
            ``engagement`` and ``main_image_path``.
        images_dir: Directory that ``main_image_path`` is joined onto.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, metadata, images_dir, transform=None):
        self.metadata, self.images_dir, self.transform = metadata, images_dir, transform

    def __len__(self):
        """Number of samples, i.e. number of metadata rows."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Return the image, the float32 feature tensor and the float32 label for row ``idx``."""
        record = self.metadata.iloc[idx]

        # Tabular inputs and target
        feature_columns = ['locationLat', 'locationLon', 'tier', 'xps']
        tabular = record[feature_columns].values.astype(np.float32)
        target = record['engagement']

        # Image input; a missing file is replaced by a black 224x224 placeholder
        image_file = os.path.join(self.images_dir, record['main_image_path'])
        try:
            picture = Image.open(image_file).convert('RGB')
        except FileNotFoundError:
            picture = Image.new('RGB', (224, 224), color=(0, 0, 0))

        if self.transform:
            picture = self.transform(picture)

        return (
            picture,
            torch.tensor(tabular),
            torch.tensor(target, dtype=torch.float32),
        )


In [None]:
# Class balancing: every training row is weighted by the inverse frequency of
# its class, and rows are then drawn with replacement according to those weights.
class_counts = train_data['engagement'].value_counts()
weights = 1. / class_counts
samples_weights = train_data['engagement'].map(weights).values
sampler = WeightedRandomSampler(
    weights=torch.tensor(samples_weights, dtype=torch.float),
    num_samples=len(samples_weights),
    replacement=True,
)

# Datasets: augmentation for training only, deterministic transform otherwise
train_dataset, val_dataset, test_dataset = (
    POIDataset(split, images_dir, transform=split_transform)
    for split, split_transform in (
        (train_data, train_transform),
        (val_data, val_test_transform),
        (test_data, val_test_transform),
    )
)

# Dataloaders: the training loader draws through the balancing sampler,
# the evaluation loaders iterate in order.
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)
val_loader, test_loader = (
    DataLoader(eval_dataset, batch_size=32, shuffle=False)
    for eval_dataset in (val_dataset, test_dataset)
)

In [None]:
# Combined model built on ResNet18
class CombinedModel(nn.Module):
    """Binary classifier fusing ResNet18 image features with tabular metadata.

    The image branch is a ResNet18 whose final ``fc`` layer is replaced by
    ``nn.Identity`` so it emits a 512-wide feature vector. The metadata branch
    maps the 4 tabular features to 128 units. Both are concatenated and passed
    through a small head ending in a sigmoid, so the output is a probability.

    Args:
        dropout: Dropout probability used in the metadata branch and the head.
    """

    def __init__(self, dropout=0.3):
        super().__init__()
        # `pretrained=True` is deprecated since torchvision 0.13 in favour of the
        # `weights=` argument. IMAGENET1K_V1 is the checkpoint `pretrained=True`
        # resolves to, so the loaded weights are unchanged. Older torchvision
        # versions without the weights enum keep using the legacy flag.
        if hasattr(models, "ResNet18_Weights"):
            self.resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet18(pretrained=True)
        # Drop the classification layer: the backbone now returns pooled features
        self.resnet.fc = nn.Identity()
        self.metadata_fc = nn.Sequential(
            nn.Linear(4, 128),
            nn.ReLU(),
            nn.Dropout(dropout)
        )
        self.fc = nn.Sequential(
            nn.Linear(512 + 128, 256),  # 512 image features + 128 metadata features
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x_img, x_metadata):
        """Return the sigmoid output of the fused head for a batch.

        Args:
            x_img: Batch of images fed to the ResNet18 backbone.
            x_metadata: Batch of metadata vectors with 4 features each.

        Returns:
            Tensor with one column per sample (output of ``nn.Linear(256, 1)``
            followed by a sigmoid).
        """
        resnet_out = self.resnet(x_img)
        metadata_out = self.metadata_fc(x_metadata)
        combined = torch.cat((resnet_out, metadata_out), dim=1)
        return self.fc(combined)
    
    # Early Stopping
class EarlyStopping:
    """Stop training once the validation loss stops improving, checkpointing the best state.

    Call the instance once per epoch with the validation loss. When the loss
    improves, the model and optimizer state dicts are saved to ``path`` and the
    patience counter resets; otherwise the counter is incremented and
    ``early_stop`` becomes ``True`` once it reaches ``patience``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: When true, print a message each time a checkpoint is saved.
        delta: Amount added to the best score that a new score must reach to
            count as an improvement.
        path: Destination file for the checkpoint (defaults to ``'checkpoint.pt'``).
    """

    def __init__(self, patience=5, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # `np.Inf` was removed in NumPy 2.0; `np.inf` works on every version
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model, optimizer):
        """Record one epoch's validation loss and update the stopping state.

        Args:
            val_loss: Validation loss for the epoch (lower is better).
            model: Object exposing ``state_dict()``; saved on improvement.
            optimizer: Object exposing ``state_dict()``; saved on improvement.
        """
        score = -val_loss
        # NaN is the only value that differs from itself. A NaN loss must never
        # count as an improvement: it would overwrite the best checkpoint and,
        # because every later comparison against NaN is false, disable stopping.
        loss_is_nan = bool(val_loss != val_loss)
        improved = (not loss_is_nan) and (
            self.best_score is None or not (score < self.best_score + self.delta)
        )
        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model, optimizer)
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model, optimizer):
        """Save model and optimizer state dicts to ``self.path`` and record ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict()
        }, self.path)
        self.val_loss_min = val_loss

In [None]:
# Training
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, early_stopping):
    """Train ``model`` for up to ``num_epochs`` epochs, validating after each one.

    Each epoch runs one optimization pass over ``train_loader``, then evaluates
    on ``val_loader`` (mean loss per batch and F1 at a 0.5 threshold), prints a
    summary line and forwards the validation loss to ``early_stopping``.
    Training stops early when ``early_stopping.early_stop`` is set. Batches are
    moved to the module-level ``device``.

    Args:
        model: Module called as ``model(images, metadata)``.
        train_loader: Iterable of ``(images, metadata, labels)`` training batches.
        val_loader: Iterable of ``(images, metadata, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        early_stopping: Callable ``(val_loss, model, optimizer)`` exposing an
            ``early_stop`` flag.

    Returns:
        None. Progress is reported via ``print``.
    """
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for images, metadata, labels in train_loader:
            images, metadata, labels = images.to(device), metadata.to(device), labels.to(device)
            optimizer.zero_grad()
            # Flatten to one value per sample. A bare `.squeeze()` would also drop
            # the batch dimension when a batch holds a single sample, producing a
            # 0-d tensor that no longer matches `labels`.
            outputs = model(images, metadata).reshape(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)

        # Validation
        model.eval()
        val_loss, all_preds, all_labels = 0, [], []
        with torch.no_grad():
            for images, metadata, labels in val_loader:
                images, metadata, labels = images.to(device), metadata.to(device), labels.to(device)
                # Same flattening as in training, so `extend` always receives a 1-d array
                outputs = model(images, metadata).reshape(-1)
                val_loss += criterion(outputs, labels).item()
                all_preds.extend((outputs > 0.5).cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
        val_loss /= len(val_loader)
        f1 = f1_score(all_labels, all_preds)
        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, F1 Score: {f1:.4f}")

        # Early stopping
        early_stopping(val_loss, model, optimizer)
        if early_stopping.early_stop:
            print("Early stopping triggered")
            break


In [None]:
# Hyperparameter optimization with Optuna
def objective(trial):
    """Optuna objective: train one configuration and return its validation F1.

    Samples the learning rate (log scale, 1e-5 to 1e-2), the dropout
    probability (0.1 to 0.5) and the batch size (16, 32 or 64), trains a fresh
    ``CombinedModel`` for up to 10 epochs with early stopping, then scores it
    on the validation set at a 0.5 threshold.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        F1 score on the validation set.
    """
    # `suggest_loguniform` / `suggest_uniform` are deprecated in Optuna 3;
    # `suggest_float` covers both (with `log=True` for the log-uniform case).
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    dropout = trial.suggest_float('dropout', 0.1, 0.5)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64])

    model = CombinedModel(dropout=dropout).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Loaders are rebuilt per trial because the batch size is a tuned parameter
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    early_stopping = EarlyStopping(patience=5, verbose=False)
    train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, early_stopping=early_stopping)

    # NOTE(review): early stopping checkpoints the best epoch to disk, but the
    # weights scored below are those of the last epoch that ran — confirm
    # whether the best checkpoint should be restored first.
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, metadata, labels in val_loader:
            images, metadata, labels = images.to(device), metadata.to(device), labels.to(device)
            # Flatten to one value per sample; a bare `.squeeze()` yields a 0-d
            # tensor for a single-sample batch, which `extend` cannot iterate.
            outputs = model(images, metadata).reshape(-1)
            all_preds.extend((outputs > 0.5).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return f1_score(all_labels, all_preds)

# Run the Optuna search: 10 trials, keeping the configuration with the highest objective value
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=10)
print(f"Best Parameters: {study.best_params}")