## Onboard River Flood Hazard Maps from Joint Research Center (JRC) to OS-C S3 bucket

The dataset contains flood depths for a set of historical return periods and is available from the [JRC data catalog](https://data.jrc.ec.europa.eu/dataset/1d128b6c-a4ee-4858-9e34-6210707f3c81). The methodology is described in ["A new dataset of river flood hazard maps for Europe and the Mediterranean Basin" by Francesco Dottori, Lorenzo Alfieri, Alessandra Bianchi, Jon Skoien, and Peter Salamon](https://essd.copernicus.org/articles/14/1549/2022/).

The dataset provides six return periods: 10, 20, 50, 100, 200 and 500 years.

The resolution is 100m.

## Create Zarr from shape and Affine transformation

<span style="color:blue">Note: this file must be located in /hazard/src/ for the dependencies to work</span>

In [1]:
import sys
import os
import s3fs
import zarr
import numpy as np
import rasterio
import math
import xarray as xr

from pyproj.crs import CRS
from affine import Affine

from hazard.sources.osc_zarr import OscZarr



In [2]:
# Reference (OpenShift secret holding the S3 keys, judging by the URL):
# https://console-openshift-console.apps.odh-cl1.apps.os-climate.org/k8s/ns/sandbox/secrets/physrisk-dev-s3-keys

# Hazard indicators bucket and the key prefix used inside it
default_staging_bucket = 'physrisk-hazard-indicators-dev01'
prefix = 'hazard'

# The access key and secret key are read from the environment variables
# OSC_S3_HIdev01_ACCESS_KEY and OSC_S3_HIdev01_SECRET_KEY, respectively.
s3 = s3fs.S3FileSystem(
    anon=False,
    key=os.environ["OSC_S3_HIdev01_ACCESS_KEY"],
    secret=os.environ["OSC_S3_HIdev01_SECRET_KEY"],
)

# Define the zarr group: <bucket>/<prefix>/<zarr store>, always '/'-separated
zarr_storage = 'hazard.zarr'
group_path = os.path.join(default_staging_bucket, prefix, zarr_storage)
group_path = group_path.replace('\\', '/')
store = s3fs.S3Map(root=group_path, s3=s3, check=False)
root = zarr.group(store=store, overwrite=False)

# Display the zarr storage tree
root.tree()

Tree(nodes=(Node(disabled=True, name='/', nodes=(Node(disabled=True, name='physrisk-hazard-indicators-dev01', …

In [4]:
# List the entries stored under <bucket>/<prefix>
bucket_prefix_path = os.path.join(default_staging_bucket, prefix).replace('\\', '/')
s3.ls(bucket_prefix_path)

['physrisk-hazard-indicators-dev01/hazard/hazard.zarr',
 'physrisk-hazard-indicators-dev01/hazard/riverflood_JRC_RP_hist.zarr']

In [5]:
# Create OscZarr object to interact with the bucket.
# It receives the bucket name, the key prefix, the authenticated s3fs
# filesystem and the S3Map store pointing at hazard.zarr.
oscZ = OscZarr(bucket=default_staging_bucket,
        prefix=prefix,
        s3=s3,
        store=store)

In [6]:
# Path to the tif files. There is one tif file per return period.
# The root folder is read from the `physical_risk_database` environment variable.
# os.environ[...] is used (as for the S3 keys) so that a missing variable fails
# immediately with a KeyError naming it, instead of a TypeError inside os.path.join.

base_path_hazard = os.path.join(os.environ["physical_risk_database"], 'hazard')

hazard_type = 'Flood'
datasource = 'JRC'

# <root>/hazard/Flood/JRC
inputfile_path = os.path.join(base_path_hazard, hazard_type, datasource)

In [7]:
# Read one tif file to get the metadata: transform, crs, width, height and shape

return_period = '010'
data_filename = 'floodmap_EFAS_RP{}_C.tif'.format(return_period)
inputfile = os.path.join(inputfile_path, data_filename)

# The context manager closes the dataset even if reading an attribute fails.
# `src` stays bound (to the closed dataset) afterwards, as it did before.
with rasterio.open(inputfile) as src:
    transform = src.transform
    width = src.width
    height = src.height

# The CRS is fixed to EPSG:3035 rather than read from the file.
crs = CRS.from_epsg(3035)
shape = (height, width)

# Return periods as they appear in the file names, and as integers (years)
return_periods_str = ['010', '020', '050', '100', '200', '500']
return_periods = [int(rt) for rt in return_periods_str]

In [8]:
# Create the data array inside the zarr group under the name dataset_name.

# Naming standard: hazard_type + '_' + hazard_subtype (if it exists) + '_' + hist or scenario
# + '_' + RP (return period) or event/emulated + '_' + data_provider
dataset_name = 'flood_river_hist_RP_JRC'
# Normalise separators the same way as group_path, so the key stays
# '/'-separated even when the notebook is run on Windows.
group_path_array = os.path.join(group_path, dataset_name).replace('\\', '/')
oscZ._zarr_create(path=group_path_array,
                  shape=shape,
                  transform=transform,
                  crs=str(crs),
                  overwrite=False,
                  return_periods=return_periods)

<zarr.core.Array '/physrisk-hazard-indicators-dev01/hazard/hazard.zarr/flood_river_hist_RP_JRC' (6, 45242, 63976) float32>

In [9]:
# Fetch the array stored at group_path_array and display its summary
# (shape, chunk shape, dtype, compressor, ...).
z = oscZ.root[group_path_array]
z.info

0,1
Name,/physrisk-hazard-indicators-dev01/hazard/hazard.zarr/flood_river_hist_RP_JRC
Type,zarr.core.Array
Data type,float32
Shape,"(6, 45242, 63976)"
Chunk shape,"(6, 1000, 1000)"
Order,C
Read-only,False
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.FSStore
No. bytes,69465652608 (64.7G)


## Steps to populate hazard.zarr/flood_river_hist_RP_JRC

### Step 1: Read tif files

In [10]:
def read_window(src, height_pos, width_pos, chunck_size):
    """
    Read one square window of band 1 from an open JRC tif and zero its nodata cells.

    Parameters:
        src: open rasterio dataset to read from.
        height_pos (int): row offset of the window's upper-left corner.
        width_pos (int): column offset of the window's upper-left corner.
        chunck_size (int): width and height of the requested window, in pixels.

    Returns:
        band (numpy array): values of band 1 inside the window, with every
            cell equal to the dataset's nodata value replaced by 0.

    """
    # Window takes (col_off, row_off, width, height), hence width_pos first.
    window = rasterio.windows.Window(width_pos, height_pos, chunck_size, chunck_size)
    band = src.read(1, window=window)

    nodata = src.nodata
    if nodata is None:
        # The file declares no nodata value: nothing to impute.
        return band

    if isinstance(nodata, float) and math.isnan(nodata):
        # NaN never compares equal to itself, so `band == nodata` would miss it.
        to_impute = np.isnan(band)
    else:
        to_impute = band == nodata
    band[to_impute] = 0

    return band

### Step 2: Populate the raster file for every return period

In [18]:
# Side length (in pixels) of the square windows copied from each tif
chunck_size = 1000

# One tif per return period; rt_i is the index along the first axis of z.
for rt_i, rt in enumerate(return_periods_str):

    data_filename = 'floodmap_EFAS_RP{}_C.tif'.format(rt)
    inputfile = os.path.join(inputfile_path, data_filename)

    # The context manager closes each tif once it has been copied, instead of
    # leaving one open dataset behind per return period.
    with rasterio.open(inputfile) as src:
        # Walk the raster in chunck_size x chunck_size windows. Slices that
        # run past the array edge are clipped, so border windows are smaller.
        for height_pos in range(0, height, chunck_size):
            for width_pos in range(0, width, chunck_size):

                band = read_window(src, height_pos, width_pos, chunck_size)

                z[rt_i, height_pos:height_pos + chunck_size, width_pos:width_pos + chunck_size] = band

In [12]:
# Single-chunk trial: copy only the top-left window of the first return period.
rt_i, rt = 0, '010'
chunck_size = 1000

data_filename = 'floodmap_EFAS_RP{}_C.tif'.format(rt)
inputfile = os.path.join(inputfile_path, data_filename)

# Left open on purpose: a later scratch cell still reads src.nodata.
src = rasterio.open(inputfile)

height_pos, width_pos = 0, 0
band = read_window(src, height_pos, width_pos, chunck_size)

# Target region of z: rows and columns covered by this window
rows = slice(height_pos, height_pos + chunck_size)
cols = slice(width_pos, width_pos + chunck_size)
z[rt_i, rows, cols] = band

In [31]:
# Sanity check of the chunking pattern: step through a sequence in
# fixed-size strides and print each slice; the last one is simply shorter.
size = 3
a = list(range(23))
for pos in range(0, len(a), size):
    chunk = a[pos:pos + size]
    print(f"{pos} {size} {chunk}")

0 3 [0, 1, 2]
3 3 [3, 4, 5]
6 3 [6, 7, 8]
9 3 [9, 10, 11]
12 3 [12, 13, 14]
15 3 [15, 16, 17]
18 3 [18, 19, 20]
21 3 [21, 22]


In [33]:
# Slicing past the end of the list is clipped rather than raising an error.
a[21:25]

[21, 22]

In [29]:
# Compare the first stored cell of the first return period with the nodata value reported by src.
z[0,0,0] == src.nodata

True

In [48]:
# NOTE(review): fld_depth is not defined in any cell shown in this notebook;
# presumably a leftover from an earlier in-memory approach. Confirm before re-running.
fld_depth.shape

(1, 45242, 63976)

In [27]:
# NOTE(review): `da` is not defined in any cell shown in this notebook, and the
# path passed is group_path (the zarr store root) rather than group_path_array;
# presumably a leftover from an earlier approach. Confirm before re-running.
oscZ.write(path = group_path,
           da = da)

In [None]:
# Example using the root object directly. It is better to use the oscZ object.

"""
create_dataset(name, **kwargs) method of zarr.hierarchy.Group instance
    Create an array.
    
    Arrays are known as "datasets" in HDF5 terminology. For compatibility
    with h5py, Zarr groups also implement the require_dataset() method.
    
    Parameters
    ----------
    name : string
        Array name.
    data : array-like, optional
        Initial data.
    shape : int or tuple of ints
        Array shape.
    chunks : int or tuple of ints, optional
        Chunk shape. If not provided, will be guessed from `shape` and
        `dtype`.
    dtype : string or dtype, optional
        NumPy dtype.
    compressor : Codec, optional
        Primary compressor.
    fill_value : object
        Default value to use for uninitialized portions of the array.



root.create_dataset(name='prueba',
                    data = np.array([[0,1], [1,6]]),
                    shape = (2,2),
                    chunks = (1000, 1000),
                    dtype = 'f4')

trans_members = [
    transform.a,
    transform.b,
    transform.c,
    transform.d,
    transform.e,
    transform.f,
]
mat3x3 = [x * 1.0 for x in trans_members] + [0.0, 0.0, 1.0] # append the constant last row (0, 0, 1) to complete the 3x3 affine matrix
root.attrs["crs"] = str(crs)
root.attrs["transform_mat3x3"] = mat3x3 
if return_periods is not None:
    root.attrs["index_values"] = return_periods
    root.attrs["index_name"] = "return period (years)"

# Read the file
root['prueba']
"""

In [None]:
# Code to remove the objects listed under a prefix inside a bucket

"""
import boto3
boto_c = boto3.client('s3', aws_access_key_id=os.environ["OSC_S3_ACCESS_KEY"], aws_secret_access_key=os.environ["OSC_S3_SECRET_KEY"])

to_remove = boto_c.list_objects_v2(Bucket=default_staging_bucket, Prefix='hazard/hazard_MV_prueba.zarr')['Contents']

keys = [item['Key'] for item in to_remove]

for key_ in keys:
    boto_c.delete_object(Bucket=default_staging_bucket, Key=key_)
"""