# Initialization

In [None]:
# Google Earth Engine packages
import ee
import geemap
import geemap.colormaps as cm

import xarray as xr
import numpy as np


# function to load nc file into GEE
def netcdf_to_ee(filename):
    ds = xr.open_dataset(filename)
    data = ds['z']

    lon_data = np.round(data['lon'], 3)
    lat_data = np.round(data['lat'], 3)

    dim_lon = np.unique(np.ediff1d(lon_data).round(3))
    dim_lat = np.unique(np.ediff1d(lat_data).round(3))

    if (len(dim_lon) != 1) or (len(dim_lat) != 1):
        print("The netCDF file is not a regular longitude/latitude grid")

    data_np = np.array(data)
    data_np = np.transpose(data_np)

    # Figure out if we need to roll the data or not
    # (see https://github.com/giswqs/geemap/issues/285#issuecomment-791385176)
    if np.max(lon_data) > 180:
        data_np = np.roll(data_np, 180, axis=0)
        west_lon = lon_data[0] - 180
    else:
        west_lon = lon_data[0]

    transform = [dim_lon[0], 0, float(west_lon) - dim_lon[0]/2, 0, dim_lat[0], float(lat_data[0]) - dim_lat[0]/2]

    image = geemap.numpy_to_ee(
        data_np, "EPSG:4326", transform=transform, band_names='z'
    )
    return image, data_np, transform

In [None]:
# initialize GEE at the beginning of session
try:
    ee.Initialize()
except Exception as e:
    ee.Authenticate()         # authenticate when using GEE for the first time
    ee.Initialize()

In [None]:
import configparser
import ast

# read local config.ini file
config = configparser.ConfigParser()
config.read('config.ini')

# get file config from config.ini
dir_input = config['FILE_SETTINGS']['DIR_INPUT']
dir_output = config['FILE_SETTINGS']['DIR_OUTPUT']
output_gpkg = dir_output + config['FILE_SETTINGS']['GPKG_NAME']
scenarios = config.getboolean('CONFIG', 'PROJECTIONS')
show_map = config.getboolean('CONFIG','SHOW_MAP')

In [None]:
if show_map:
    Map = geemap.Map()

# Configure the downloaded date range

If you are only interested in modeling the past, set `PROJECTIONS=False` in the `config.ini` to download reanalysis data for your defined modeling period only. Otherwise, all available historic data (since 1979) is downloaded to provide the best possible basis for bias adjustment of the climate scenario data.

In [None]:
if scenarios == True:
    date_range = ['1979-01-01', '2022-01-01']
else:
    date_range = ast.literal_eval(config['CONFIG']['DATE_RANGE'])

***

# ERA5L Geopotential

### Load input data

Load ERA5L file for geopotential and add to Map

In [None]:
file = 'ERA5_land_Z_geopotential_HMA.nc'
filename = dir_input + file

In [None]:
image, data_np, transform = netcdf_to_ee(filename)

In [None]:
if show_map:
    # add image as layer
    vis_params =  {'min': int(data_np.min()), 'max': int(data_np.max()), 'palette': cm.palettes.terrain, 'opacity': 0.8}
    Map.addLayer(image, vis_params, "ERA5L geopotential")

Load catchment and add to map

In [None]:
import geopandas as gpd

catchment_new = gpd.read_file(output_gpkg, layer='catchment_new')
catchment = geemap.geopandas_to_ee(catchment_new)

if show_map:
    Map.addLayer(catchment, {'color': 'darkgrey'}, "Catchment")
    Map.centerObject(catchment, zoom=9)
    display(Map)

### Calculate weighted average geopotential and convert to elevation

In [None]:
# execute reducer
dict = image.reduceRegion(ee.Reducer.mean(),
                          geometry=catchment,
                          crs='EPSG:4326',
                          crsTransform=transform)

# get mean value and print
mean_val = dict.getInfo()['z']
ele_dat = mean_val / 9.80665
print(f'geopotential mean: {mean_val:.2f}, elevation: {ele_dat:.2f}m.a.s.l.')

***

### ERA5L Temperature and Precipitation Time Series

In [None]:
import pandas as pd
import datetime

def setProperty(image):
    dict = image.reduceRegion(ee.Reducer.mean(), catchment)
    return image.set(dict)

In [None]:
collection = ee.ImageCollection('ECMWF/ERA5_LAND/DAILY_RAW')\
    .select('temperature_2m','total_precipitation_sum')\
    .filterDate(date_range[0], date_range[1])

withMean = collection.map(setProperty)

In [None]:
%%time

df = pd.DataFrame()
df['ts'] = withMean.aggregate_array('system:time_start').getInfo()
df['temp'] = withMean.aggregate_array('temperature_2m').getInfo()
df['temp_c'] = df['temp'] - 273.15
df['prec'] = withMean.aggregate_array('total_precipitation_sum').getInfo()
df['prec'] = df['prec'] * 1000
df['dt'] = df['ts'].apply(lambda x: datetime.datetime.fromtimestamp(x / 1000))

In [None]:
df.to_csv(dir_output + 'ERA5L.csv',header=True,index=False)

In [None]:
display(df)

Append data reference elevation to settings.yml.

In [None]:
import yaml

def read_yaml(file_path):
    with open(file_path, 'r') as f:
        data = yaml.safe_load(f)
        return data
    
def write_yaml(data, file_path):
    with open(file_path, 'w') as f:
        yaml.safe_dump(data, f)

def update_yaml(file_path, new_items):
    data = read_yaml(file_path)
    data.update(new_items)
    write_yaml(data, file_path)

        
update_yaml(dir_output + 'settings.yml', {'ele_dat': float(ele_dat)})

# Alternative ERA5 Routine (parallele Tasks)

In [None]:
import multiprocessing
import concurrent.futures
import requests
from retry import retry
from tqdm import tqdm


class ERA5Downloader:
    """Class to download ERA5 data for a given period, variable, and spatial subset."""

    def __init__(self, var, starty, endy, shape, processes=10, dir='./'):
        self.var = var
        self.starty = starty
        self.endy = endy
        self.shape = shape
        self.processes = processes
    
    def download(self):
        """Runs a subset routine for ERA5 data on GEE servers to create ee.ImageCollections for all years in
        the requested period. Downloads individual years in parallel processes to increase the download time."""

        def getRequests(starty, endy):
            """Generates a list of years to be downloaded. [Client side]"""

            return [i for i in range(starty, endy+1)]
        
        @retry(tries=10, delay=1, backoff=2)
        def getResult(index, year):
            """Handle the HTTP requests to download one year of CMIP6 data. [Server side]"""

            start = str(year) + '-01-01'
            end = str(year + 1) + '-01-01'
            startDate = ee.Date(start)
            endDate = ee.Date(end)
            n = endDate.difference(startDate, 'day').subtract(1)
            
            def getImageCollection(var):
                """Create and image collection of CMIP6 data for the requested variable, period, and region.
                [Server side]"""

                collection = ee.ImageCollection('ECMWF/ERA5_LAND/DAILY_RAW') \
                    .select(var) \
                    .filterDate(startDate, endDate)
                return collection
            
            def renameBandName(b):
                """Edit variable names for better readability. [Server side]"""

                split = ee.String(b).split('_')
                return ee.String(split.splice(0, 1).join("_"))

            def buildFeature(i):
                """Create an area weighted average of the defined region for every day in the given year.
                [Server side]"""

                t1 = startDate.advance(i, 'day')
                t2 = t1.advance(1, 'day')
                # feature = ee.Feature(point)
                dailyColl = collection.filterDate(t1, t2)
                dailyImg = dailyColl.toBands()
                # renaming and handling names
                bands = dailyImg.bandNames()
                renamed = bands.map(renameBandName)
                # Daily extraction and adding time information
                dict = dailyImg.rename(renamed).reduceRegion(
                    reducer=ee.Reducer.mean(),
                    geometry=self.shape,
                ).combine(
                    ee.Dictionary({'system:time_start': t1.millis(), 'isodate': t1.format('YYYY-MM-dd')})
                )
                return ee.Feature(None, dict)
            
            # Create features for all days in the respective year. [Server side]
            collection = getImageCollection(self.var)
            year_feature = ee.FeatureCollection(ee.List.sequence(0, n).map(buildFeature))

            # Create a download URL for a CSV containing the feature collection. [Server side]
            url = year_feature.getDownloadURL()

            # Handle downloading the actual csv for one year. [Client side]
            r = requests.get(url, stream=True)
            if r.status_code != 200:
                r.raise_for_status()
            filename = 'output/ERA5/era5_' + self.var + '_' + str(year) + '.csv'
            with open(filename, 'w') as f:
                f.write(r.text)
            
            return index
            
        # Create a list of years to be downloaded. [Client side]
        items = getRequests(self.starty, self.endy)

        # Launch download requests in parallel processes and display a status bar. [Client side]
        with tqdm(total=len(items), desc="Downloading ERA5 data for variable '" + self.var + "'") as pbar:
            results = []
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.processes) as executor:
                for i, year in enumerate(items):
                    results.append(executor.submit(getResult, i, year))
                for future in concurrent.futures.as_completed(results):
                    index = future.result()
                    pbar.update(1)

        print("All downloads complete.")

In [None]:
downloader_t = ERA5Downloader('temperature_2m', 1979, 2021, catchment, processes=25)
downloader_t.download()