In [1]:
import os
from pathlib import Path
from datetime import datetime 
from dask import array as da
import fsspec
import gcsfs
import json
import numpy as np
import xarray as xr
import utils_EOATS as util

                
def metadata_extraction(DATA_ROOT):
        
    lats = []
    lons = []
    ts = []   
    
    start = datetime.now()
    paths=[]
    for path in util.iter_csv_paths(DATA_ROOT):
        
        lat = None
        lon = None
        date = None
    
        with open(path, "rb", buffering=1024 * 1024) as f:
            for line in f:
                if not line:
                    continue
    
                # --- read header metadata --------------------------------------
                if line.startswith(b"#profile_latitude "):
                    lat = float(line[len(b"#profile_latitude "):].strip())
                    continue
                elif line.startswith(b"#profile_longitude "):
                    lon = float(line[len(b"#profile_longitude "):].strip())
                    continue
                elif line.startswith(b"#profile_date "):
                    date = line[len(b"#profile_date "):].strip().decode("ascii")
                    continue
                if (lat is not None) and (lon is not None) and (date is not None):
                    break

        if (lat is None) or (lon is None) or (date is None):
            raise ValueError(f"Missing metadata in file: {path}")
            
        lats.append(lat)
        lons.append(lon)
        ts.append(date)
        paths.append('/'.join(path.split('/')[-3:]))
        
        if len(paths) % 265000 == 0:
            print(datetime.now() - start)
    
    latitudes = np.array(lats, dtype=np.float32)
    longitudes = np.array(lons, dtype=np.float32)
    times = np.array(ts, dtype="datetime64[ns]")
    OUT_DIR = Path("/home/jovyan/EasyOneArgoTSLite/derived")
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    np.save(OUT_DIR / "latitudes.npy", latitudes)
    np.save(OUT_DIR / "longitudes.npy", longitudes)
    np.save(OUT_DIR / "times.npy", times)
    try:
        np.save(OUT_DIR / "paths.npy", np.array(paths, dtype="U"))
    except Exception as e:
        print("Returning list 'paths'. Save failed:", e)
        return paths
    
    print("Saved to", OUT_DIR)
    return None


In [2]:
def initialize_EasyOneArgoTSLitedataset_RavioliChunks(outfile, lat_, lon_, time_, paths_) -> fsspec.mapping.FSMap:
    
    """
    Save Pressure/T/S/Pressure_error/T_error/S_error for each location's pressurelevels
    """
    N_profiles = lat_.size
    if not lon_.size == time_.size == N_profiles:
        raise ValueError("Number of profiles should be equal to number of coordinates")
    print("N_profiles:", N_profiles)
    
    pressure_ = np.array([
        2.00e+00, 5.00e+00, 1.00e+01, 1.50e+01, 2.00e+01, 2.50e+01, 3.00e+01, 3.50e+01,
        4.00e+01, 4.50e+01, 5.00e+01, 5.50e+01, 6.00e+01, 6.50e+01, 7.00e+01, 7.50e+01,
        8.00e+01, 8.50e+01, 9.00e+01, 9.50e+01, 1.00e+02, 1.05e+02, 1.10e+02, 1.15e+02,
        1.20e+02, 1.25e+02, 1.30e+02, 1.35e+02, 1.40e+02, 1.45e+02, 1.50e+02, 1.55e+02,
        1.60e+02, 1.65e+02, 1.70e+02, 1.75e+02, 1.80e+02, 1.85e+02, 1.90e+02, 1.95e+02,
        2.00e+02, 2.05e+02, 2.10e+02, 2.15e+02, 2.20e+02, 2.25e+02, 2.30e+02, 2.35e+02,
        2.40e+02, 2.45e+02, 2.50e+02, 2.60e+02, 2.70e+02, 2.80e+02, 2.90e+02, 3.00e+02,
        3.10e+02, 3.20e+02, 3.30e+02, 3.40e+02, 3.50e+02, 3.75e+02, 4.00e+02, 4.25e+02,
        4.50e+02, 4.75e+02, 5.00e+02, 5.50e+02, 6.00e+02, 6.50e+02, 7.00e+02, 7.50e+02,
        8.00e+02, 8.50e+02, 9.00e+02, 9.50e+02, 1.00e+03, 1.10e+03, 1.20e+03, 1.30e+03,
        1.40e+03, 1.50e+03, 1.60e+03, 1.70e+03, 1.80e+03, 1.90e+03, 2.00e+03, 2.20e+03,
        2.40e+03, 2.60e+03, 2.80e+03, 3.00e+03, 3.20e+03, 3.40e+03, 3.60e+03, 3.80e+03,
        4.00e+03, 4.20e+03, 4.40e+03, 4.60e+03, 4.80e+03, 5.00e+03, 5.20e+03, 5.40e+03,
        5.60e+03, 5.80e+03, 6.00e+03
    ])
    N_plevels = pressure_.size
    print("N_plevels:", N_plevels)
    
    temperature_ = da.empty(shape=(N_profiles, N_plevels), chunks=(N_profiles,1), dtype=np.float32)
    salinity_ = da.empty(shape=(N_profiles, N_plevels), chunks=(N_profiles,1), dtype=np.float32)
    pressure_error_ = da.empty(shape=(N_profiles, N_plevels), chunks=(N_profiles,1), dtype=np.float32)
    temperature_error_ = da.empty(shape=(N_profiles, N_plevels), chunks=(N_profiles,1), dtype=np.float32)
    salinity_error_ = da.empty(shape=(N_profiles, N_plevels), chunks=(N_profiles,1), dtype=np.float32)
    
    ds = xr.Dataset(
        data_vars=dict(
            temperature =(["profilelocation_index", "pressure"], temperature_),
            salinity =(["profilelocation_index", "pressure"], salinity_),
            pressure_error =(["profilelocation_index", "pressure"], pressure_error_),
            temperature_error =(["profilelocation_index", "pressure"], temperature_error_),
            salinity_error = (["profilelocation_index", "pressure"], salinity_error_),
            paths =  (["profilelocation_index"], paths_),
            latitude = ( ["profilelocation_index"], lat_),
            longitude = ( ["profilelocation_index"], lon_ ),
            time = ( ["profilelocation_index"], time_),
        ),
        coords=dict(
            profilelocation_index = ( ["profilelocation_index"], np.arange(N_profiles) ),
            pressure = (["pressure"], pressure_ ),
        ),
        attrs=dict(
            description = "EasyOneArgoTSLite 2025-10 version's variables, stored as (profile, pressure)",
            filename = outfile,
            initialized_date = str( datetime.now() )
        )
    )
    ds.time.attrs["standard_name"] = 'time'
    ds.time.attrs["units"] = 'days since 1970-01-01 00:00:00'
    ds.pressure.attrs["standard_name"] = 'pressure'
    ds.pressure.attrs["units"] = 'decibars'
    ds.latitude.attrs["standard_name"] = 'latitude'
    ds.latitude.attrs["units"] = 'degrees_north'
    ds.longitude.attrs["standard_name"] = 'longitude'
    ds.longitude.attrs["units"] = 'degrees_east'
    ds.temperature.attrs["standard_name"] = 'temperature'
    ds.temperature.attrs["long_name"] = 'sea temperature in-situ ITS-90 scale'
    ds.temperature.attrs["units"] = 'degrees_celsius'
    ds.salinity.attrs["standard_name"] = 'salinity'
    ds.salinity.attrs["long_name"] = 'practical salinity'
    ds.salinity.attrs["units"] = 'dimensionless'
    ds.pressure_error.attrs["standard_name"] = 'pressure_error'
    ds.pressure_error.attrs["units"] = 'decibars'
    ds.temperature_error.attrs["standard_name"] = 'temperature_error'
    ds.temperature_error.attrs["units"] = 'degrees_celsius'
    ds.salinity_error.attrs["standard_name"] = 'salinity_error'
    ds.salinity_error.attrs["units"] = 'dimensionless'
    ds.paths.attrs["standard_name"] = 'EasyOneArgoTSLite_filename'
    
    
    target_mapper = util.get_gcs().get_mapper(outfile)
    
    target_writer = ds.chunk(
        {'profilelocation_index': 125_000, 'pressure': N_plevels}
    ).to_zarr(
        target_mapper, 
        mode="w",
        consolidated=True, 
        compute=False
    )

    print(' Empty grid initialized:  '+outfile, end='') 
        
    return target_writer

In [2]:
DATA_ROOT = Path("/home/jovyan/EasyOneArgoTSLite/EasyOneArgoTSLite_20251015T172450Z/data")

In [3]:
metadata_extraction(DATA_ROOT)

0:06:53.257887
0:13:47.107261
0:20:35.830441
0:27:38.880952
0:43:34.765676
0:50:09.559780
0:56:56.736362
1:11:38.408050
1:18:58.146944
1:26:07.063488


  times = np.array(ts, dtype="datetime64[ns]")


Saved to /home/jovyan/EasyOneArgoTSLite/derived


In [20]:
OUT_DIR = Path("/home/jovyan/EasyOneArgoTSLite/derived")
latitudes = np.load(OUT_DIR / "latitudes.npy")
longitudes = np.load(OUT_DIR / "longitudes.npy")
times = np.load(OUT_DIR / "times.npy")
paths = np.load(OUT_DIR / "paths.npy")

In [21]:
lon_180180 = longitudes.copy()
ind = np.where(longitudes>180.0)
ind_neg180 = np.where(longitudes==-180.0)
lon_180180[ind] = lon_180180[ind] - 360.0
lon_180180[ind_neg180] = lon_180180[ind_neg180] + 360.0

In [22]:
time_days = (times.astype('datetime64[s]')-np.datetime64('1970-01-01T00:00:00')).astype(float) / (60*60*24)

In [31]:
outfile = 'pangeo-argo-eke/data/EasyOneArgoTSLite_v01'

In [33]:
gcs = util.get_gcs(path=None, asynchronous=False)

In [34]:
%%time
target_writer = initialize_EasyOneArgoTSLitedataset_RavioliChunks(outfile, latitudes, lon_180180, time_days, paths)

N_profiles: 2651963
N_plevels: 107




 Empty grid initialized:  pangeo-argo-eke/data/EasyOneArgoTSLite_v01CPU times: user 630 ms, sys: 811 ms, total: 1.44 s
Wall time: 28 s


In [35]:
import dask

In [36]:
%%time
_ = dask.compute(target_writer)

CPU times: user 6.89 s, sys: 3.45 s, total: 10.3 s
Wall time: 16.1 s
