# 01 - Storm Surge Dataset JRC

This script performs the following tasks:
1. [Auth] writes data to Zarr files (cloud-native file format) (AUTh)
3. [Deltares] checks and creates a geoJSON from Zarr data (required for the Front-End)
2. [Deltares] uploads the Zarr to a Google Cloud Storage (GCS) bucket 
4. [Deltares] uploads the geoJSON to Mapbox 
5. [Deltares] update the STAC

TODO: 
1. make consistent with cf conventions (AUTh?)
2. come up with checks (cf conventions) for Zarr file before continueing to create a geoJSON 
3. [Maarten] bold names in Zarr due to a dimension index, now this is changed
4. [Maarten] make imports in generate.py importable
5. [Maarten] cannot use dimenions to read variables (see third point..) search a way to connect dimensions and variables, connects to 3rd point. Only works if variable name == dimension name 
6. [Maarten] Zarr does not add offset automatically..
7. Possibly write to GCS bucket first.. implement checks there (as the gcs link is in the STAC)

In [1]:
# Optional; code formatter, installed as jupyter lab extension
#%load_ext lab_black
# Optional; code formatter, installed as jupyter notebook extension
%load_ext nb_black

# imports
import sys
import os
import geojson
import json
import netCDF4 as nc
import pathlib
import platform
import xarray as xr
import pandas as pd
import zarr
import subprocess
import warnings
import numpy as np

from google.cloud import storage
from dotenv import dotenv_values
from itertools import product
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict
from pystac.extensions.datacube import DatacubeExtension, Dimension, Variable
from pystac import Asset, CatalogType, Collection, Item, Summaries
from pystac.layout import BestPracticesLayoutStrategy
from pystac.stac_io import DefaultStacIO
from pystac.utils import JoinType, join_path_or_url, safe_urlparse
from pystac import RelType

warnings.filterwarnings("ignore")

# make root directories importable by appending root to path
cwd = pathlib.Path().resolve()
sys.path.append(os.path.dirname(cwd))

# OS independent path configurations
if platform.system() == "Windows":
    root = pathlib.Path("P:/")
else:  # linux or other
    root = pathlib.Path("/p/")

coclico_data_dir = pathlib.Path(root, "11205479-coclico", "data")



<IPython.core.display.Javascript object>

In [2]:
# paths to the dataset, manual input
dataset_dir = coclico_data_dir.joinpath("01_storm_surge_jrc")
dataset_historical_path = dataset_dir.joinpath("CoastAlRisk_Europe_EESSL_Historical.nc")
dataset_rcp45_path = dataset_dir.joinpath("CoastAlRisk_Europe_EESSL_RCP45.nc")
dataset_rcp85_path = dataset_dir.joinpath("CoastAlRisk_Europe_EESSL_RCP85.nc")
dataset_out_file = "CoastAlRisk_Europe_EESSL"

# GCS and mapbox private access keys
GCS_token = coclico_data_dir.joinpath(
    "google_credentials.json"
)  # path name (including json file name)
config = dotenv_values(".env")
mapbox_token = config["MAPBOX_TOKEN"]  # mapbox private key

# STAC custom functions
local_STAC = r"../../coclicodata"  # path to local GitHub STAC clone
sys.path.insert(-1, local_STAC)
import generate

<IPython.core.display.Javascript object>

# 1. write data to Zarr files

In [3]:
# open datasets
dataset_historical = xr.open_dataset(dataset_historical_path)
dataset_45rcp = xr.open_dataset(dataset_rcp45_path)
dataset_85rcp = xr.open_dataset(dataset_rcp85_path)

# check original dataset
# dataset_historical

<IPython.core.display.Javascript object>

In [4]:
# rename dimension names
dataset_historical = dataset_historical.rename_dims({"row": "stations", "col": "rp"})
dataset_45rcp = dataset_45rcp.rename_dims({"row": "stations", "col": "rp"})
dataset_85rcp = dataset_85rcp.rename_dims({"row": "stations", "col": "rp"})

# rename variables, if necessary
# dataset_historical = dataset_historical.rename_vars({"RP": "rp"})
# dataset_45rcp = dataset_45rcp.rename_vars({"RP": "rp"})
# dataset_85rcp = dataset_85rcp.rename_vars({"RP": "rp"})

# set some data variables to coordinates to avoid duplication of dimensions in later stage
dataset_historical = dataset_historical.set_coords(["longitude", "latitude", "RP"])
dataset_45rcp = dataset_45rcp.set_coords(["longitude", "latitude", "RP"])
dataset_85rcp = dataset_85rcp.set_coords(["longitude", "latitude", "RP"])

<IPython.core.display.Javascript object>

In [5]:
# concat datasets along new dimension with index values and name derived from pandas index object, if necessary
dataset = xr.concat(
    [dataset_historical, dataset_45rcp, dataset_85rcp],
    pd.Index(["historical", "rcp45", "rcp85"], name="scenario"),
)

# rename dimension names for the concatenated variable
dataset = dataset.rename_dims({"scenario": "scenarios"})

<IPython.core.display.Javascript object>

In [6]:
# re-order shape of the data variables
dataset = dataset.transpose("scenarios", "stations", "rp")

<IPython.core.display.Javascript object>

In [7]:
# check the xarray dataset
dataset

<IPython.core.display.Javascript object>

In [8]:
# export to zarr in write mode (to overwrite if exists)
dataset.to_zarr(dataset_dir.joinpath("%s.zarr" % dataset_out_file), mode="w")

<xarray.backends.zarr.ZarrStore at 0x1f69aa369e0>

<IPython.core.display.Javascript object>

# 2. check and create geoJSON from Zarr data

In [9]:
# load (locally or cloud) stored Zarr
# zarr_fn = str(dataset_dir.joinpath("%s.zarr" % dataset_out_file))  # local file path
zarr_fn = (
    r"gcs://dgds-data-public/coclico/CoastAlRisk_Europe_EESSL.zarr"  # cloud file path
)
nddata = xr.open_zarr(zarr_fn)

# specify input variables
mapbox_url = "https://global-data-viewer"
template = "deltares-coclico-ssl"
variable = "ssl"  # note, variable is set as different variables might not have the same dimensions
datasetid = f"{variable}-mapbox"  # f"deltares-coclico-{variable}"

<IPython.core.display.Javascript object>

In [10]:
# check geojson

# TODO Come up with checks

<IPython.core.display.Javascript object>

In [11]:
# write geoJSON

# automated variable retrieval (without hidden files)
variables = list(nddata.variables)

# autmated dimension retrieval
dimensions = list(nddata["%s" % (variable)].dims)

# write data to flattened GeoJSON file - Mapbox styling uses this (aligned with STAC)
cube_dimensions = {}
for dimension in dimensions:  # loop over dimensions of variable

    for var in variables:  # identify variable with dimension
        var_dim = nddata["%s" % var].dims
        if (
            len(var_dim) == 1 and var_dim[0] == dimension and dimension != "stations"
        ):  # only take the variables in the coordinates with a single dimension, where dimension is equal to the
            # variable dimensions and where it is not equal to stations (assumed present and independent)

            dim = nddata[var]

            # Only applicable in proper CF convention zarr
            #             if dimension == "stations": # TODO Remove because this is abundant due to if statement
            #                 dimdict = {
            #                     "type": "stations",
            #                     "extent": [min(dim), max(dim)],
            #                     "unit": "-",
            #                 }
            #             else:
            dimdict = {
                "type": "temporal",  # TODO To be customized based on variable?
                "values": dim[:].values.tolist(),
                "unit": dim.attrs.get("units", "-"),
            }
            dim = Dimension.from_dict(dimdict)
            cube_dimensions[var] = dim

dimvals = {k: v.values for k, v in cube_dimensions.items() if v.values}

# dot product of variables keys
keys = []
for values in product(*dimvals.values()):
    # TODO Improve key gen and align with geojson generation
    keys.append(
        "-".join(map(lambda x: "-".join(x), zip(dimvals.keys(), map(str, values))))
    )

# flatten single data values over specific dimension keys
features = []
for j, (lon, lat) in enumerate(
    zip(nddata["longitude"][:].values, nddata["latitude"][:].values)
):  # assumes longitude and latitude are present and independent
    point = geojson.Point((float(lon), float(lat)))
    feature = geojson.Feature(geometry=point)
    feature["properties"]["locationId"] = j

    for a, b in zip(
        nddata["%s" % variable][:, j, :].values.flatten(), keys
    ):  # flattened along dimensions
        feature["properties"][b] = a

    features.append(feature)

# store the features in a GeoJSON file
collection = geojson.FeatureCollection(features)
with open(
    os.path.join(dataset_dir, "platform", r"%s.geojson" % dataset_out_file), "w"
) as f:
    geojson.dump(collection, f)

<IPython.core.display.Javascript object>

In [12]:
# check written geojson

# check shape
with open(os.path.join(dataset_dir, "platform", "%s.geojson" % dataset_out_file)) as f:
    check = geojson.load(f)

print(check["features"][2])

# check the minima and maxima for the colormap boundaries
# scenario = 0
# for idx, i in enumerate(dataset["RP"][:].values):
#     print(
#         i,
#         round(min(nddata["ssl"][scenario, :, idx].values), 2),
#         round(max(nddata["ssl"][scenario, :, idx].values), 2),
#     )

{"geometry": {"coordinates": [-0.1, 49.7], "type": "Point"}, "properties": {"locationId": 2, "scenario-historical-RP-10.0": 2.08892, "scenario-historical-RP-100.0": 2.43342, "scenario-historical-RP-1000.0": 2.66859, "scenario-historical-RP-20.0": 2.20901, "scenario-historical-RP-200.0": 2.51257, "scenario-historical-RP-5.0": 1.95, "scenario-historical-RP-50.0": 2.3447, "scenario-historical-RP-500.0": 2.60544, "scenario-rcp45-RP-10.0": 2.12185, "scenario-rcp45-RP-100.0": 2.49559, "scenario-rcp45-RP-1000.0": 2.75567, "scenario-rcp45-RP-20.0": 2.25063, "scenario-rcp45-RP-200.0": 2.58303, "scenario-rcp45-RP-5.0": 1.97503, "scenario-rcp45-RP-50.0": 2.39813, "scenario-rcp45-RP-500.0": 2.68587, "scenario-rcp85-RP-10.0": 2.12493, "scenario-rcp85-RP-100.0": 2.44322, "scenario-rcp85-RP-1000.0": 2.62478, "scenario-rcp85-RP-20.0": 2.24118, "scenario-rcp85-RP-200.0": 2.50843, "scenario-rcp85-RP-5.0": 1.985, "scenario-rcp85-RP-50.0": 2.36607, "scenario-rcp85-RP-500.0": 2.57984}, "type": "Feature"}


<IPython.core.display.Javascript object>

# 3. upload Zarr to GCS bucket

In [13]:
# upload zarr folder to GCS
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(GCS_token)

# function to upload zarr folder to GCS
storage_client = storage.Client()


def upload_from_directory(directory_path, dest_bucket_name, dest_blob_name):
    rel_paths = directory_path.glob("**/*")
    bucket = storage_client.bucket(dest_bucket_name)
    for local_file in rel_paths:
        remote_path = f'{dest_blob_name}/{"/".join(str(local_file).split(os.sep)[5:])}'  # note 5: is hardcoded and might lead to problems
        if os.path.isfile(local_file):
            blob = bucket.blob(remote_path)
            blob.upload_from_filename(local_file)

    # print status
    print("Folder uploaded to GCS")


# specification of directory, bucket and file name to feed into the function
directory_path = dataset_dir.joinpath("%s.zarr" % dataset_out_file)
dest_bucket_name = "dgds-data-public"
dest_blob_name = "coclico/" + dataset_out_file + ".zarr"
folder_upload = upload_from_directory(directory_path, dest_bucket_name, dest_blob_name)

Folder uploaded to GCS


<IPython.core.display.Javascript object>

# 4. upload geoJSON to Mapbox

In [14]:
# ingest geoJSON into mapbox tilesets

# python way of running CLI to upload to mapbox automatically
subprocess.run(
    [
        "mapbox",
        "--access-token",
        mapbox_token,
        "upload",
        r"global-data-viewer.%s" % dataset_out_file.split(".")[0],
        os.path.join(
            dataset_dir, "platform", r"%s.geojson" % dataset_out_file.split(".")[0]
        ),
    ],
    shell=True,
    check=True,
)

# notebook version of CLI
#!mapbox --access-token {mapbox_token} upload {filename} {source}

# CLI command (example)
# mapbox --access-token **write out mapbox_token** upload global-data-viewer.CoastAlRisk_Europe_EESSL p:\11205479-coclico\data\01_storm_surge_jrc\platform\CoastAlRisk_Europe_EESSL.geojson

CompletedProcess(args=['mapbox', '--access-token', 'sk.eyJ1IjoiZ2xvYmFsLWRhdGEtdmlld2VyIiwiYSI6ImNsMnB0dmlscTFrZnEzY211cWxna3Bxb3AifQ.OQ3P9PEjZUiErvjrwVbsag', 'upload', 'global-data-viewer.CoastAlRisk_Europe_EESSL', 'P:\\11205479-coclico\\data\\01_storm_surge_jrc\\platform\\CoastAlRisk_Europe_EESSL.geojson'], returncode=0)

<IPython.core.display.Javascript object>

# 5. Update the STAC

In [15]:
# update STAC

# Get initial STAC
collection = Collection.from_file(os.path.join(local_STAC, "current/collection.json"))
# collection.describe()  # display hierarchy

# Get template and set items
templatedataset = collection.get_child(template)
dataset = templatedataset.full_copy()
dataset.id = datasetid
dataset.title = variable
dataset.description = variable

# Drop existing items, dimensions and summaries
dataset._resolved_objects
dataset.set_root(None)
dataset.clear_items()
dataset.assets = {}
dataset.extra_fields = deepcopy(
    dataset.extra_fields
)  # workaround for https://github.com/stac-utils/pystac/issues/787
dataset.summaries = None
dataset.extra_fields.pop("cube:dimensions", None)
dataset.extra_fields.pop("cube:variables", None)
dataset.extra_fields.pop("summaries", None)

# Add zarr asset
dataset.add_asset("data", generate.gen_zarr_asset(variable, zarr_fn))

# Add dimension info
dc_ext = DatacubeExtension.ext(dataset)
dc_ext.apply(cube_dimensions)

var = Variable({})
var.description = ""
var.dimensions = list(cube_dimensions.keys())
var.type = "data"
var.unit = nddata["%s" % variable].attrs["units"]
dc_ext.variables = {variable: var}

# Add summaries
dataset.summaries = Summaries(summaries=dimvals)

# Add children
layout = generate.Layout()
for values in product(*dimvals.values()):
    # TODO Improve key gen and align with geojson generation
    key = "-".join(map(lambda x: "-".join(x), zip(dimvals.keys(), map(str, values))))
    feature = generate.gen_default_item(f"{variable}-mapbox-{key}")
    feature.add_asset("mapbox", generate.gen_mapbox_asset(mapbox_url, dataset_out_file))
    feature.properties = generate.gen_default_props(key=key)
    for (k, v) in zip(dimvals.keys(), values):
        feature.properties[k] = v
    dataset.add_item(feature, strategy=layout)


# Set extra link properties
generate.extend_links(dataset, cube_dimensions.keys())

# Save and limit number of folders
collection.add_child(dataset)
dataset.normalize_hrefs(
    os.path.join(local_STAC, f"current/{variable}"), strategy=layout
)
collection.save(
    catalog_type=CatalogType.SELF_CONTAINED,
    dest_href=os.path.join(local_STAC, f"current"),
    stac_io=generate.IO(),
)

<IPython.core.display.Javascript object>