In [None]:
import os
from pathlib import Path
from functools import partial
from datetime import datetime
import logging

import xarray as xr
import dask
from dask.distributed import Client, LocalCluster
import dask.config
import pandas as pd
import numpy as np

In [None]:
DIR_DATA = Path(os.path.dirname(os.path.abspath(''))).resolve() / "data"
DIR_SOURCE = DIR_DATA / "raw"
DIR_OUTPUT = DIR_DATA / "processed"
DIR_OUTPUT.mkdir(parents=True, exist_ok=True)

In [37]:
try:
    print(f"Raw directory exists: {DIR_SOURCE.is_dir()}")
    print(f"Files in raw data directory: {sum([len(files) for _, _, files in os.walk(DIR_SOURCE)])}")
    print(f"Raw data size: {(sum(
        os.path.getsize(
            os.path.join(dirpath,filename)
            ) 
            for dirpath, dirnames, filenames in os.walk(DIR_SOURCE) 
            for filename in filenames 
        )
    ) / (1000 ** 3):.2f} GB")
except Exception as e:
    print(f"Raw data directory: error: {e}")

Raw directory exists: True
Files in raw data directory: 40906
Raw data size: 511.96 GB


In [None]:
DATASETS_VARS = {
    "ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4" : ['MXLDEPTH'],
    "ECCO_L4_SSH_05DEG_DAILY_V4R4" : ['SSH', 'SSHIBC'],
    "OSCAR_L4_OC_FINAL_V2.0" : ['u', 'v'],
    "OSTIA-UKMO-L4-GLOB-REP-v2.0" : ['analysed_sst'],
    "ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4" : ['EXFempmr', 'SFLUX'],
    "ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4" : ['PHIHYD', 'RHOAnoma'],
    "AQUA_MODIS_L3M_DAILY_4KM_CHLCONC": ['chlor_a'],
    "AQUA_MODIS_L3M_DAILY_4KM_PIC": ['pic'],
    "AQUA_MODIS_L3M_DAILY_4KM_POC": ['poc'],
}

DATASETS_SHORTHAND = {
    "ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4" : "MIXED_LAYER_DEPTH",
    "ECCO_L4_SSH_05DEG_DAILY_V4R4" : "SSH_DAILY",
    "OSCAR_L4_OC_FINAL_V2.0" : "CURRENTS",
    "OSTIA-UKMO-L4-GLOB-REP-v2.0" : "SST",
    "ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4" : "FRESHWATER_FLUX",
    "ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4" : "DENSITY",
    "AQUA_MODIS_L3M_DAILY_4KM_CHLCONC": "CHLOROPHYLL",
    "AQUA_MODIS_L3M_DAILY_4KM_PIC": "PIC",
    "AQUA_MODIS_L3M_DAILY_4KM_POC": "POC"
}

In [None]:
def _preprocess(x, lon_bnds, lat_bnds, dataset):

    x = x[DATASETS_VARS[dataset]]
    try:
        x = x.rename({
            "Time": "time",
            "Longitude": "lon",
            "Latitude": "lat"
        })
    except Exception as e:
        pass
    try:
        x = x.rename({
            "longitude": "lon",
            "latitude": "lat"
        })
    except Exception as e:
        pass

    x = x.dropna(dim='time', how='all', subset=DATASETS_VARS[dataset])

    variables = dict().fromkeys(DATASETS_VARS[dataset], np.float32)
    x = x.astype(variables)

    if pd.Series(x['lat'].values).is_monotonic_increasing:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[0], lat_bnds[1]))
    else:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[1], lat_bnds[0]))
    

def _preprocess_OSCAR(x, lon_bnds, lat_bnds):

    x = x[DATASETS_VARS["OSCAR_L4_OC_FINAL_V2.0"]]
    x = x.swap_dims({"longitude": "lon", "latitude": "lat"})
    x = x.dropna(dim='time', how='all', subset=DATASETS_VARS["OSCAR_L4_OC_FINAL_V2.0"])

    x['lon'] = (x['lon'] + 180) % 360 - 180

    variables = dict().fromkeys(DATASETS_VARS["OSCAR_L4_OC_FINAL_V2.0"], np.float32)
    x = x.astype(variables)
    
    if pd.Series(x['lat'].values).is_monotonic_increasing:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[0], lat_bnds[1]))
    else:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[1], lat_bnds[0]))

def _preprocess_DENSITY(x, lon_bnds, lat_bnds):

    x = x[DATASETS_VARS["ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4"]]
    try:
        x = x.rename({
            "longitude": "lon",
            "latitude": "lat"
        })
    except Exception as e:
        pass

    x = x.dropna(dim='time', how='all', subset=DATASETS_VARS["ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4"])

    variables = dict().fromkeys(DATASETS_VARS["ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4"], np.float32)
    x = x.astype(variables)
    x = x.sel(Z=slice(-5, -100))
    x = x.mean(dim='Z')

    if pd.Series(x['lat'].values).is_monotonic_increasing:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[0], lat_bnds[1]))
    else:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[1], lat_bnds[0]))
    
def _preprocess_MODIS(x, lon_bnds, lat_bnds, dataset):

    filename = x.attrs["product_name"]
    time = filename[11:19]
    time = datetime.strptime(time, "%Y%m%d")
    time = pd.to_datetime(time)

    x = x.assign_coords(time=time)
    x = x.expand_dims(dim='time')

    x = x[DATASETS_VARS[dataset]]

    x = x.dropna(dim='time', how='all', subset=DATASETS_VARS[dataset])

    variables = dict().fromkeys(DATASETS_VARS[dataset], np.float32)
    x = x.astype(variables)
    
    if pd.Series(x['lat'].values).is_monotonic_increasing:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[0], lat_bnds[1]))
    else:
        return x.sel(lon=slice(*lon_bnds), lat=slice(lat_bnds[1], lat_bnds[0]))

In [None]:
dask.config.set({'array.chunk-size': '1024MiB'})

with LocalCluster(n_workers=5, threads_per_worker=2 ,silence_logs=logging.ERROR, memory_limit="5GiB") as cluster:
    with Client(cluster) as client:

        print(client)

        for dataset, vars in DATASETS_VARS.items():

            new_dataset_path = DIR_OUTPUT / dataset
            new_dataset_path.mkdir(parents=True, exist_ok=True)

            if len(os.listdir(DIR_OUTPUT / dataset)) == 1:
                continue

            print(f"Dataset: {dataset}")
            
            files = os.path.join(DIR_SOURCE / dataset, "*.nc")

            lon_bnds, lat_bnds = (-180, -115), (15, 65)

            if dataset == "OSCAR_L4_OC_FINAL_V2.0":
                partial_func = partial(_preprocess_OSCAR, lon_bnds=lon_bnds, lat_bnds=lat_bnds)
                ds = xr.open_mfdataset(
                    files, 
                    preprocess=partial_func, 
                    join='outer', 
                    combine_attrs='drop_conflicts',
                    # combine='nested',
                    # concat_dim='time',
                    parallel=True,
                    engine='h5netcdf',
                    chunks='auto'
                )
            elif dataset == "ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4":
                partial_func = partial(_preprocess_DENSITY, lon_bnds=lon_bnds, lat_bnds=lat_bnds)
                ds = xr.open_mfdataset(
                    files, 
                    preprocess=partial_func, 
                    join='outer', 
                    combine_attrs='drop_conflicts',
                    # combine='nested',
                    # concat_dim='time',
                    parallel=True,
                    engine='h5netcdf',
                    chunks='auto'
                )
            elif dataset == "AQUA_MODIS_L3M_DAILY_4KM_CHLCONC" or dataset == "AQUA_MODIS_L3M_DAILY_4KM_POC" or dataset == "AQUA_MODIS_L3M_DAILY_4KM_PIC":
                partial_func = partial(_preprocess_MODIS, lon_bnds=lon_bnds, lat_bnds=lat_bnds, dataset=dataset)
                ds = xr.open_mfdataset(
                    files, 
                    preprocess=partial_func, 
                    join='outer', 
                    combine_attrs='drop_conflicts',
                    # combine='nested',
                    # concat_dim='time',
                    parallel=True,
                    engine='h5netcdf',
                    chunks='auto'
                )
            else:
                partial_func = partial(_preprocess, lon_bnds=lon_bnds, lat_bnds=lat_bnds, dataset=dataset)
                ds = xr.open_mfdataset(
                    files, 
                    preprocess=partial_func, 
                    join='outer', 
                    combine_attrs='drop_conflicts',
                    # combine='nested',
                    # concat_dim='time',
                    parallel=True,
                    engine='h5netcdf',
                    chunks='auto'
                )

            print(ds)
            # ds.to_netcdf(new_dataset_path / (DATASETS_SHORTHAND[dataset] + ".nc"), engine='h5netcdf')
            ds.to_zarr(new_dataset_path / (DATASETS_SHORTHAND[dataset] + ".zarr"), mode='w')
            ds.close()

<Client: 'tcp://127.0.0.1:39055' processes=5 threads=10, memory=25.00 GiB>
Dataset: AQUA_MODIS_L3M_DAILY_4KM_CHLCONC
<xarray.Dataset> Size: 30GB
Dimensions:  (time: 4069, lat: 1200, lon: 1560)
Coordinates:
  * lat      (lat) float32 5kB 64.98 64.94 64.9 64.85 ... 15.15 15.1 15.06 15.02
  * lon      (lon) float32 6kB -180.0 -179.9 -179.9 ... -115.1 -115.1 -115.0
  * time     (time) datetime64[ns] 33kB 2002-07-05 2002-07-06 ... 2013-08-31
Data variables:
    chlor_a  (time, lat, lon) float32 30GB dask.array<chunksize=(1, 1200, 1560), meta=np.ndarray>
Attributes: (12/49)
    instrument:                       MODIS
    title:                            MODISA Level-3 Standard Mapped Image
    project:                          Ocean Biology Processing Group (NASA/GS...
    platform:                         Aqua
    source:                           satellite observations from MODIS-Aqua
    processing_version:               R2022.0
    ...                               ...
    creator_url: 

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Dataset: AQUA_MODIS_L3M_DAILY_4KM_PIC
<xarray.Dataset> Size: 30GB
Dimensions:  (time: 4070, lat: 1200, lon: 1560)
Coordinates:
  * lat      (lat) float32 5kB 64.98 64.94 64.9 64.85 ... 15.15 15.1 15.06 15.02
  * lon      (lon) float32 6kB -180.0 -179.9 -179.9 ... -115.1 -115.1 -115.0
  * time     (time) datetime64[ns] 33kB 2002-07-04 2002-07-05 ... 2013-08-31
Data variables:
    pic      (time, lat, lon) float32 30GB dask.array<chunksize=(1, 1200, 1560), meta=np.ndarray>
Attributes: (12/49)
    instrument:                       MODIS
    title:                            MODISA Level-3 Standard Mapped Image
    project:                          Ocean Biology Processing Group (NASA/GS...
    platform:                         Aqua
    source:                           satellite observations from MODIS-Aqua
    processing_version:               R2022.0
    ...                               ...
    creator_url:                      https://oceandata.sci.gsfc.nasa.gov
    publisher_url:    

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Dataset: AQUA_MODIS_L3M_DAILY_4KM_POC
<xarray.Dataset> Size: 30GB
Dimensions:  (time: 4070, lat: 1200, lon: 1560)
Coordinates:
  * lat      (lat) float32 5kB 64.98 64.94 64.9 64.85 ... 15.15 15.1 15.06 15.02
  * lon      (lon) float32 6kB -180.0 -179.9 -179.9 ... -115.1 -115.1 -115.0
  * time     (time) datetime64[ns] 33kB 2002-07-04 2002-07-05 ... 2013-08-31
Data variables:
    poc      (time, lat, lon) float32 30GB dask.array<chunksize=(1, 1200, 1560), meta=np.ndarray>
Attributes: (12/49)
    instrument:                       MODIS
    title:                            MODISA Level-3 Standard Mapped Image
    project:                          Ocean Biology Processing Group (NASA/GS...
    platform:                         Aqua
    source:                           satellite observations from MODIS-Aqua
    processing_version:               R2022.0
    ...                               ...
    creator_url:                      https://oceandata.sci.gsfc.nasa.gov
    publisher_url:    

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
