# Extending Caravan to new basins

Author: Frederik Kratzert
Contact: kratzert@google.com

This notebook is part of the [Caravan publication](https://www.nature.com/articles/s41597-023-01975-w) and is the first of two notebooks that can be used to extend the dataset to new basins. This notebook is intended to be run on Google Colab so that the data exports to Earth Engine works as intended.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.sandbox.google.com/github/kratzert/Caravan/blob/main/code/Caravan_part1_Earth_Engine.ipynb)

## What is Caravan?

Caravan is ...
- an open source, large-sample rainfall-runoff dataset.
- derived entirely in the cloud (Earth Engine).
- a dataset that only uses globally available data products (namely [ECMWF's ERA5-Land](https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview) and [HydroATLAS](https://www.hydrosheds.org/page/hydroatlas) by Linke et al.).

Why?
- to democratize large-sample hydrology: Anyone can extend the dataset to new regions without downloading and preprocessing large amounts of raw data. Instead, we use freely available cloud resources to do all heavy lifting in the cloud and only download the processed results.
- to advance research on global rainfall-runoff models: So far, we have seen  different large-sample datasets (e.g. CAMELS variants) that are specific to individual countries. However, due to the use of data products with only local coverage (e.g. meteorological forcing data or static attribute maps), intercomparisons betweens these products are not straightforward.

## Requirements

- You need a Google account to be able to use Earth Engine
- You need to add your shapefile as an [Asset](https://developers.google.com/earth-engine/guides/table_upload#upload-a-shapefile) to Google Earth Engine
- The shapefile needs to have one field that indicates the basin/gauge ID, which will be used to link the derived data to the individual basin. Make sure to adapt the corresponding variable in the "General configuration" section below. It is recommend to name this field `gauge_id`, which will safe you some adaptations across the 2nd notebook.

Note: This notebook only derives catchment attributes and meteorological forcing data for each polygon in the shapefile layer. We do not provide additional streamflow data beyond what is already included in Caravan.

## Making new data available to the community

We envision Caravan as a dynamically growing dataset that can be extended by its users. While making a large-sample dataset available upon the intial publication, the distribution of the basins is still limited to a few regions in the world. Ideally, this will change over time. Anyone who uses Caravan and this code to extend the dataset is invited to also share the streamflow time series data with the community, even if it is just for a single (or a few) basins. By doing so, little by little, we could create a new, globally spanning open source dataset that anyone can use for their own work/research. For the moment, the [discussion forum](https://github.com/kratzert/caravan/discussions) on the Github repository is the place to share information on data that you want to contribute. See also the [wiki](https://github.com/kratzert/caravan/wiki) on Github for further information.

##  Structure of this notebook

The code consists of two different sections:

1. To derive static catchment attributes from HydroATLAS.
2. To spatially aggregate ERA5-Land forcing data for each polygon.

While the first point takes a couple of seconds up to a few minutes, depending on the number of basins, the second point takes considerably longer. However, within this notebook we only start jobs to process ERA5-Land on Earth Engine and to store the data in Google Drive. The tasks will run in the background and this notebook can be closed after the end of this notebook is reached. For details, see the introduction in each of the two data processing sections.

## Citation

If you use the Caravan dataset or this code, please consider referencing the original publication

```bib
@article{kratzert2023caravan,
  title={Caravan-A global community dataset for large-sample hydrology},
  author={Kratzert, Frederik and Nearing, Grey and Addor, Nans and Erickson, Tyler and Gauch, Martin and Gilon, Oren and Gudmundsson, Lukas and Hassidim, Avinatan and Klotz, Daniel and Nevo, Sella and others},
  journal={Scientific Data},
  volume={10},
  number={1},
  pages={61},
  year={2023},
  publisher={Nature Publishing Group UK London}
}
```

# General Configurations

- `ASSET_NAME`: String defining the path of the asset in Google Earth Engine. Usually something like `users/{username}/{asset_name}`.

- `OUTPUT_FOLDER_NAME`: The name of the folder (in Google Drive) that is used to store the resulting data. For example, `OUTPUT_FOLDER_NAME = 'my_caravan_extension'` will create a folder called `my_caravan_extension` in the main directory of your Google Drive.

- `BASIN_ID_FIELD`: Name of the attribute field in the shapefile that contains the basin id. It is recommended to name this field `gauge_id`. Make sure that this name does not conflict with any of the field names of HydroATLAS. For example, you could use something like `'gauge_id'` or `'basin_id'` but _not_ `'HYBAS_ID'`, `'PFAF_ID'`, or `'MAIN_BAS'`, which are all existing field names in HydroATLAS/HydroSHEDS.

- `BASIN_PREFIX`: A short descriptive string that is prepended to each basin id and that should be unique within the Caravan data space. For example, we use `camels` for basins from the CAMELS (US) dataset and `camelsgb` from the CAMELS-GB dataset. The final name for each basin within the attribute table will be `{BASIN_PREFIX}_{GAUGE_ID}`. Note, if you already included such a prefix in the basin id field of your shapefile, leave this field as an empty string. Please also read the README in the Caravan dataset folder on details about the folder structure of the dataset.

- `AREA_MAX_THRESHOLD` (in km2): Defines the upper bound for the basin area. Larger basins in the shapefile are ignored when deriving the attributes and forcings. If `None`, all basins are considered. Note: Currently it is possible to get an error when trying to process to large polygons, due to surpassing the Earth Engine user quota. We are working on a fix for that.

- `AREA_MIN_THRESHOLD` (in km2): Defines the lower bound for the basin area. Smaller basins in the shapefile are ignored when deriving the attributes and forcings. If `None`, all basins are considered. In the Caravan dataset, we used a threshold of 100km2. The reason for a lower bound has to do with the underlying resolution of HydroATLAS and some issues with the definition of some of the attributes. That being said, you can still consider smaller basins but treat their values (especially the attributes) with care.

In [2]:
ASSET_NAME = 'projects/ee-tedlanghorst/assets/global_wq/upstream_basin_polygons'
OUTPUT_FOLDER_NAME = 'camels_global_wq'
BASIN_ID_FIELD = 'gauge_id'
BASIN_PREFIX = 'globalwq'
AREA_MAX_THRESHOLD = None
AREA_MIN_THRESHOLD = None

# Import modules

Note: to be able to store data in Google Drive, you need to mount your Drive in this runtime. Follow the instructions of the pop-up window, otherwise you won't be able to store the HydroATLAS attributes or ERA5-Land data.

Also note: You also need to give this runtime permission to access Earth Engine, follow the instructions in the text and paste the token from the link within the box below, then press Enter. For a more detailed description you can also check our [guide](https://github.com/kratzert/Caravan/wiki/Extending-Caravan-with-new-basins).

In [3]:
from collections import defaultdict
from pathlib import Path

import ee
import numpy as np
import pandas as pd
from google.colab import drive
from tqdm.notebook import tqdm

# Mount Google drive to colab runtime.
drive.mount('/content/drive')
drive_path = Path(f'/content/drive/My Drive/')

# Authorize this Colab to access Earth Engine.
ee.Authenticate()
ee.Initialize()

Mounted at /content/drive
To authorize access needed by Earth Engine, open the following URL in a web browser and follow the instructions. If the web browser does not start automatically, please manually browse the URL below.

    https://code.earthengine.google.com/client-auth?scopes=https%3A//www.googleapis.com/auth/earthengine%20https%3A//www.googleapis.com/auth/devstorage.full_control&request_id=-MZTI9o7LQruRTmYIBVp_x2KAng6n589ZYXAuDvcGPk&tc=MuLE61VVNqk7klkAuBoEyQzWsj7SLd8iWtfwccbRlcc&cc=o4ceD-hXg8NH-BH3vuWGPhZXy-j58rMmMZZSlS3ruO4

The authorization workflow will generate a code, which you should paste in the box below.
Enter verification code: 4/1AfJohXlJVFjmRNrx5xpnViD6Bg2u4_7xg899x1jFj1qfl_OnzJz4SbqGRT0

Successfully saved authorization token.


# Utility functions

A set of functions that are used below. Simply execute this block to make the functions available.

In [5]:
def add_area_as_property(poly):
    """Add polygon area (km2) as property to each feature.

    Different shapefiles might have different property names and some are lacking
    the area property all together. We use this function to have a homogeneous
    area property we can later use for filtering catchments by the area.
    """
    new_poly = ee.Feature(poly).set({'area': poly.area().divide(1000*1000)})
    return new_poly


def subset_properties(feat, property):
    """Remove properties from features.

    This function is used to remove unnecessary properties from the polygon files
    to reduce the output file size.
    """
    properties = feat.propertyNames()
    selected_properties = properties.filter(ee.Filter.eq('item', property))
    return feat.select(selected_properties)

def join_features(poly):
    """Combine two polygons and add intersecting area as property."""
    primary = ee.Feature(poly.get('primary'))
    secondary = ee.Feature(poly.get('secondary'))

    # perform intersection (creates new object)
    newPoly = primary.intersection(secondary)

    # get area of overlap
    area = newPoly.area().divide(1000*1000)

    # copy properties of both initial polygons in new object
    newPoly = newPoly.copyProperties(primary).copyProperties(secondary)

    # return with the area over intersection added as property
    return ee.Feature(newPoly).set({'Intersect': area}).setGeometry(None)

def start_export_task_img_coll(img_coll, feature_coll, folder, filename):
    """Start Export task on Earth Engine.

    This function is used to send one processing and exporting task for ERA5-Land
    data to Earth Engine.
    """
    func = lambda img: img.reduceRegions(collection=feature_coll,
                                         reducer=ee.Reducer.mean(),
                                         scale=5000)
    results_table = img_coll.map(func)
    new_table = results_table.flatten().map(lambda x: x.setGeometry(None))

    task = ee.batch.Export.table.toDrive(
        collection=new_table,
        folder=folder,
        description=f"{filename}_era5land",
        fileNamePrefix=filename
    )
    task.start()
    return task


def compute_pour_point_properties(basin_data, min_overlap_threshold, pour_point_properties):
    """Finds most downstream polygon to extract pour point features.

    Some features are only defined for the entire upstream area. Here, we need to find the
    most downstream HydroATLAS polygon that still has a significant ( > 50%) overlap with the
    catchment polygon and take the attributes from there. In case of non-significant overlap
    of the most downstream polygon, it is possible that two (or more) merging rivers need to be
    considered that all drain into that most downstream polygon. If that is the case, we take the
    sum of the pour-point properties (as all of these properties are volumnes, counts, areas etc.)
    """
    # Find basin with the largest overlap (in percent).
    percentage_overlap = [x / y for x, y in zip(basin_data['weights'], basin_data["SUB_AREA"])]
    current_basin_pos = np.argmax(percentage_overlap)

    # Get the HYBAS_ID of the next downstream gauge.
    next_down_id = basin_data['NEXT_DOWN'][current_basin_pos]

    # Traverse the river network downstream until we hit the termination condition.
    while True:

        # Make sure that we did not reached the ocean (HYBAS_ID=0), otherwise we would later pick all intersecting
        # polygons that also drain into HYBAS_ID = 0.
        if next_down_id == 0:
            break

        # Check if next_down_id is in the list of HYBAS_IDs.
        if next_down_id not in basin_data['HYBAS_ID']:
            break

        # Get position of next_down_id in the list of HYBAS_IDs
        next_down_pos = basin_data['HYBAS_ID'].index(next_down_id)

        # Check that the intersection of the next downstream polygon is at least 50%
        if percentage_overlap[next_down_pos] < 0.5:
            break

        # Continue with the next polygon.
        next_down_id = basin_data['NEXT_DOWN'][next_down_pos]

    # Find all polygons that would have drained into that polygons, which can potentially be more than one in case where
    # two (or more) rivers merge.
    direct_upstream_polygons = []
    for i, next_down in enumerate(basin_data['NEXT_DOWN']):
        if ((next_down == next_down_id) and
         ((basin_data['weights'][i] > min_overlap_threshold) or
          (basin_data["weights"][i] / basin_data["SUB_AREA"][i] > 0.5))):
            direct_upstream_polygons.append(i)

    # Finally compute metrics. As all pour_point_properties are volumes or areas, we can simply compute the sum.
    aggregated_properties = {}
    for prop in pour_point_properties:
        aggregated_properties[prop] = sum([basin_data[prop][i] for i in direct_upstream_polygons])

    return aggregated_properties

# Processing HydroATLAS

This section of the code derives the selected attributes from HydroATLAS that are considered in Caravan for every polygon in the shape layer.

## TLDR; How are the attributes derived?

- We use HydroATLAS level 12, i.e. the layer with the highest available resolution.
- Next, we take the polygons of the catchment boundaries and for each polygon, we compute the intersecting polygon areas of the HydroATLAS dataset.
- With the resulting clipped polygons, we derive the attributes as aggregates (see the "HydroATLAS features" sub-section for more details on the aggregation procedure).

## Output

After completing this part of the code, you will find an `attributes_new.csv` file in your predefined Google Drive folder.

## Configurations for HydroATLAS

- `MIN_OVERLAP_THRESHOLD` (in km2): Because of differences in the basin delinations between HydroSHEDS and the user-provided catchment boundaries, the outer border of a catchment does not necessarily perfectly match the boundaries of a HydroSHEDS polygon. This can lead to some artifacts during the intersection process, where small pieces of a neighboring HydroSHEDS polygon are included. We use the value of this variable to filter these small artifacts and only consider overlapping polygons that have an overlapping area that is greater than MIN_OVERLAP_THRESHOLD. However, this means that this variable also marks a minimum catchment size that can be processed with this code.

- `BATCH_SIZE`: This value defines the number of basins that are processed in one call to the Earth Engine API. Generally, a higher value is better, since Earth Engine is extremely good in parallelizing jobs, but there is an upper limit of how many objects can be processed at once. A value of 50 was a reasonable tradeoff and seemed to not yield any problems. In case you get an error below when processing your basins (something like "EEException: Collection query aborted after accumulating over 5000 elements."), try reducing the batch size.

In [6]:
MIN_OVERLAP_THRESHOLD = 5
BATCH_SIZE = 1

## Initialize Feature collections

The next section initializes the two different feature collections (i.e., shapefiles) in Earth Engine. One is the user-provided polygon layer and the other is HydroATLAS level 12. From the user-provided polygon layer, only those polygons with a catchment area smaller than the threshold are considered for every downstream task.

In [7]:
# initialize featurecollection from uploaded asset
basins = ee.FeatureCollection(ASSET_NAME)

# Add area as polygon property and filter using the threshold defined above
basins = basins.map(add_area_as_property)
if AREA_MIN_THRESHOLD is not None:
    basins = basins.filterMetadata('area', 'greater_than', AREA_MIN_THRESHOLD)
if AREA_MAX_THRESHOLD is not None:
    basins = basins.filterMetadata('area', 'not_greater_than', AREA_MAX_THRESHOLD)

# Instantiate HydroATLAS feature collection
hydro_atlas = ee.FeatureCollection("WWF/HydroATLAS/v1/Basins/level09")

# Print number of basins that will be processed
n_basins = basins.size().getInfo()
print(f"Number of basins in feature collection and within an area bounds: {n_basins}")

Number of basins in feature collection and within an area bounds: 1812


## Filter list of HydroATLAS attributes

The following cell defines which features are considered and which are ignored. Further, we define how the HydroATLAS attributes are aggregated, as there are often multiple HydroATLAS polygons within one catchment polygon due to the relatively high resolution of the level 12 layer. For discrete classes, we take the area-weighted majority vote, i.e., the class that is represented the most across all intersecting HydroATLAS polygons weighted by the area. For most of the other attributes which are defined per sub-polygon in HydroATLAS, we consider the polygon values and compute the area-weighted average. Some attributes are only defined for the entire upstream catchment. For these, we take the `UP_AREA` (upstream area) property to identify the most downstream HydroATLAS polygon and then take the attribute from here.



In [26]:
# get list list of all available hydroatlas feature properties
property_names = hydro_atlas.first().propertyNames().getInfo()

print(f"Found {len(property_names)} features initially")

# List of features for which we take the area-weighted majority vote
majority_properties = ['clz_cl_smj', # climate zones (18 classes)
                       'cls_cl_smj', # climate strata (125 classes)
                       'glc_cl_smj', # land cover (22 classes)
                       'pnv_cl_smj', # potential natural vegetation (15 classes)
                       'wet_cl_smj', # wetland (12 classes)
                       'tbi_cl_smj', # terrestrial biomes (14 classes)
                       'tec_cl_smj', # Terrestrial Ecoregions (846 classes)
                       'fmh_cl_smj', # Freshwater Major Habitat Types (13 classes)
                       'fec_cl_smj', # Freshwater Ecoregions (426 classes)
                       'lit_cl_smj', # Lithological classes (16 classes)
                      ]

# List of features for which we take the value of the most downstream polygon
pour_point_properties = ['dis_m3_pmn', # natural discharge annual mean
                         'dis_m3_pmx', # natural discharge annual max
                         'dis_m3_pyr', # natural discharge annual min
                         'lkv_mc_usu', # Lake Volume
                         'rev_mc_usu', # reservoir volume
                         'ria_ha_usu', # River area
                         'riv_tc_usu', # River volumne
                         'pop_ct_usu', # Population count in upstream area
                         'dor_pc_pva', # Degree of regulation in upstream area
                        ]

# These HydroSHEDS/RIVERS features are ignored
ignore_properties = ['system:index',
                     'COAST',
                     'DIST_MAIN',
                     'DIST_SINK',
                     'ENDO',
                     'MAIN_BAS',
                     'NEXT_SINK',
                     'ORDER_',
                     'PFAF_ID',
                     'SORT',
                     ]

# These features are required to find the most downstream gauge and will be removed later.
additional_properties = ['HYBAS_ID',
                         'NEXT_DOWN',
                         'SUB_AREA',
                         'UP_AREA'
                        ]

# These HydroATLAS features are ignored, mostly because they have a per-polygon
# counterpart that we include.
upstream_properties = ['aet_mm_uyr', # Actual evapotranspiration
                       'ari_ix_uav', # Global aridity index
                       'cly_pc_uav', # clay fraction soil
                       'cmi_ix_uyr', # Climate Moisture Index
                       'crp_pc_use', # Cropland Extent
                       'ele_mt_uav', # Elevtion
                       'ero_kh_uav', # Soil erosion
                       'for_pc_use', # Forest cover extent
                       'gdp_ud_usu', # Gross Domestic Product
                       'gla_pc_use', # Glacier Extent
                       'glc_pc_u01', # Land cover extent percent per class (22)
                       'glc_pc_u02',
                       'glc_pc_u03',
                       'glc_pc_u04',
                       'glc_pc_u05',
                       'glc_pc_u06',
                       'glc_pc_u07',
                       'glc_pc_u08',
                       'glc_pc_u09',
                       'glc_pc_u10',
                       'glc_pc_u11',
                       'glc_pc_u12',
                       'glc_pc_u13',
                       'glc_pc_u14',
                       'glc_pc_u15',
                       'glc_pc_u16',
                       'glc_pc_u17',
                       'glc_pc_u18',
                       'glc_pc_u19',
                       'glc_pc_u20',
                       'glc_pc_u21',
                       'glc_pc_u22',
                       'hft_ix_u09', # Human Footprint 2009
                       'hft_ix_u93', # Human Footprint 1993
                       'inu_pc_ult', # inundation extent long-term maximum
                       'inu_pc_umn', # inundation extent annual minimum
                       'inu_pc_umx', # inundation extent annual maximum
                       'ire_pc_use', # Irrigated Area Extent (Equipped)
                       'kar_pc_use', # Karst Area Extent
                       'lka_pc_use', # Limnicity (Percent Lake Area)
                       'nli_ix_uav', # Nighttime Lights
                       'pac_pc_use', # Protected Area Extent
                       'pet_mm_uyr', # Potential evapotranspiration
                       'pnv_pc_u01', # potential natural vegetation (15 classes)
                       'pnv_pc_u02',
                       'pnv_pc_u03',
                       'pnv_pc_u04',
                       'pnv_pc_u05',
                       'pnv_pc_u06',
                       'pnv_pc_u07',
                       'pnv_pc_u08',
                       'pnv_pc_u09',
                       'pnv_pc_u10',
                       'pnv_pc_u11',
                       'pnv_pc_u12',
                       'pnv_pc_u13',
                       'pnv_pc_u14',
                       'pnv_pc_u15',
                       'pop_ct_ssu', # population count
                       'ppd_pk_uav', # population density
                       'pre_mm_uyr', # precipitation
                       'prm_pc_use', # Permafrost extent
                       'pst_pc_use', # Pasture extent
                       'ria_ha_ssu', # river area in sub polygon
                       'riv_tc_ssu', # River volumne in sub polygon
                       'rdd_mk_uav', # Road density
                       'slp_dg_uav', # slope degree
                       'slt_pc_uav', # silt fraction
                       'snd_pc_uav', # sand fraction
                       'snw_pc_uyr', # snow cover percent
                       'soc_th_uav', # organic carbon content in soil
                       'swc_pc_uyr', # soil water content
                       'tmp_dc_uyr', # air temperature
                       'urb_pc_use', # urban extent
                       'wet_pc_u01', # wetland classes percent (9 classes)
                       'wet_pc_u02',
                       'wet_pc_u03',
                       'wet_pc_u04',
                       'wet_pc_u05',
                       'wet_pc_u06',
                       'wet_pc_u07',
                       'wet_pc_u08',
                       'wet_pc_u09',
                       'wet_pc_ug1', #  wetland classes percent by class grouping (2 classes)
                       'wet_pc_ug2',
                       'gad_id_smj', # global administrative areas (country id's)
                       ]

# get the final list of features to use (UP_AREA is only used for processing and deleted later
# so we subtract one.)
use_properties = [p for p in property_names if p not in ignore_properties + upstream_properties]

print(f"Number of ignored properties: {len(ignore_properties) + len(upstream_properties) + 1} ")
print(f"--Ignored {len(ignore_properties) + 1} HydroRIVER properties")
print(f"--Ignored {len(upstream_properties)} upstream basin properties")
print(f"Remaining number of features: {len(use_properties) - 1 - len(additional_properties)}")

Found 295 features initially
Number of ignored properties: 97 
--Ignored 11 HydroRIVER properties
--Ignored 86 upstream basin properties
Remaining number of features: 194


## Perform Intersection and download data

The following piece of code queries Earth Engine to perform the intersection between the user-provided polygons and HydroATLAS. The results are unpacked into a nested dictionary. Each basin within this dictionary is another dictionary with one key per attribute and the corresponding value is a list of attribute values of all intersecting HydroATLAS polygons. Additionally, we store the area of the intersection between an individual HydroATLAS polygon and the user-provided polygons in a list to use them as aggregation weights in the next step.

In [8]:
# create Filterobject for the intersection
spatialFilter = ee.Filter.intersects(
    leftField='.geo',
    rightField='.geo',
    maxError=10
)

# create join object
join = ee.Join.inner()

# list of all available basins
basin_ids = basins.aggregate_array(BASIN_ID_FIELD).getInfo()
missing_basins = []

# dictionary to store the results
results = {b: defaultdict(list) for b in basin_ids}

# get number of batch requests to make
n_batches = n_basins // BATCH_SIZE
if n_basins % BATCH_SIZE > 0:
    n_batches += 1

# iterate over batches
for batch in tqdm(range(n_batches)):
    start_idx = batch * BATCH_SIZE
    if (batch + 1) * BATCH_SIZE > n_basins:
        process_n_basins = n_basins - start_idx
    else:
        process_n_basins = BATCH_SIZE

    # perform intersection
    intersectJoined = join.apply(ee.FeatureCollection(basins.toList(process_n_basins, start_idx)),
                                 hydro_atlas,
                                 spatialFilter)
    intersected_basins = intersectJoined.map(join_features).getInfo()

    # iterate over intersected polygon and extract the information needed
    for polygon in intersected_basins["features"]:

        basin_id = polygon["properties"][BASIN_ID_FIELD]

        # check if the intersect is larger than the minimum overlap treshold
        if ((polygon["properties"]["Intersect"] > MIN_OVERLAP_THRESHOLD) or
            (polygon["properties"]["Intersect"] / polygon["properties"]["SUB_AREA"] > 0.5)):

            # For some reason, the intersections for some basins are returned twice, leading to
            # errors when computing the aggregates and especially the basin area. Here, we make
            # sure that we do not add an intersect again, by filtering for unique intersect areas.
            if polygon["properties"]["Intersect"] not in results[basin_id]["weights"]:

                for prop in use_properties:
                    results[basin_id][prop].append(polygon["properties"][prop])

                # extract area of intersection, used as weight
                results[basin_id]["weights"].append(polygon["properties"]["Intersect"])

        # To compute the true basin area (that is similar to the area of the basin polygon), we need
        # the area of all intersecting HydroATLAS polygons, even the small bits that we exclude when
        # we aggregate the attributes.
        if polygon["properties"]["Intersect"] not in results[basin_id]["area_fragments"]:
            results[basin_id]["area_fragments"].append(polygon["properties"]["Intersect"])

  0%|          | 0/1812 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

In [9]:
print(basins)

ee.FeatureCollection({
  "functionInvocationValue": {
    "functionName": "Collection.map",
    "arguments": {
      "baseAlgorithm": {
        "functionDefinitionValue": {
          "argumentNames": [
            "_MAPPING_VAR_0_0"
          ],
          "body": {
            "functionInvocationValue": {
              "functionName": "Element.set",
              "arguments": {
                "key": {
                  "constantValue": "area"
                },
                "object": {
                  "argumentReference": "_MAPPING_VAR_0_0"
                },
                "value": {
                  "functionInvocationValue": {
                    "functionName": "Number.divide",
                    "arguments": {
                      "left": {
                        "functionInvocationValue": {
                          "functionName": "Feature.area",
                          "arguments": {
                            "feature": {
                              "argumentRe

## Perform aggregation of the intersection results

The next block of code performs the aggregation of the downloaded results.
If the attribute is not listed under `majority_properties` or `pour_point_properties`, we compute the area-weighted average.

In [20]:
aggregated_results = defaultdict(dict)
for basin, basin_data in tqdm(results.items()):
    # extract weights for the weighted aggregation of the attributes
    weights = np.array(basin_data["weights"])

    # compute the mask of intersections with a sufficient overlap
    mask = weights > MIN_OVERLAP_THRESHOLD
    masked_weights = weights[mask]

    # perform aggregation
    for key, val in basin_data.items():
        # Ignore these auxilary properties
        if key in ["weights", "UP_AREA", "area_fragments", "HYBAS_ID", "NEXT_DOWN", "SUB_AREA"]:
            continue

        # Pour-point properties are treated separately below.
        if key in pour_point_properties:
            continue

        val = np.array(val)

        # only consider intersections with sufficient overlap
        masked_val = val[mask]

        # for wetland classes, define 'no wetland' as a new class.
        # In the dataset this value is -999, here we set it to 13
        if key == 'wet_cl_smj':
            masked_val[masked_val == -999] = 13

        # if all values are -999, set value to NaN
        if len(masked_val[masked_val == -999]) == len(masked_val):
            aggregated_results[basin][key] = np.nan
        else:
            # Majority vote for the class properties
            if key in majority_properties:
                aggregated_results[basin][key] = np.bincount(masked_val[masked_val > -999],
                                                             weights=masked_weights[masked_val > -999]).argmax()

            # Averaging for all remaining properties
            else:
                aggregated_results[basin][key] = np.average(masked_val[masked_val > -999],
                                                            weights=masked_weights[masked_val > -999])

    aggregated_pour_point_features = compute_pour_point_properties(
        basin_data=basin_data,
        min_overlap_threshold=MIN_OVERLAP_THRESHOLD,
        pour_point_properties=pour_point_properties)
    for key, val in aggregated_pour_point_features.items():
        aggregated_results[basin][key] = val

    # basin area is the sum of the area of all intersecting fragments, also the smaller intersections that are ignored
    # above.
    aggregated_results[basin]['area'] = sum(basin_data["area_fragments"])

    # We also store the fraction of the area considered during aggregation to the total basin area
    aggregated_results[basin]['area_fraction_used_for_aggregation'] = sum(masked_weights) / sum(basin_data["area_fragments"])

dict_items([(7120330800, defaultdict(<class 'list'>, {'area_fragments': [0], 'weights': [], 'SUB_AREA': []})), (7120494640, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7121166400, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120629840, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120519280, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120486560, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120507040, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120740560, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7121023200, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7121101040, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120548080, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120666880, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120634160, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7120478560, defaultdict(<class 'list'>, {'area_fragments': [0]})), (7121

  0%|          | 0/1258 [00:00<?, ?it/s]

7120330800
defaultdict(<class 'list'>, {'area_fragments': [0], 'weights': [], 'SUB_AREA': []})


ValueError: ignored

## Create DataFrame and show results

Here, we convert the dictionary into a DataFrame and check for NaN values within the data.

In [None]:
# store all data as pandas DataFrame
if BASIN_PREFIX:
    df = pd.DataFrame([val for val in aggregated_results.values()],
                      index=[f"{BASIN_PREFIX}_{str(k)}" for k in aggregated_results.keys()])
else:
    df = pd.DataFrame([val for val in aggregated_results.values()],
                       index=[str(k) for k in aggregated_results.keys()])

df.index.name = "gauge_id"
df = df.sort_index(axis=0)

# check for features containing NaN's
nan_columns = df.columns[df.isna().any()].tolist()

if nan_columns:
    print(f"{len(nan_columns)} Columns contain NaNs")
    print(f"These columns contain NaN values {nan_columns}")
else:
    print("No column contains any NaN value")

df.head()

## Save results to Drive

We create an `attributes` folder within the user-defined output path into which the resulting csv file is stored.

In [None]:
# create output folder
attribute_dir = drive_path / OUTPUT_FOLDER_NAME / 'attributes'
if not attribute_dir.is_dir():
    attribute_dir.mkdir(parents=True)

# save dataframe as csv
df.to_csv(attribute_dir / 'attributes.csv')

print("HydroATLAS attributes processed successfully")

# Processing ERA5-Land

This section of the code prepares and starts tasks to process hourly ERA5-Land data for each basin in the shapefile. After this section is completed, you can close this notebook since the tasks were sent to Earth Engine and Earth Engine will store the resulting files in Drive.

**Note**: It might take several hours until you find the processed data in your Google Drive. The time depends on the length of ERA5-Land data that you request, the number of basins in your shape layer, the size of the basins and on the numbers of points that span your polygons. If your basin polygons have a very fine resolution, consider simplifying them to speed up this step, e.g., using the [QGIS simplify function](https://docs.qgis.org/2.8/en/docs/user_manual/processing_algs/qgis/vector_geometry_tools/simplifygeometries.html) (we used the default tolerance of 1.0 for e.g. CAMELS US and CAMELS CL)

## TLDR; What do we do here?

Since most streamflow records are stored in local time but ERA5-Land is provided in UTC, we use ERA5-Land in hourly resolution, even if we are only interested in daily aggregates. By using hourly data, we can shift the forcing data to local time before we compute the daily aggregates and thus make sure that the forcing data aligns with the underlying streamflow data. However, this is part of the second processing step of the meteorological forcing data that runs on your local machine (no worries, this is something that any laptop/PC can do), after we used Earth Engine to compute the spatial averages.

## Output

The output of this part of the code is a number (we split the time into manageable slices) of csv files. Each csv file contains one row of data per basin and timestep. The data are the spatially averaged ERA5-Land variables that we request Earth Engine to process. Note: These files can become large since this is still a lot of data. If you do not have enough free space in your Google Drive, either upgrade your disk space or process the ERA5-Land time period in chunks and download the processed data after every step.

After you have downloaded all files to your local machine, use our other [notebook]() to process the data locally into time series data and into the Caravan native format.


## Configuration for ERA5-Land

- `IMGS_PER_REQUEST`: This defines the number of ERA5-Land images (one image = one hour) that we want to process in one request. Earth Engine is good in parallelizing compute, so a larger number is preferable. However, Earth Engine also has a size limit for a single user request. If you see an error when sending the requests that states that the query is aborted due to too many elements, consider reducing this number.

- `START_DATE`: The first day of the period for which you request data. In Caravan, we include forcing data starting January 1st, 1981.

- `END_DATE`: The last day of the period for which you request data. In Caravan, we include forcing data until December 31st, 2020. Note, that this date refers to data in UTC. To be able to include December 31st, 2020 in the local time zone you might actually need to request one additional day, depending on the location.

 `ERA5L_BANDS`: The list of bands for which you wish to compute the spatial averages. You can change this at will, however to be compatible with Caravan, you should probably leave it as is. The names have to match the names in [this table](https://developers.google.com/earth-engine/datasets/catalog/ECMWF_ERA5_LAND_HOURLY#bands). **Note** The second notebook that processes the output of Earth Engine assumes that these bands are available. If you make changes here, you might need to adapt parts of the second notebook.

 **Note**: Later, in the second notebook, we shift the ERA5-Land data from UTC-0 to local time, before aggregating to daily resolution. Here, we only consider days with values for all 24 hours. Therefore, adapt the `START_DATE` and the `END_DATE` according to the local timezone of your data with a 1 day buffer. That is, if your basins are east of UTC-0, adapt your `START_DATE` to include e.g. the 31st December. If the basins are west of UTC-0, adapt the `END_DATE` to include 1st January. If you want to be on the safe side, simply include a 1 day buffer on both side and then crop the data in the 2nd notebook.

In [None]:
IMGS_PER_REQUEST = 365 * 24
START_DATE = '1981-01-01'  # Format: YYYY-MM-DD
END_DATE = '2021-01-01'  # Format: YYYY-MM-DD

# ------------------------------ Change with caution --------------------------#
ERA5L_BANDS = ['dewpoint_temperature_2m',
               'temperature_2m',
               'volumetric_soil_water_layer_1',
               'volumetric_soil_water_layer_2',
               'volumetric_soil_water_layer_3',
               'volumetric_soil_water_layer_4',
               'surface_net_solar_radiation',
               'surface_net_thermal_radiation',
               'u_component_of_wind_10m',
               'v_component_of_wind_10m',
               'surface_pressure',
               'total_precipitation',
               'snow_depth_water_equivalent',
               'potential_evaporation']

## Initialize feature collections

In [None]:
# initialize featurecollection from uploaded asset
basins = ee.FeatureCollection(ASSET_NAME)

# Add area as polygon property and filter using the threshold defined above
basins = basins.map(add_area_as_property)
if AREA_MIN_THRESHOLD is not None:
    basins = basins.filterMetadata('area', 'greater_than', AREA_MIN_THRESHOLD)
if AREA_MAX_THRESHOLD is not None:
    basins = basins.filterMetadata('area', 'not_greater_than', AREA_MAX_THRESHOLD)

# remove all unnecessary fields from the basin polygons to reduce the export size
basins = basins.map(lambda f: subset_properties(f, BASIN_ID_FIELD))

# initialize featurecollection of ERA5-Land hourly data
era5_collection = ee.ImageCollection("ECMWF/ERA5_LAND/HOURLY").filterDate(START_DATE, END_DATE)

# subset ERA5 bands
era5_collection = era5_collection.select(ERA5L_BANDS)

# Print some meta information
n_basins = basins.size().getInfo()
n_images = era5_collection.size().getInfo()

print(f"Number of basins: {n_basins}")
print(f"Number of hourly ERA5 images: {n_images}")

## Start processing/exporting tasks

This block will create the Earth Engine jobs to process the ERA5-Land data. After this block is completed, you can close this notebook and wait for Earth Engine to store the data in your drive (might take time!). You can see the list of current tasks in the [code editor of Earth Engine](https://code.earthengine.google.com/) (right side under "Tasks") or use the code block below to print the current list of jobs (and their state) in this notebook.

In [None]:
start = 0
batch_number = 1
# how many images to process in one task

n_imgs = IMGS_PER_REQUEST
while start < n_images:
    # clip the number of images to process by the number of total available images
    if (start + n_imgs) > n_images:
        n_imgs = n_images - start

    print(f"Processing {n_imgs} images, starting from index {start}")

    # start export task for 'n_imgs' elements, starting at index 'start'
    test_task = start_export_task_img_coll(
        img_coll=ee.ImageCollection(era5_collection.toList(count=n_imgs,offset=start)),
        feature_coll=basins,
        folder=OUTPUT_FOLDER_NAME,
        filename=f'batch{str(batch_number).zfill(2)}',
    )
    start += n_imgs
    batch_number += 1

print("All jobs were sent to Earth Engine.")

# Check current tasks on Earth Engine

The next section can be used optionally to check the state of the current tasks and can also be run separately of the other code (e.g., if you want to check the state of your ERA5-Land exporting tasks after a couple of hours).

In [None]:
task_prefix = "batch"
def get_task_status(_prefix=None):
    task_status_list = [t.status() for t in ee.batch.Task.list()]

    # If a prefix was specified, use it to filter the task list.
    if _prefix:
        task_status_list = [
            s
            for s in task_status_list
            if s['description'].startswith(_prefix)
        ]
    return task_status_list

df = pd.DataFrame(get_task_status(task_prefix))

# Convert datetime fields.
df['creation_timestamp_ms'] = pd.to_datetime(df['creation_timestamp_ms'], unit='ms', utc=True)
df['start_timestamp_ms'] = pd.to_datetime(df['start_timestamp_ms'], unit='ms', utc=True)
df['update_timestamp_ms'] = pd.to_datetime(df['update_timestamp_ms'], unit='ms', utc=True)

df