<a href="https://colab.research.google.com/github/pgarg7/ERA5_CDS_Dask/blob/main/ERA5_CDS_Dask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install cdsapi
!pip install fsspec
!pip install aiohttp
!pip install requests

Collecting cdsapi
  Downloading cdsapi-0.5.1.tar.gz (12 kB)
Building wheels for collected packages: cdsapi
  Building wheel for cdsapi (setup.py) ... [?25l[?25hdone
  Created wheel for cdsapi: filename=cdsapi-0.5.1-py2.py3-none-any.whl size=11699 sha256=91539d7415e9f114a0ee1924f23db99d02974dde47efb7bc58351319dea5515c
  Stored in directory: /root/.cache/pip/wheels/1d/2e/3c/3746e0cd076320584d6f47b688da7b13a5d5d1a92606779ca4
Successfully built cdsapi
Installing collected packages: cdsapi
Successfully installed cdsapi-0.5.1


# Replace the placeholder key below with the API key from your CDS (ERA5) account

In [2]:

%%writefile /root/.cdsapirc
url: https://cds.climate.copernicus.eu/api/v2
key: #ENTER YOUR KEY HERE#

Writing /root/.cdsapirc


In [4]:
%pylab inline
import cdsapi
import fsspec
import xarray as xr
import dask.array as dsa
from dask.base import tokenize
import pandas as pd
import re


Populating the interactive namespace from numpy and matplotlib


In [5]:
def request_to_dataset(name, request):
    """Submit one CDS request and open its result as an xarray dataset.

    Parameters
    ----------
    name : str
        Passed to ``cdsapi.Client.retrieve`` as the dataset name.
    request : dict
        Passed to ``cdsapi.Client.retrieve`` as the request body.

    Returns
    -------
    xarray.Dataset
        Dataset opened with the ``scipy`` engine from the result location.
    """
    # Passing None as the target asks the client not to save to a local path;
    # the result's `location` is opened through fsspec instead.
    result = cdsapi.Client().retrieve(name, request, None)
    with fsspec.open(result.location) as remote_file:
        return xr.open_dataset(remote_file, engine='scipy')


def request_to_numpy_array(name, request, vname):
    """Run one CDS request and return a single variable's values.

    Parameters
    ----------
    name : str
        Forwarded to ``request_to_dataset``.
    request : dict
        Forwarded to ``request_to_dataset``.
    vname : str
        Name of the variable to pull out of the resulting dataset.

    Returns
    -------
    The ``.values`` of ``dataset[vname]``.
    """
    dataset = request_to_dataset(name, request)
    variable = dataset[vname]
    return variable.values


def full_request(request_base, date):
    """Combine the shared request fields with the fields for one timestamp.

    Parameters
    ----------
    request_base : dict
        Request fields that do not depend on the timestamp. It is not
        modified. On a key collision its value replaces the date-derived one.
    date : object with ``year``, ``month``, ``day`` and ``hour`` attributes
        The timestamp to request.

    Returns
    -------
    dict
        New dict with 'year', 'month', 'day' and 'time' (formatted 'HH:00')
        followed by the entries of ``request_base``.
    """
    merged = dict(
        year='{}'.format(date.year),
        month='{:02d}'.format(date.month),
        day='{:02d}'.format(date.day),
        time='{:02d}:00'.format(date.hour),
    )
    # Applied last so that request_base takes precedence over the date fields.
    merged.update(request_base)
    return merged

    
class CDSTimeseriesRequest:
    """Describe a CDS time series that is fetched lazily, one request per timestep.

    Parameters
    ----------
    name : str
        Dataset name, forwarded to the CDS API.
    request_base : dict
        Request fields shared by all timesteps, forwarded to the CDS API.
    **date_range_kwargs : dict
        Forwarded to pandas.date_range to generate the timestamps
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.date_range.html
    """

    def __init__(self, name, request_base, **date_range_kwargs):
        self.name = name
        self.request_base = request_base
        # TODO: validate date range
        self.time = pd.date_range(**date_range_kwargs)

    def dataset(self):
        """Assemble an xarray dataset backed by one dask task per timestamp.

        The request for the first timestamp is executed immediately; its
        result supplies the variable name, dtype, per-timestep shape, dims,
        attributes and non-time coordinates. The data itself is a dask array
        whose chunks each call ``request_to_numpy_array`` for one timestamp.

        The result of the first request must contain exactly one data
        variable (asserted). The leading axis of that variable is treated as
        the time axis.

        Returns
        -------
        ds : xarray.Dataset
        """
        # Eagerly fetch the first timestep to use as a template.
        first_request = full_request(self.request_base, self.time[0])
        template = request_to_dataset(self.name, first_request)

        assert len(template.data_vars) == 1
        vname = next(iter(template.data_vars))
        dvar = template[vname]
        dtype = dvar.dtype
        base_shape = dvar.shape[1:]
        n_steps = len(self.time)

        # Chunk layout: one chunk of length 1 per timestamp along the first
        # axis, and a single full-size chunk along every remaining axis.
        chunks = (n_steps * (1,),) + tuple((size,) for size in base_shape)

        # Hand-built dask graph: key (name, i, 0, ..., 0) maps to the task
        # tuple that downloads timestamp i.
        token = tokenize(self.name, self.request_base, self.time)
        name = '-'.join([vname, token])
        trailing_index = len(base_shape) * (0,)
        dsk = {
            (name, n_time) + trailing_index: (
                request_to_numpy_array,
                self.name,
                full_request(self.request_base, stamp),
                vname,
            )
            for n_time, stamp in enumerate(self.time)
        }
        data = dsa.Array(dsk, name, chunks, dtype)

        # Reuse the template's coordinates, swapping its 'time' coordinate
        # for the full list of requested timestamps.
        coords = template.coords
        del coords['time']
        ds = xr.Dataset({vname: (dvar.dims, data, dvar.attrs)},
                        coords=coords)
        ds['time'] = 'time', self.time
        return ds

# **Method 1**
## This is a bit faster, but it handles only one variable at a time.

In [8]:
name = 'reanalysis-era5-pressure-levels'
# Fields shared by every per-timestep request; the year/month/day/time fields
# are added for each timestamp when the dataset is built.
request_base = {'variable': 'temperature',
                'pressure_level': '925',
                'product_type': 'reanalysis',
                'format': 'netcdf'}
# Adjust `periods` (and `start` / `freq`) to choose which timestamps to request.
# '6h' is the lowercase hour alias; the uppercase '6H' spelling is deprecated
# in pandas >= 2.2.
r = CDSTimeseriesRequest(name, request_base, start='2011-06-01 00:00', freq='6h', periods=185)
dsc = r.dataset()
dsc

2021-07-29 16:49:37,568 INFO Welcome to the CDS
2021-07-29 16:49:37,570 INFO Sending request to https://cds.climate.copernicus.eu/api/v2/resources/reanalysis-era5-pressure-levels
2021-07-29 16:49:37,677 INFO Request is completed


Unnamed: 0,Array,Chunk
Bytes,768.30 MB,4.15 MB
Shape,"(185, 721, 1440)","(1, 721, 1440)"
Count,185 Tasks,185 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 768.30 MB 4.15 MB Shape (185, 721, 1440) (1, 721, 1440) Count 185 Tasks 185 Chunks Type float32 numpy.ndarray",1440  721  185,

Unnamed: 0,Array,Chunk
Bytes,768.30 MB,4.15 MB
Shape,"(185, 721, 1440)","(1, 721, 1440)"
Count,185 Tasks,185 Chunks
Type,float32,numpy.ndarray


# **Method 2**
## This works for multiple variables, levels, etc., but it is a tad slower than the previous method.

In [9]:

c = cdsapi.Client()
name = 'reanalysis-era5-pressure-levels'

# A request copied from the ERA5 download page can be pasted in here directly.
# The day and time lists are generated instead of typed out:
#   day  -> '01' ... '31'
#   time -> '00:00', '06:00', '12:00', '18:00'
request = {
    'year': '2018',
    'month': ['11', '12'],
    'day': [f'{day_of_month:02d}' for day_of_month in range(1, 32)],
    'time': [f'{hour:02d}:00' for hour in range(0, 24, 6)],
    'variable': ['relative_humidity', 'temperature'],
    'pressure_level': '925',
    'product_type': 'reanalysis',
    'format': 'netcdf',
}

# Passing None as the target; the result's location is opened through fsspec.
r = c.retrieve(name, request, None)

with fsspec.open(r.location) as f:
    ds = xr.open_dataset(f, engine='scipy')
    print(ds)

2021-07-29 16:50:40,115 INFO Welcome to the CDS
2021-07-29 16:50:40,117 INFO Sending request to https://cds.climate.copernicus.eu/api/v2/resources/reanalysis-era5-pressure-levels
2021-07-29 16:50:40,210 INFO Request is queued
2021-07-29 16:50:41,302 INFO Request is running
2021-07-29 16:52:34,472 INFO Request is completed


<xarray.Dataset>
Dimensions:    (latitude: 721, longitude: 1440, time: 244)
Coordinates:
  * longitude  (longitude) float32 0.0 0.25 0.5 0.75 ... 359.0 359.2 359.5 359.8
  * latitude   (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * time       (time) datetime64[ns] 2018-11-01 ... 2018-12-31T18:00:00
Data variables:
    r          (time, latitude, longitude) float32 ...
    t          (time, latitude, longitude) float32 ...
Attributes:
    Conventions:  CF-1.6
    history:      2021-07-29 16:51:43 GMT by grib_to_netcdf-2.20.0: /opt/ecmw...


In [11]:
# Display the `r` data variable of the dataset opened in the previous cell.
ds.r