# Pruebas del Batch Generator con los datos oceanográficos
Este notebook tiene como objetivo mostrar el proceso de configuración y prueba de un Batch Generator para el procesamiento de datos oceanográficos a partir de un conjunto de datos de Copernicus. Se realizan varios pasos como la preparación y normalización de datos, la creación de un generador de lotes (batches), y la evaluación del desempeño del generador mediante diferentes pruebas. Además, se realiza un análisis detallado de los recursos de memoria utilizados en el proceso.

## 1.Importación de Librerías y Carga de Datos

En este primer paso, se importan las librerías necesarias para el procesamiento de los datos, así como el dataset oceanográfico que será utilizado. Esto incluye la librería xarray para manejo de datos multidimensionales, torch para procesamiento y cálculo, y matplotlib para visualización. Luego se carga el dataset desde un archivo local.

In [None]:
import random
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim
from torch.amp import autocast, GradScaler
from torch.utils.data import DataLoader, Dataset
import xarray as xr
from tqdm import tqdm
import gc

from aurora import Aurora, Batch, Metadata, normalisation, rollout
from typing import Tuple,List

dataset = xr.open_dataset("D://Aaron//cmems_mod_glo_phy_my_0.083deg_P1D-m_6years_thetao_v3.nc")
lsm=xr.open_dataset("D://Aaron//datos_mascara.nc") 




In [26]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## 2.Preprocesamiento de Datos

En este paso se realizan varias operaciones para preparar el dataset. Entre ellas:

Filtrado de Variables y Profundidad: Se selecciona la variable thetao y se limita la cantidad de niveles de profundidad a los primeros 10 para reducir el volumen de datos.

Interpolación de Coordenadas y Aplicación de Máscara: Se ajustan las longitudes y latitudes para que coincidan con los datos del dataset y se interpolan para tener una resolución uniforme.

In [28]:

# Cargar el dataset solo con la variable que necesitas
variables = ['thetao']  # Selecciona solo 'thetao'


# Filtrar por la profundidad (limitando el número de niveles de profundidad)
dataset = dataset.isel(depth=slice(0, 10))  # Limitar a los primeros 10 niveles de profundidad
ocean_levels = dataset['depth'].values

# Ajustar las longitudes para que coincidan en rango (de -180 a 180) y latitudes para interpolación
lsm_copy = lsm.copy()
lsm_copy = lsm_copy.assign_coords(longitude=(((lsm_copy.longitude + 180) % 360) - 180))

# Interpolar la variable lsm para que coincida con la resolución del dataset
lsm_interp = lsm_copy.interp(latitude=dataset.latitude, longitude=dataset.longitude, method="nearest")

# Asignar la variable lsm al dataset sin añadir coordenadas innecesarias
lsm_interp_clean = lsm_interp.fillna(0)  # Reemplazar los NaNs por ceros
dataset['lsm'] = lsm_interp_clean['lsm']

# Eliminar las coordenadas innecesarias si se añadieron automáticamente
coordinates_to_drop = ['number', 'step', 'surface', 'valid_time']
for coord in coordinates_to_drop:
    if coord in dataset.coords:
        dataset = dataset.drop_vars(coord)

# Verificar y ajustar las latitudes para asegurar que están en el orden correcto y dentro del rango adecuado
def check_latitudes(dataset: xr.Dataset) -> xr.Dataset:
    latitude = dataset['latitude'].values
    if not (np.all(latitude <= 90) and np.all(latitude >= -90)):
        raise ValueError("Algunos valores de latitud están fuera del rango [-90, 90]. Por favor, corrígelos.")
    if not np.all(np.diff(latitude) < 0):
        dataset = dataset.sortby('latitude', ascending=False)
    return dataset

# Ajustar las longitudes para que estén dentro del rango [0,360]
def check_longitudes(dataset: xr.Dataset) -> xr.Dataset:
    dataset = dataset.assign_coords(longitude=((dataset.longitude + 360) % 360))
    return dataset

# Aplicar funciones de verificación al dataset
dataset = check_latitudes(dataset)
dataset = check_longitudes(dataset)

# Convertir latitudes y longitudes a tensores de Torch para su posterior uso
latitude = torch.from_numpy(dataset['latitude'].values).float()
longitude = torch.from_numpy(dataset['longitude'].values).float()

# Revisar si hay valores NaN restantes después de la interpolación y eliminarlos si es necesario
dataset = dataset.dropna(dim="latitude", how="all").dropna(dim="longitude", how="all")

# Rellenar los valores NaN con la media de la variable
def fill_nan_with_mean(var: xr.DataArray) -> xr.DataArray:
    if var.isnull().any():
        return var.fillna(var.mean())
    else:
        return var

for var in variables:
    dataset[var] = fill_nan_with_mean(dataset[var])



## 3.Definición de Funciones de Carga de Datos

Se definen varias funciones que ayudan a cargar diferentes tipos de datos del dataset:

load_ocean_surface(): Para cargar datos de superficie del océano.

load_ocean_atmos(): Para cargar datos atmosféricos.

load_static_var(): Para cargar variables estáticas como la máscara de tierra.

In [30]:
# Definir funciones para cargar datos
def load_ocean_surface(v: str, sample_sets: list) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Carga los datos de variables de superficie para un conjunto de muestras.

    Args:
        v (str): Nombre de la variable.
        sample_sets (list): Lista de conjuntos de datos de muestra.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: Datos y targets concatenados de la variable de superficie.
    """
    data_list = []
    target_list = []
    for sample_set in sample_sets:
        sel_dict = {}
        if 'depth' in sample_set[v].dims:
            sel_dict['depth'] = 0  # Seleccionar nivel superficial
        data = sample_set[v].isel(**sel_dict).isel(time=slice(0, 2)).values  # (time, lat, lon)
        data_tensor = torch.from_numpy(data).float()  # (time, lat, lon)
        data_list.append(data_tensor)

        target = sample_set[v].isel(**sel_dict).isel(time=slice(2,None)).values  # (lat, lon)
        target_tensor = torch.from_numpy(target).float()  # (lat, lon)


        target_list.append(target_tensor)

    # Concatenar los datos a lo largo de la dimensión batch (nueva dimensión 0)
    data_batch = torch.stack(data_list, dim=0)    # (batch_size, time, lat, lon)
    target_batch = torch.stack(target_list, dim=0)  # (batch_size, 1, lat, lon)

    return data_batch, target_batch




def load_ocean_atmos(v: str, sample_sets: list) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Carga los datos de variables atmosféricas para un conjunto de muestras.

    Args:
        v (str): Nombre de la variable.
        sample_sets (list): Lista de conjuntos de datos de muestra.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: Datos y targets concatenados de la variable atmosférica.
    """
    data_list = []
    target_list = []
    for sample_set in sample_sets:
        sel_dict = {'depth': slice(0, 10)}  # Seleccionar los primeros 10 niveles de profundidad
        data = sample_set[v].isel(**sel_dict).isel(time=slice(0, 2)).values  # (time, depth, lat, lon)
        data_tensor = torch.from_numpy(data).float()  # (time, depth, lat, lon)
        data_list.append(data_tensor)

        target = sample_set[v].isel(**sel_dict).isel(time=slice(2,None)).values  # (depth, lat, lon)
        target_tensor = torch.from_numpy(target).float()  # (depth, lat, lon)
        target_list.append(target_tensor)

    # Concatenar los datos a lo largo de la dimensión batch
    data_batch = torch.stack(data_list, dim=0)  # (batch_size, time, depth, lat, lon)
    target_batch = torch.stack(target_list, dim=0)  # (batch_size, depth, lat, lon)

    return data_batch, target_batch



def load_static_var(v: str, sample_sets: list) -> torch.Tensor:
    """
    Carga una variable estática del dataset.

    Args:
        v (str): Nombre de la variable.
        sample_sets (list): Lista de conjuntos de datos de muestra.

    Returns:
        torch.Tensor: Tensor con los datos de la variable estática (lat, lon).
    """
    # Since static variables are the same across the batch, we can take from the first sample
    sample_set = sample_sets[0]
    data_var = sample_set[v]
    dims_to_drop = [dim for dim in data_var.dims if dim not in ('latitude', 'longitude')]
    data_var = data_var.isel({dim: 0 for dim in dims_to_drop})
    data = data_var.values  # Should be (lat, lon)
    data_tensor = torch.from_numpy(data).float()

    return data_tensor  # Shape: (lat, lon)

In [31]:

surf_vars: tuple[str, ...] = ('thetao',)
static_vars: tuple[str, ...] = ('lsm',)
atmos_vars: tuple[str, ...] = ('thetao',)


In [32]:
# Seleccionar solo los datos de verano (JJA) de cada año
dataset = dataset.where(dataset['time'].dt.month.isin([7, 8, 9]), drop=True)
# Agrupar por año y contar los veranos disponibles
years = np.unique(dataset['time'].dt.year.values)
num_years = len(years)

# Definir proporciones para entrenamiento, validación y prueba (en años completos de verano)
train_ratio, val_ratio, test_ratio = 0.6, 0.2, 0.2
train_years = int(train_ratio * num_years)
val_years = int(val_ratio * num_years)
test_years = num_years - train_years - val_years

# Dividir el conjunto de años en entrenamiento, validación y prueba
train_years_list = years[:train_years]
val_years_list = years[train_years:train_years + val_years]
test_years_list = years[train_years + val_years:]

# Crear los conjuntos de datos de entrenamiento, validación y prueba usando veranos completos
train_dataset = dataset.sel(time=dataset['time'].dt.year.isin(train_years_list))
val_dataset = dataset.sel(time=dataset['time'].dt.year.isin(val_years_list))
test_dataset = dataset.sel(time=dataset['time'].dt.year.isin(test_years_list))

##  4.Selección y Normalización de Datos de Verano

Se filtran los datos para seleccionar solo los veranos (meses de julio, agosto, septiembre) y se dividen en tres conjuntos: entrenamiento, validación y prueba.

Normalización: Se calcula la media y desviación estándar para normalizar los datos de entrenamiento y asegurar una mejor convergencia del modelo.

In [33]:
train_ocean_levels = train_dataset['depth'].values

# Normalización para thetao en los niveles oceánicos
for level in train_ocean_levels:
    level_str = f"{level}"
    var = "thetao"
    data = train_dataset[var].sel(depth=level).values
    mean = np.nanmean(data)
    std = np.nanstd(data)
    normalisation.locations[f"{var}_{level_str}"] = mean
    normalisation.scales[f"{var}_{level_str}"] = std

# Normalización para thetao en la superficie
surface_vars = ["thetao"]
for var in surface_vars:
    if 'depth' in train_dataset[var].dims:
        data = train_dataset[var].isel(depth=0).values
    else:
        data = train_dataset[var].values
    mean = np.nanmean(data)
    std = np.nanstd(data)
    normalisation.locations[var] = mean
    normalisation.scales[var] = std

print("Variable  thetao  actualizadas exitosamente con el conjunto de entrenamiento.")



Variable  thetao  actualizadas exitosamente con el conjunto de entrenamiento.


## 5.Implementación del BatchGenerator

In [34]:
class BatchGenerator:
    def __init__(self, dataset: xr.Dataset, sample_size: int, batch_size: int, shuffle: bool = True, padding: bool = True):
        """
        Inicializa el BatchGenerator.

        Args:
            dataset (xr.Dataset): El conjunto de datos.
            sample_size (int): Tamaño de cada ventana deslizante.
            batch_size (int): Tamaño de cada batch.
            shuffle (bool): Si se deben barajar las muestras.
            padding (bool): Si se debe aplicar padding al último batch.
        """
        self.dataset = dataset
        self.sample_size = sample_size
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.padding = padding
        self.samples = self.generate_sliding_windows()
        if self.shuffle:
            self.samples = self.shuffle_samples()

    def generate_sliding_windows(self):
        """
        Genera ventanas deslizantes sobre el eje temporal del dataset.

        Returns:
            list: Lista de muestras generadas mediante ventanas deslizantes.
        """
        window_size = self.sample_size
        windows = [slice(i, i + window_size) for i in range(0, len(self.dataset.time) - window_size + 1)]
        samples = [self.dataset.isel(time=w) for w in windows]
        return samples

    def shuffle_samples(self):
        """
        Baraja las muestras generadas.

        Returns:
            list: Lista de muestras barajadas.
        """
        samples_copy = self.samples.copy()
        np.random.shuffle(samples_copy)
        return samples_copy

    def load_ocean_batch(self, sample_sets):
        """
    Carga un batch de datos a partir de un conjunto de muestras.

    Args:
        sample_sets (list): Lista de conjuntos de datos de muestra.

    Returns:
        Tuple[Batch, Batch]: Batch de datos y batch de targets.
        """
        is_padding = any(sample.attrs.get('is_padding', False) for sample in sample_sets)

    # Llamar a las funciones de carga modificadas
        surf_data, surf_target = load_ocean_surface("thetao", sample_sets)
        atmos_data, atmos_target = load_ocean_atmos("thetao", sample_sets)
        static_data = load_static_var("lsm", sample_sets).to(device)

        surf_data = surf_data.to(device)
        surf_target = surf_target.to(device)
        atmos_data = atmos_data.to(device)
        atmos_target = atmos_target.to(device)

        times = [
            sample_set['time'].values[-1].astype('datetime64[s]').astype(datetime)
            for sample_set in sample_sets
        ]
    # Crear instancia de Batch para el batch completo
        batch = Batch(
            surf_vars={
                "thetao": surf_data,
            },
            static_vars={
                "lsm": static_data,
            },
            atmos_vars={
                "thetao": atmos_data,
            },
            metadata=Metadata(
                lat=latitude.to(device),
                lon=longitude.to(device),
                time=times,
                atmos_levels=ocean_levels,
            )
        )
        batch.metadata.is_padding = is_padding

        batch_target = Batch(
            surf_vars={
                "thetao": surf_target,
            },
            static_vars={
             "lsm": static_data,  # Asumimos que los static_vars son iguales para data y target
            },
            atmos_vars={
                "thetao": atmos_target,
            },
            metadata=Metadata(
                lat=latitude.to(device),
                lon=longitude.to(device),
                time=times,
                atmos_levels=ocean_levels,
            )
        )
        batch_target.metadata.is_padding = is_padding

        

        return batch, batch_target


    def __iter__(self):
        """
        Iterador que genera batches de datos.

        Yields:
            Tuple[list, list]: Batch de datos y batch de targets.
        """
        # Dividimos las muestras en batches
        for i in range(0, len(self.samples), self.batch_size):
            batch_samples = self.samples[i:i + self.batch_size]

            # Aplicamos padding si es necesario
            if len(batch_samples) < self.batch_size and self.padding:
                num_padding = self.batch_size - len(batch_samples)
                for _ in range(num_padding):
                    sample = self.samples[i % len(self.samples)]
                    sample = sample.copy()
                    sample.attrs['is_padding'] = True
                    batch_samples.append(sample)

            batch, batch_target = self.load_ocean_batch(batch_samples)
            yield batch, batch_target


## 6.Pruebas de Funcionamiento del Batch Generator

Para verificar que el BatchGenerator esté funcionando correctamente, se realizan varias pruebas que comprueban la estructura y tipos de los datos generados.

Prueba 1: Se verifica que el BatchGenerator sea un generador.

Prueba 2: Se comprueba que el primer elemento generado sea una tupla.

Prueba 3: Se verifica que el contenido de los batches sea una instancia de la clase Batch.

Prueba 4: Se revisa que la variable thetao dentro del batch sea un torch.Tensor.

Prueba 5 y 6: Se validan las dimensiones esperadas de las variables dentro del batch.

In [35]:
from collections.abc import Generator
from aurora.batch import Batch

sample_size = 3
batch_size = 8
batch_gen = BatchGenerator(
    train_dataset, 
    sample_size, 
    batch_size=batch_size, 
    shuffle=True, 
    padding=True
    )

In [36]:

# Check if batch_gen is a generator
print('Test 1') # ❌
print('------------------')
# It must be a generator
print(f'BatchGenerator it a generator?: {isinstance(iter(batch_gen), Generator)}')
# e.g. of a generator
dummy_gen = (x for x in range(10))
print(f'Dummy is a generator?: {isinstance(dummy_gen, Generator)}')
assert isinstance(iter(batch_gen), Generator), 'batch_gen must be a generator'

Test 1
------------------
BatchGenerator it a generator?: True
Dummy is a generator?: True


In [37]:

# Check if the first element of batch_gen
print('Test 2') # ✔️
print('------------------')
print(f'First element of batch_gen: {type(next(iter(batch_gen)))}')
print(f'length of first element of batch_gen: {len(next(iter(batch_gen)))}')
# It must be a tuple
print(f'Is it a tuple?: {isinstance(next(iter(batch_gen)), tuple)}')
assert isinstance(next(iter(batch_gen)), tuple), 'The first element of batch_gen must be a tuple'


Test 2
------------------
First element of batch_gen: <class 'tuple'>
length of first element of batch_gen: 2
Is it a tuple?: True


In [38]:
# Check the elements inside the first element of batch_gen
print('Test 3') # ❌
print('------------------')
print(f'First element of the fist element: {type(next(iter(batch_gen))[0])}')
# It must be a Batch class
print(f'Is it a Batch class?: {isinstance(next(iter(batch_gen))[0], Batch)}')
assert isinstance(next(iter(batch_gen))[0], Batch), 'The elements inside the first element of batch_gen must be of type Batch'


Test 3
------------------
First element of the fist element: <class 'aurora.batch.Batch'>
Is it a Batch class?: True


In [39]:

# Check the type of the first element of the first element of batch_gen
print('Test 4') # ❌
print('------------------')
print(f'Type of the first element: {type(next(iter(batch_gen))[0].surf_vars["thetao"])}')
# It must be a torch.Tensor
print(f'Is it a torch.Tensor?: {isinstance(next(iter(batch_gen))[0].surf_vars["thetao"], torch.Tensor)}')
assert isinstance(next(iter(batch_gen))[0].surf_vars["thetao"], torch.Tensor), 'The elements inside the first element of batch_gen must be of type torch.Tensor'



Test 4
------------------
Type of the first element: <class 'torch.Tensor'>
Is it a torch.Tensor?: True


In [40]:


# Check the shape of the first element of the first element of batch_gen
print('Test 5') # ❌
print('------------------')
print(f'Shape of the first element: {next(iter(batch_gen))[0].surf_vars["thetao"].shape}')
# It must be (b, t, h, w)
# Expected dims: (8, 2, 180, 180)
b = batch_size
t = 2
h = train_dataset.latitude.size
w = train_dataset.longitude.size
print(
    f"""
    Is it the correct shape?: 
    {next(iter(batch_gen))[0].surf_vars["thetao"].shape == (b, t, h, w)}
    """
    )
assert next(iter(batch_gen))[0].surf_vars["thetao"].shape == (b, t, h, w), 'The shape of the first element of the first element of batch_gen is incorrect'


Test 5
------------------
Shape of the first element: torch.Size([8, 2, 180, 180])

    Is it the correct shape?: 
    True
    


In [41]:

# Check the shape of the second element of the first element of batch_gen
print('Test 6') # ❌
print('------------------')
print(f'Shape of the second element: {next(iter(batch_gen))[1].surf_vars["thetao"].shape}')
# It must be (b, t, h, w)
# Expected dims: (8, 1, 180, 180)
b = batch_size
t = 1
h = train_dataset.latitude.size
w = train_dataset.longitude.size
print(
    f"""
    Is it the correct shape?: 
    {next(iter(batch_gen))[1].surf_vars["thetao"].shape == (b, t, h, w)}
    """
    )
assert next(iter(batch_gen))[1].surf_vars["thetao"].shape == (b, t, h, w), 'The shape of the second element of the first element of batch_gen is incorrect'

Test 6
------------------
Shape of the second element: torch.Size([8, 1, 180, 180])

    Is it the correct shape?: 
    True
    


## 7.Prueba de Uso de Memoria

Finalmente, se realiza una prueba para evaluar el uso de memoria del BatchGenerator. Se mide la memoria utilizada por la CPU y GPU antes y después de cargar un batch de datos.

Comparación de Memoria: Se comparan los cambios en el uso de memoria para asegurarse de que no haya fugas significativas.


In [None]:
import os
import psutil
import time
import torch

process = psutil.Process(os.getpid())
# 1 Byte [B] is a group of 8 bits [b]
# byte on base 1024
# bit on base 1000
# Get CPU memory usage before loading the variable
cpu_memory_before = process.memory_info().rss / (1024 ** 2)
# Get GPU memory usage before loading the variable
gpu_memory_before = torch.cuda.memory_allocated(device) / (1024 ** 2)

start = time.time()
a_batch_approx = next(iter(batch_gen))[0].surf_vars["thetao"]
elapsed_time = time.time() - start

# Get CPU memory usage after loading the variable
cpu_memory_after = process.memory_info().rss / (1024 ** 2)
# Get GPU memory usage after loading the variable
gpu_memory_after = torch.cuda.memory_allocated(device) / (1024 ** 2)
# Compute change in memory usage in MB
change_in_cpu_memory = cpu_memory_after - cpu_memory_before
change_in_gpu_memory = gpu_memory_after - gpu_memory_before

# Testing memory usage
print('Memory usage test') # ❌
print('------------------')
print(f"Elapsed time: {elapsed_time:.2f} seconds")
print(f"CPU memory usage before: {cpu_memory_before:.2f} MB")
print(f"CPU memory usage after: {cpu_memory_after:.2f} MB")
print(f"Change in CPU memory usage: {change_in_cpu_memory:.2f} MB")
print(f"GPU memory usage before: {gpu_memory_before:.2f} MB")
print(f"GPU memory usage after: {gpu_memory_after:.2f} MB")
print(f"Change in GPU memory usage: {change_in_gpu_memory:.2f} MB")
# Define tolerance for maximum memory increase
# Expected dims: (8, 2, 180, 180)
b = batch_size
t = 2
h = train_dataset.latitude.size
w = train_dataset.longitude.size
expected_change = torch.randn(b, 2, h, w).numpy().nbytes / (1024 ** 2)  # MB
print(f"Expected change in memory usage: {expected_change:.2f} MB")

# Check if the change in memory usage is within the tolerance
if change_in_cpu_memory != 0:
    # It must be False
    print(
        f"""
        Is the change in CPU memory usage higher than expected?: 
        {change_in_cpu_memory > expected_change}
        """
        )
    assert change_in_cpu_memory <= expected_change, 'The change in CPU memory usage is higher than expected'
    
    # It must be 0
    print(f"How much it differ?: {abs(change_in_cpu_memory - expected_change):.2f} MB")
    assert abs(change_in_cpu_memory - expected_change) <= 0, 'The change in CPU memory usage compared to the expected change must be 0'

if change_in_gpu_memory != 0:
    # It must be False
    print(
        f"""
        Is the change in GPU memory usage higher than expected?: 
        {change_in_gpu_memory > expected_change}
        """
        )
    assert change_in_gpu_memory <= expected_change, 'The change in GPU memory usage is higher than expected'
    
    # It must be 0
    print(f"How much it differ?: {abs(change_in_gpu_memory - expected_change):.2f} MB")
    assert abs(change_in_gpu_memory - expected_change) <= 0, 'The change in GPU memory usage compared to the expected change must be 0'

# ⚠️ Delete the variable to rerun the test ⚠️
del a_batch_approx




Memory usage test
------------------
Elapsed time: 0.05 seconds
CPU memory usage before: 9822.89 MB
CPU memory usage after: 9822.89 MB
Change in CPU memory usage: 0.00 MB
GPU memory usage before: 1.98 MB
GPU memory usage after: 1.98 MB
Change in GPU memory usage: 0.00 MB
Expected change in memory usage: 1.98 MB
