# waves_by_cowclip

Notebook environment to migrate NetCDF files to CF-compliant Zarr

In [1]:
# Optional; code formatter, installed as jupyter lab extension
#%load_ext lab_black
# Optional; code formatter, installed as jupyter notebook extension
%load_ext nb_black

<IPython.core.display.Javascript object>

### Configure OS independent paths

In [2]:
# Import standard packages
import os
import pathlib

import sys
import numpy as np
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
import xarray as xr
import math
import itertools
import glob
from copy import deepcopy

# Import custom functionality
from coclicodata.drive_config import p_drive
from coclicodata.etl.cf_compliancy_checker import check_compliancy, save_compliancy

# Define (local and) remote drives
dir_path_data = p_drive.joinpath(
    "11209199-climate-resilient-ports",
    "00_general",
    "02_COWCLIP_Harvest_Morim_et_al",
    "output_NetCDF",
)

# Workaround to the Windows OS (10) udunits error after installation of cfchecker: https://github.com/SciTools/iris/issues/404
# The location below belongs to one specific mambaforge install; change it to the
# udunits2.xml file dir in your Python installation. The variable is only exported
# when that file is really present, so other machines / operating systems are not
# pointed at a database that does not exist.
udunits_xml_path = pathlib.Path.home().joinpath(
    r"AppData\Local\mambaforge\pkgs\udunits2-2.2.28-h892ecd3_0\Library\share\udunits\udunits2.xml"
)
if udunits_xml_path.is_file():
    os.environ["UDUNITS2_XML_PATH"] = str(udunits_xml_path)

# Project paths & files (manual input)
dir_path_cf = dir_path_data.joinpath("CF")  # directory to save output CF check files
file_path_cf_merged_nc = dir_path_data.joinpath("waves_by_cowclip_cf_merged.nc")  # file to save merged netCDF file

<IPython.core.display.Javascript object>

In [3]:
dir_path_data

WindowsPath('P:/11209199-climate-resilient-ports/00_general/02_COWCLIP_Harvest_Morim_et_al/output_NetCDF')

<IPython.core.display.Javascript object>

### Get NC-files

In [4]:
# Get all nc files in the directory and its subdirectories, skipping files whose
# *name* contains 'cf' (such as the merged CF file this notebook writes into the
# same directory). Only the file name is tested: matching against the full path
# would drop every file as soon as any parent directory happened to contain 'cf'.
file_path_ls = glob.glob(str(dir_path_data.joinpath("**/*.nc")), recursive=True)
file_path_ls = [f for f in file_path_ls if "cf" not in pathlib.Path(f).name]

<IPython.core.display.Javascript object>

In [5]:
file_path_ls

['P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\median_Dm_avg_under_rcp45.nc',
 'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\median_Dm_avg_under_rcp85.nc',
 'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\percentiles\\p000_Dm_avg_under_rcp45.nc',
 'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\percentiles\\p000_Dm_avg_under_rcp85.nc',
 'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\percentiles\\p005_Dm_avg_under_rcp45.nc',
 'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\percentiles\\p005_Dm_avg_under_rcp85.nc',
 'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\percentiles\\p095_Dm_

<IPython.core.display.Javascript object>

### Check CF compliancy original NetCDF files

In [6]:
# open the first file as a sample of the original (not yet altered) datasets
ds = xr.open_dataset(file_path_ls[0])

# display the original dataset to inspect its dimensions, variables and attributes
ds

<IPython.core.display.Javascript object>

In [7]:
%%capture cap --no-stderr
# check original CF compliancy (for first file); the %%capture magic on the first
# line of this cell stores everything printed here in `cap` for the next cell

check_compliancy(testfile=pathlib.Path(file_path_ls[0]), working_dir=dir_path_cf)

<IPython.core.display.Javascript object>

In [8]:
# save original CF compliancy (for first file); `cap` is the output captured by the
# %%capture cell above and dir_path_cf is the directory for the CF check files
save_compliancy(cap, testfile=pathlib.Path(file_path_ls[0]), working_dir=dir_path_cf)



<IPython.core.display.Javascript object>

### Make CF compliant alterations to the NetCDF files (dataset dependent)

In [9]:
# open every source file as its own dataset, in the same order as file_path_ls
ds_ls = [xr.open_dataset(file_path) for file_path in file_path_ls]

<IPython.core.display.Javascript object>

In [10]:
import json

# NetCDF attribute alterations by means of metadata template
# (read inside a context manager so the file handle is closed again)
with open(dir_path_data.joinpath("metadata_waves_by_cowclip.json")) as f_global:
    meta_global = json.load(f_global)

# Add metadata to datasets (the attrs of the datasets in ds_ls are updated in place)
for ds in ds_ls:
    for attr_name, attr_val in meta_global.items():
        if attr_name == 'PROVIDERS':
            # stored as a JSON string rather than the parsed object
            # (presumably because file attributes cannot hold nested structures; TODO confirm)
            attr_val = json.dumps(attr_val)
        ds.attrs[attr_name] = attr_val

    # Declare the CF version the altered files are meant to follow
    ds.attrs['Conventions'] = "CF-1.8"

<IPython.core.display.Javascript object>

In [11]:
def transpose_dims_of_vars(ds, vars=[], order=[]):
    """Re-order the axes of variables in a dataset.

    Parameters
    ----------
    ds : dataset whose variables are re-ordered.
    vars : names of the variables to process; all data variables when empty.
    order : axis permutation applied to every processed variable; the axes are
        reversed when empty.

    Returns
    -------
    Dataset in which each processed variable is re-assigned from its transposed
    values. The dimension names of the *first* variable (after permutation) are
    used for all of them, so the variables are expected to share dimensions.
    """
    names = list(ds.data_vars) if len(vars) == 0 else vars

    # Dimension names of the first variable serve as the reference
    reference_dims = list(ds[names[0]].dims)

    # Default permutation: last axis first, ..., first axis last
    if len(order) == 0:
        axes = tuple(range(len(reference_dims) - 1, -1, -1))
    else:
        axes = order

    new_dims = [reference_dims[axis] for axis in axes]

    for name in names:
        moved = ds[name].values.transpose(*axes)

        # A variable named like one of the dimensions is stored as a coordinate,
        # everything else as a regular variable
        if name in new_dims:
            ds = ds.assign_coords({name: (new_dims, moved)})
        else:
            ds = ds.assign({name: (new_dims, moved)})

    return ds

def assign_coords_to_dim(ds, dim, coords):
    """Label a dimension of a dataset with coordinate values.

    Parameters
    ----------
    ds : dataset to label.
    dim : name of the dimension that receives the labels.
    coords : sequence of labels; its first element decides whether the
        coordinate is converted with ``astype('S')`` afterwards.

    Returns
    -------
    The dataset returned by ``assign_coords``, with the coordinate converted
    when the first label is exactly of type ``str``.
    """
    labelled = ds.assign_coords({dim: coords})

    # Nothing more to do unless the labels are plain strings
    if type(coords[0]) is not str:
        return labelled

    # String labels: convert the coordinate with astype('S')
    labelled[dim] = labelled[dim].astype('S')

    return labelled

def add_dim(ds, dim, coords, axis=-1):
    """Add a new, labelled dimension to a dataset.

    Parameters
    ----------
    ds : dataset to expand.
    dim : name of the new dimension.
    coords : sequence of labels for the new dimension.
    axis : position at which the new dimension is inserted; passed straight to
        ``expand_dims``. ``None`` leaves the placement to ``expand_dims``.

    Returns
    -------
    The expanded dataset with `coords` assigned to `dim`.
    """
    # Expand exactly once. `axis=None` is forwarded as-is, so no separate branch is
    # needed; expanding a second time would request a dimension that already exists.
    ds = ds.expand_dims(dim={dim: coords}, axis=axis)

    # Assign coordinates to dimension
    ds = assign_coords_to_dim(ds, dim, coords)

    return ds

def merge_vars(ds, dim, var, vars=[], coords=[], axis=-1):
    """Stack several variables of a dataset into one variable along a new dimension.

    Parameters
    ----------
    ds : dataset holding the variables to merge.
    dim : name of the new dimension along which the variables are stacked.
    var : name of the merged variable.
    vars : names of the variables to merge; all data variables when empty.
    coords : labels for the new dimension; the variable names when empty.
    axis : position of the new dimension in the result; ``-1`` keeps it last.

    Returns
    -------
    Dataset without the source variables and with the merged variable `var`.
    """
    # Get all variables if none are specified
    if len(vars) == 0:
        vars = list(ds.data_vars)

    # Coordinates are the same as the variables if none are specified
    if len(coords) == 0:
        coords = vars

    # Dimensions of the merged variable: those of the first source variable,
    # followed by the new dimension
    dims = list(ds[vars[0]].dims) + [dim]

    # Stack the source arrays along a new trailing axis
    vals = np.stack([ds[name].values for name in vars], axis=-1)

    # Remove variables
    ds = ds.drop_vars(vars)

    # Add dimension to dataset
    ds = add_dim(ds, dim, coords)

    # Add variable to dataset
    ds[var] = xr.DataArray(vals, dims=dims)

    # Reorder dimensions: move the new dimension from the end to `axis`
    if axis != -1:
        dims_r = dims.copy()
        dims_r.pop(-1)
        dims_r.insert(axis, dim)
        ds = ds.transpose(*dims_r)

    return ds

# Assign attributes to variables and coordinates
def assign_attributes(ds, var, attrs):
    """Replace the attributes of one variable or coordinate of a dataset.

    Parameters
    ----------
    ds : dataset that holds the variable or coordinate.
    var : name of the variable or coordinate.
    attrs : attribute mapping that replaces the current attributes.

    Returns
    -------
    The same dataset object that was passed in.
    """
    target = ds[var]
    target.attrs = attrs

    return ds

<IPython.core.display.Javascript object>

In [12]:
# Check file path
file_path_ls[0]

'P:\\11209199-climate-resilient-ports\\00_general\\02_COWCLIP_Harvest_Morim_et_al\\output_NetCDF\\Dm_avg\\median_Dm_avg_under_rcp45.nc'

<IPython.core.display.Javascript object>

In [13]:
# NetCDF variable and dimension alterations (per dataset)

# Get information from paper: the periods behind the historical and the projected
# values. Each period is represented on the time axis by its midpoint. This is the
# same for every file, so it is computed once.
periods_str = [['1979-01-01', '2004-01-01'], ['2081-01-01', '2100-01-01']]
periods = [[pd.to_datetime(start), pd.to_datetime(end)] for start, end in periods_str]
times = [period[0] + (period[1] - period[0])/2 for period in periods] # Mean of periods

# Attributes per wave variable, keyed by the variable token of the file name
variable_attr_dict = {'Hs': {'long_name': 'significant wave height', 'units': 'm'},
                      'Tm': {'long_name': 'wave period', 'units': 's'},
                      'Dm': {'long_name': 'wave direction', 'units': 'degrees'}}

ds_cf_ls = []
for ds, file_path in zip(ds_ls, file_path_ls):
    # Get information from the file name, e.g. 'median_Dm_avg_under_rcp45.nc' or
    # 'p005_Dm_avg_under_rcp85.nc'. pathlib is used so the parsing does not depend
    # on the path separator of the operating system.
    file_name = pathlib.Path(file_path).name.split('.')[0]
    file_name_split = file_name.split('_')
    ens_perc = file_name_split[0]
    ens_perc = 50 if ens_perc == 'median' else int(ens_perc[1:])  # 'median' -> 50, 'p005' -> 5
    var = file_name_split[1]
    var_stat = file_name_split[2]
    rcp = float(file_name_split[4][3:5])/10  # 'rcp45' -> 4.5

    # Rename dimensions and variables
    ds = ds.rename_dims({"latitude": "lat","longitude": "lon"})
    ds = ds.rename_vars({"LAT": "lat", "LON": "lon"})

    # Transpose dimensions of variables (lat, lon)
    ds = transpose_dims_of_vars(ds)

    # Set coordinates
    ds = ds.set_coords(['lat','lon'])

    # Correct longitude values from 0-360 to -180-180
    ds['lon'] = (ds['lon'] + 180) % 360 - 180

    # Sort data by ascending longitude. The column order is derived once from the
    # first row of the longitude array and applied to every data variable and to
    # the longitudes themselves.
    # assumes every data variable is 2-D with longitude on the second axis -- TODO confirm
    lons = ds['lon'].values
    idx = np.argsort(lons[0])
    for var_ in ds.data_vars:
        ds[var_].values = ds[var_].values[:,idx]
    ds['lon'].values = lons[:,idx]

    # Merge variables
    ds = merge_vars(ds, dim='time', var=var, vars=['hist_value','pred_value'], coords=times, axis=0)

    # Add dimensions
    ds = add_dim(ds, dim='ens_perc', coords=[ens_perc], axis=0)
    ds = add_dim(ds, dim='var_stat', coords=[var_stat], axis=0)
    ds = add_dim(ds, dim='rcp',coords=[rcp], axis=0)

    # Drop variables
    if 'diff_value' in ds.data_vars:
        ds = ds.drop_vars(['diff_value'])
    if 'diff_perc_value' in ds.data_vars:
        ds = ds.drop_vars(['diff_perc_value'])

    # Add attributes for coordinates
    ds = assign_attributes(ds, var='lat', attrs={'standard_name': 'latitude', 'long_name': 'latitude', 'units': 'degrees_north'})
    ds = assign_attributes(ds, var='lon', attrs={'standard_name': 'longitude', 'long_name': 'longitude', 'units': 'degrees_east'})
    # No 'units' attribute on time: it has to be absent in order to save as netCDF
    ds = assign_attributes(ds, var='time', attrs={'standard_name': 'time', 'long_name': 'time'})
    ds = assign_attributes(ds, var='ens_perc', attrs={'long_name': 'model ensemble percentile', 'units': '1'})
    ds = assign_attributes(ds, var='var_stat', attrs={'long_name': 'variable statistic', 'units': '1'})
    ds = assign_attributes(ds, var='rcp', attrs={'long_name': 'radiative forcing pathway', 'units': '1'})

    # Add attributes for variables (a copy, so datasets never share one attribute dict)
    ds = assign_attributes(ds, var=var, attrs=dict(variable_attr_dict[var]))

    # Add to list of datasets
    ds_cf_ls.append(ds)

print('Number of datasets: {}'.format(len(ds_cf_ls)))

Number of datasets: 160


<IPython.core.display.Javascript object>

In [14]:
# Check dataset
ds_cf_ls[0]

<IPython.core.display.Javascript object>

In [15]:
# Function to split datasets
def split_dataset(ds_ls):
    """Split datasets into datasets that each hold a single data variable.

    Parameters
    ----------
    ds_ls : list of datasets.

    Returns
    -------
    List with one deep-copied dataset per (dataset, data variable) pair, in input
    order; every copy keeps only that one data variable.
    """
    def _keep_only(dataset, wanted):
        # Work on a deep copy so the input dataset stays untouched
        clone = deepcopy(dataset)
        unwanted = [name for name in clone.data_vars if name != wanted]
        return clone.drop_vars(unwanted)

    return [
        _keep_only(dataset, wanted)
        for dataset in ds_ls
        for wanted in dataset.data_vars
    ]

<IPython.core.display.Javascript object>

In [16]:
# Split into one dataset per data variable; the count only grows when an input
# dataset holds more than one data variable
ds_cf_split_ls = split_dataset(ds_cf_ls)

print('Number of datasets: {}'.format(len(ds_cf_split_ls)))

Number of datasets: 160


<IPython.core.display.Javascript object>

In [17]:
# Check dataset
ds_cf_split_ls[0]

<IPython.core.display.Javascript object>

In [18]:
# Function to get merge dataframe
def get_merge_dataframe(ds_ls, merge_keys):
    """Build a table that describes every dataset by its merge-key values.

    Parameters
    ----------
    ds_ls : list of datasets.
    merge_keys : list of keys. A key that is a coordinate of a dataset is read from
        the first value of that coordinate; the special key 'variables' is the first
        key of the dataset; any other key gets an empty string.

    Returns
    -------
    pandas.DataFrame with an 'index' column (position of the dataset in `ds_ls`)
    and one string column per merge key. Rows are sorted by the merge keys in
    reversed order, using the coordinate dtypes for the comparison. For an empty
    `ds_ls` an empty frame with those columns is returned.
    """
    merge_keys = list(merge_keys)

    # Create merge list
    merge_ls = []
    for i, ds in enumerate(ds_ls):
        merge_dict = {'index': i}
        for key in merge_keys:
            if key in ds.coords: # if key is a coordinate
                merge_dict[key] = ds[key].values[0]
            elif key == 'variables': # if key is variables
                merge_dict[key] = list(ds.keys())[0]
            else: # if key is not a coordinate
                merge_dict[key] = ''
        merge_ls.append(merge_dict)

    # Create dataframe (explicit columns, so an empty input still yields them)
    merge_df = pd.DataFrame(merge_ls, columns=['index'] + merge_keys)

    # Nothing to type or sort without datasets
    if len(ds_ls) == 0:
        return merge_df

    # Set types of columns so that sorting compares real values rather than text.
    # The coordinate dtype is only used when *every* dataset has the coordinate;
    # the decision no longer depends on whichever dataset the loop above ended on.
    for key in merge_keys:
        if all(key in ds.coords for ds in ds_ls):
            merge_df[key] = merge_df[key].astype(ds_ls[0][key].dtype)
        else:
            merge_df[key] = merge_df[key].astype(str)

    # Sort dataframe (last merge key is the primary sort key)
    merge_df = merge_df.sort_values(merge_keys[::-1])

    # Set all types to string
    for key in merge_keys:
        merge_df[key] = merge_df[key].astype(str)

    # Return dataframe
    return merge_df

# Function to get merge dictionary
def get_merge_dictionary(merge_df, merge_keys):
    """Nest the dataset indices of `merge_df` by the merge keys.

    Parameters
    ----------
    merge_df : dataframe with an 'index' column and one column per merge key.
    merge_keys : list of keys, innermost merge level first.

    Returns
    -------
    dict with the single key ``merge_keys[-1]``. Its value is the 'index' column
    of the final grouped frame as a list; with a single merge key (no grouping
    pass) that is the plain list of values from ``merge_df['index']``.
    """
    
    # Consecutive groupby on dataframe
    merge_df_grouped = merge_df
    for i, key in enumerate(merge_keys):
        # The last key is not grouped; it becomes the top-level key below
        if i == len(merge_keys)-1:
            break
        # Group by merge keys: group on all *remaining* keys and collapse the 'index'
        # column of each group into a one-entry dict {key: [entries]}. After the first
        # pass the entries are themselves such dicts, which builds up the nesting.
        # NOTE(review): relies on pandas accepting a dict as an aggregation result --
        # confirm on the pandas version in use.
        merge_df_grouped = merge_df_grouped.groupby(merge_keys[i+1:]).agg({'index': lambda x: {key: x.tolist()}}).reset_index()
    
    # Create merge dictionary
    merge_dict = {merge_keys[-1]: merge_df_grouped['index'].tolist()}

    # Return merge dictionary
    return merge_dict

# Function to merge datasets
def merge_datasets(ds_ls, merge_dict, max_level=np.inf):
    """Merge datasets following a nested merge dictionary.

    Parameters
    ----------
    ds_ls : list of datasets; the integers in `merge_dict` index into this list.
    merge_dict : nested dict/list structure of merge keys with dataset indices at
        the deepest level. It is deep-copied before use.
    max_level : maximum number of merge passes over the structure.

    Returns
    -------
    The merged result. This is still a dict when `max_level` passes were not
    enough to reduce the structure completely.
    """
    # Note merge_dict is a dict with a list of dicts len(merge_keys) deep in which the keys are the merge_keys
    # and the values at the deepest level are lists of indices of datasets to be merged

    # Fill merge dict with datasets (recursively)
    def fill_merge_dict_with_datasets(ds_ls, merge_dict):
        # Walks the structure and modifies it in place: every list element that is
        # not a dict is treated as an index into ds_ls
        for key, val in merge_dict.items():
            if isinstance(val, dict):
                fill_merge_dict_with_datasets(ds_ls, merge_dict[key])
            elif isinstance(val, list):
                for i, val_ in enumerate(val):
                    if isinstance(val_, dict):
                        fill_merge_dict_with_datasets(ds_ls, val_)
                    else:
                        # Replace index with dataset
                        merge_dict[key][i] = ds_ls[val_]
        
        # Return merge dict
        return merge_dict

    # Merge datasets (recursively)
    # NOTE: this inner function shadows the outer `merge_datasets` name inside this body
    def merge_datasets(ds_merge_dict):

        # Recursively merge datasets
        # One pass: a dict whose list holds only datasets is replaced by their
        # combination; a list of dicts has each element replaced by the result of
        # the pass on that element
        def merge_datasets_(ds_merge_dict):
            for key, val in ds_merge_dict.items():
                if isinstance(val, dict):
                    # the return value of this recursive call is not used
                    merge_datasets_(ds_merge_dict[key])
                elif isinstance(val, list):
                    if all(isinstance(val_, dict) for val_ in val):
                        for i, val_ in enumerate(val):
                            ds_merge_dict[key][i] = merge_datasets_(val_)
                    
                    # Datasets to be merged
                    elif all(isinstance(val_, xr.Dataset) for val_ in val):
                        # From here on the local name `ds_merge_dict` is rebound to the
                        # combined dataset, which is what this call returns
                        if all(key in ds.coords for ds in val):
                            # Concat if key is a coordinate
                            ds_merge_dict = xr.concat(val, dim=key)

                            # Get dimensions
                            #dims = list(ds_merge_dict.dims)

                            # Transpose dimensions newest last
                            #dims.append(dims.pop(dims.index(key)))
                            #ds_merge_dict = ds_merge_dict.transpose(*dims)
                        else:
                            # Get dimensions
                            #dims = list(val[0].dims)

                            # Merge if key is a variable
                            ds_merge_dict = xr.merge(val)

                            # Transpose dimensions
                            #ds_merge_dict = ds_merge_dict.transpose(*dims)
            
            # Return merged datasets
            return ds_merge_dict
        
        # Merge datasets until no more dicts in merge_dict
        # (each pass collapses the lists that hold only datasets; capped by max_level)
        level = 0
        while isinstance(ds_merge_dict,dict) and level < max_level:
            ds_merge_dict = merge_datasets_(ds_merge_dict)
            level += 1
        
        # Return merged datasets
        return ds_merge_dict
    
    # Fill merge dict with datasets (on a deep copy, so the caller's merge_dict is unchanged)
    ds_merge_dict = fill_merge_dict_with_datasets(ds_ls, deepcopy(merge_dict))

    # Merge datasets (works on another deep copy of the filled structure, datasets included)
    ds_merged = merge_datasets(deepcopy(ds_merge_dict))
    
    return ds_merged

# Function to count datasets
def count_datasets(ds_cf_merged):
    """Count the datasets contained in a (possibly nested) merge result.

    Parameters
    ----------
    ds_cf_merged : a dataset, or dicts / lists nested to any depth that hold
        datasets.

    Returns
    -------
    Number of ``xr.Dataset`` objects found. A dataset counts as one and is not
    searched further; anything that is not a dataset, dict or list counts as zero.
    """
    def _tally(node):
        # The dataset check comes first so that a dataset is a leaf
        if isinstance(node, xr.Dataset):
            return 1
        if isinstance(node, dict):
            return sum(_tally(child) for child in node.values())
        if isinstance(node, list):
            return sum(_tally(child) for child in node)
        return 0

    return _tally(ds_cf_merged)

<IPython.core.display.Javascript object>

In [28]:
# Merge keys, innermost merge level first: the datasets are first combined along
# 'ens_perc', then 'var_stat', then 'rcp', and finally over the variables
merge_keys = ['ens_perc','var_stat','rcp','variables']
# Table of merge-key values per dataset
merge_df = get_merge_dataframe(ds_cf_split_ls, merge_keys)
# Nested structure of dataset indices, grouped by the merge keys
merge_dict = get_merge_dictionary(merge_df, merge_keys)
# One merge pass per key is allowed
ds_cf_merged = merge_datasets(ds_cf_split_ls, merge_dict, max_level=len(merge_keys))

# A count of 1 means everything was combined into a single dataset
print('Number of datasets: {}'.format(count_datasets(ds_cf_merged)))

Number of datasets: 1


<IPython.core.display.Javascript object>

In [27]:
# Check dataset
ds_cf_merged

<IPython.core.display.Javascript object>

In [21]:
# Function to remove indices
def remove_indices(ds_cf_merged):
    """Reset the index of every dimension of a dataset where that is possible.

    Parameters
    ----------
    ds_cf_merged : dataset to process.

    Returns
    -------
    Dataset after ``reset_index`` has been applied for each dimension that
    accepts it; dimensions for which ``reset_index`` raises are left as they are.
    """
    # Snapshot of the dimension names of the incoming dataset
    dims = tuple(ds_cf_merged.dims)

    # Remove unnecessary indices
    for dim in dims:
        try:
            ds_cf_merged = ds_cf_merged.reset_index(dim)
        except Exception:
            # Best effort: skip dimensions that cannot be reset. Only ordinary
            # errors are skipped, so KeyboardInterrupt / SystemExit still stop the run.
            continue

    # Return dataset
    return ds_cf_merged

ds_cf_merged = remove_indices(ds_cf_merged)
ds_cf_merged

<IPython.core.display.Javascript object>

In [22]:
# write to NetCDF file to check compliancy

# prevent file locking, see: https://github.com/pydata/xarray/issues/2376
# (`os` is already imported in the first code cell; the import is repeated here)
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'

# make sure time carries no 'units' attribute before writing; the None default makes
# this a no-op when the attribute is already absent
ds_cf_merged['time'].attrs.pop('units', None)
# write the merged dataset to the path defined in the configuration cell
ds_cf_merged.to_netcdf(path=file_path_cf_merged_nc)

<IPython.core.display.Javascript object>

### Check CF compliancy altered NetCDF files

In [23]:
%%capture cap --no-stderr
# check altered CF compliancy (of the merged NetCDF file written above); the %%capture
# magic on the first line of this cell stores everything printed here in `cap`

check_compliancy(testfile=pathlib.Path(file_path_cf_merged_nc), working_dir=dir_path_cf)

<IPython.core.display.Javascript object>

In [24]:
# save altered CF compliancy; `cap` is the output captured by the %%capture cell above
# and dir_path_cf is the directory for the CF check files
save_compliancy(cap, testfile=file_path_cf_merged_nc, working_dir=dir_path_cf)



<IPython.core.display.Javascript object>

### Write data to Zarr files

In [25]:
# export the merged CF dataset to Zarr, next to the NetCDF file: same name, with the
# '.nc' suffix swapped for '.zarr' (only the suffix is changed, never another part
# of the path that happens to contain '.nc')

# export to zarr in write mode (to overwrite if exists)
ds_cf_merged.to_zarr(str(file_path_cf_merged_nc.with_suffix(".zarr")), mode="w")

<xarray.backends.zarr.ZarrStore at 0x2748facbbc0>

<IPython.core.display.Javascript object>