In [1]:
# Improve by ChatGPT & GitHub Copilot
# Running the requierements.ipynb
%run /bettik/PROJECTS/pr-data-ocean/riverama/Notebooks/OSSE_generator/requierements.ipynb

In [8]:
# Define the preprocessor function
def preprocess(ds):
    # Extract date from 'dac' variable's attributes and convert to a pandas datetime object
    date_str = ds.dac.attrs['date']  # Adjust this if necessary to match your actual date attribute
    date = pd.to_datetime(date_str, format='%Y-%m-%d %H:%M:%S.%f UTC').replace(tzinfo=None)  # Parse datetime and remove timezone to avoid conflicts

    # Align with the 'time_counter' format from SSH
    # Assigning the parsed date as a new coordinate, here renamed to 'time_counter' to match your requirement
    ds = ds.expand_dims('time_counter').assign_coords(time_counter=[date])

    return ds

# Path to the files - adjust the wildcard as necessary to match your filenames
path = "/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_marco/dac_dif_*.nc"

# Use xarray to open and concatenate the dataset along the new 'time_counter' dimension
# This uses the preprocess function defined above for each file
dac = xr.open_mfdataset(path, preprocess=preprocess, combine='nested', concat_dim='time_counter')

# At this point, 'dac' should be a single dataset with all files concatenated along 'time_counter'
# and the time information properly aligned with your requirements.

In [10]:
# Save the dataset to a new NetCDF file
dac.to_netcdf("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac.nc")

## Crop to SSH domain

In [None]:
dac = xr.open_mfdataset("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac.nc")

In [16]:
# Define the boundary coordinates for cropping
lon_min, lon_max = 159.23333740234375, 172.34999084472656
lat_min, lat_max = -28.803945541381836, -15.699763298034668

# Find the dataset's grid resolution for longitude and latitude if not already known
lon_res = np.abs(np.diff(dac['longitude'].values)).mean()
lat_res = np.abs(np.diff(dac['latitude'].values)).mean()

# Adjust the min/max coordinates to ensure inclusion of the nearest points outside the specified boundaries
# This example assumes the dataset's coordinates are increasing
lon_min_adj = lon_min - (lon_min % lon_res)
lon_max_adj = lon_max + (lon_res - (lon_max % lon_res)) % lon_res
lat_min_adj = lat_min - (lat_min % lat_res)
lat_max_adj = lat_max + (lat_res - (lat_max % lat_res)) % lat_res

# Now, crop the dataset using the adjusted coordinates
dac_cropped = dac.sel(
    longitude=slice(lon_min_adj, lon_max_adj),
    latitude=slice(lat_min_adj, lat_max_adj)
)


In [17]:
# Save the dataset to a new NetCDF file
dac_cropped.to_netcdf("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac_cropped.nc")

## Temporal interpolation

In [None]:
dac_cropped = xr.open_mfdataset("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac_cropped.nc")

In [18]:
# To crop the dataset to the specified time range, we can use the .sel() method with the 'time' coordinate
new_time = pd.date_range(start='2014-01-01 00:30', end='2014-12-31 23:30', freq='1H')
new_time_np = new_time.values.astype('datetime64[ns]')
# The 'method' argument specifies the interpolation method to use
dac_cropped_1h = dac_cropped.interp(time_counter=new_time, method='cubic')

In [20]:
# Save the dataset to a new NetCDF file
dac_cropped_1h.to_netcdf("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac_cropped_1h.nc")

## Spatial interpolation

### Step 1: Setting Up Dask Distributed Cluster

In [2]:
from dask.distributed import Client, LocalCluster
from dask import delayed, compute

# Configuration based on your resource allocation
n_nodes = 4
cores_per_node = 16
total_memory = 600  # GB
memory_per_node = total_memory / n_nodes  # Memory allocated per node

# Setting up the LocalCluster
# Note: Adjust memory_limit based on the actual usage pattern you observe for your tasks
cluster = LocalCluster(n_workers=n_nodes, 
                       threads_per_worker=cores_per_node, 
                       memory_limit=f'{memory_per_node}GB')

client = Client(cluster)

# Display the Dask dashboard link
print(f'Dask Dashboard: {client.dashboard_link}')

Dask Dashboard: http://127.0.0.1:8787/status


### Step 2: Loading Data with Dask

In [3]:
dac_cropped_1h = xr.open_mfdataset("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac_cropped_1h.nc", chunks={'time_counter': 100, 'latitude': 25, 'longitude': 25}).load()

In [4]:
SSH = xr.open_dataset("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/CALEDO60/una_hora_enero.nc", chunks={'y': 50, 'x': 50}).load()

### Step 3: Define the Interpolation Function

In [5]:
# Function to interpolate data from dac_cropped_1h to the SSH grid
def interpolate_to_ssh_grid(dac_data, dac_lat, dac_lon, ssh_nav_lat, ssh_nav_lon):
    # Flatten the SSH grid for the interpolation function
    target_points = np.vstack((ssh_nav_lon.flatten(), ssh_nav_lat.flatten())).T
    # Create source grid
    lon_grid, lat_grid = np.meshgrid(dac_lon, dac_lat)
    source_points = np.vstack((lon_grid.flatten(), lat_grid.flatten())).T
    values = dac_data.flatten()
    # Interpolate
    interpolated_values_flat = griddata(source_points, values, target_points, method='linear')
    return interpolated_values_flat.reshape(ssh_nav_lat.shape)

### Step 4: Parallel Interpolation Across Time Steps

In [6]:
# Prepare the datasets
dac_lat = dac_cropped_1h.latitude.values
dac_lon = dac_cropped_1h.longitude.values
ssh_nav_lat = SSH.nav_lat.values
ssh_nav_lon = SSH.nav_lon.values

In [7]:
from scipy.interpolate import griddata
# Delayed interpolation for each time step
results = []
for t in range(len(dac_cropped_1h.time_counter)):
    dac_data = dac_cropped_1h.dac.isel(time_counter=t).values
    res = delayed(interpolate_to_ssh_grid)(dac_data, dac_lat, dac_lon, ssh_nav_lat, ssh_nav_lon)
    results.append(res)

# Compute all interpolations in parallel
interpolated_values = compute(*results)

This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2024-03-22 11:23:46,755 - distributed.scheduler - ERROR - Couldn't gather keys: {'interpolate_to_ssh_grid-aa61eccb-a5ed-448c-a56a-f483eb85242f': 'queued', 'interpolate_to_ssh_grid-8aad92a6-0ef4-45df-9ab8-13fdabc536a1': 'queued', 'interpolate_to_ssh_grid-7e3efadd-1c3e-4ea8-bc83-1be785fd5bb2': 'queued', 'interpolate_to_ssh_grid-01569f67-c439-46a7-93b1-b4e4c70bb952': 'queued', 'interpolate_to_ssh_grid-6f998943-ef6c-4c86-b7b6-85c0673781bc': 'queued', 'interpolate_to_ssh_grid-333bc81e-745c-4c75-a8da-bbea667e911b': 'queued', 'interpolate_to_ssh_grid-93e619a4-340b-4826-9127-a71c81ff40f8': 'queued', 'interpolate_to_ssh_grid-6197ddf6-05ac-4ea8-baff-36fc5e73e7e4': 'queued', 'interpolate_to_ssh_grid-9b99907a-5f0f-46a0-a3c6-77b25fbe0384': 'queued', 'interpolate_to_ssh_grid-a0f795dc-c139-41ca-aa19-4abd94023c77': 'queued', 'interpolate_to_ssh_grid-0c4cd878-1a84-4aed-96c3-0c2088db8d9c': 'queued', 'interpolate_to_s

### Step 5: Reassemble the Interpolated Data into an xarray Dataset

In [None]:
# Assuming you want to combine the interpolated results into a new DataArray
# Note: This example might require adjustments for coordinates and dimensions
interpolated_data = xr.concat([xr.DataArray(interpolated_values[i], dims=['y', 'x']) 
                               for i in range(len(interpolated_values))], 
                              dim=dac_cropped_1h.time_counter)

In [None]:
# Save the interpolated dataset
interpolated_data.to_netcdf("/bettik/PROJECTS/pr-data-ocean/riverama/Datos/DAC_limpio/dac_cropped_1h_SSHgrid_2014.nc")