# EXAMPLE NOTEBOOK: 10 day rolling mean
### The purpose of this notebook is to demonstrate computing a 10-day rolling mean over data that has been saved to Zarr

In [1]:
import iris
import copy
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt

import crd_utils as crd

In [2]:
# Source data is read from an Azure blob container through zarr's ABSStore.
# Commented-out alternative carried over from the original cell (a local path):
#   '/data/cssp-china/zarr_daily_1851-1859/'
# Signature, for reference:
#   zarr.storage.ABSStore(container, prefix='', account_name=None,
#                         account_key=None, blob_service_kwargs=None)
import getpass
import zarr

# Prompt for the storage account key so it is never stored in the notebook.
account_key = getpass.getpass()

zarr_read = zarr.storage.ABSStore(
    'cssp-china',
    prefix='zarr_daily_1851-1859',
    account_name='awsearth',
    account_key=account_key,
    blob_service_kwargs=None,
)

 ························································································


In [3]:
%%time
# Open the Zarr store as an xarray Dataset and display its summary.
# NOTE(review): the timing below is a few seconds although the dataset is
# ~72 GB, so this presumably reads metadata only and leaves the arrays
# lazy -- confirm.
ds = xr.open_zarr(zarr_read)
ds

CPU times: user 1.39 s, sys: 58.2 ms, total: 1.45 s
Wall time: 3.79 s


In [4]:
# Total size of the dataset in gigabytes (bytes / 1e9, i.e. decimal GB).
ds.nbytes / 1e9

72.222473716

### Using Dask distributed, let's compute the rolling mean

In [5]:
import os
import distributed
import dask
from dask_kubernetes import KubeCluster
from dask import array as da

In [6]:
# Create a Dask cluster on Kubernetes (KubeCluster called with no arguments)
# and request 10 workers; the cluster object is then displayed.
# NOTE(review): the Client summary in the next cell reports "Workers: 0", so
# the requested workers were apparently not up yet at that point -- consider
# waiting for them before starting heavy work.
cluster = KubeCluster()
# Alternative left disabled: adaptive scaling with at least one worker.
# cluster.adapt(minimum=1)
cluster.scale(n=10)
cluster

distributed.dashboard.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.244.0.18:40653
distributed.scheduler - INFO -   dashboard at:                     :8787


VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [7]:
# Connect a distributed Client to the cluster created above and display its
# summary (scheduler address, dashboard link, worker/core/memory totals).
# NOTE(review): the summary recorded below shows 0 workers, 0 cores and 0 B
# of memory at the time this cell ran.
client = distributed.Client(cluster)
client

distributed.scheduler - INFO - Receive client connection: Client-240ea6a6-597f-11ea-a9ab-7ab1af4de61a
distributed.core - INFO - Starting established connection


0,1
Client  Scheduler: tcp://10.244.0.18:40653  Dashboard: /user/kaedonkers/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


## Create a rolling window for a Dataset using Xarray

In [8]:
# Names of all data variables that do not mention 'pressure'.
# The comprehension already builds a brand-new list of (immutable) strings, so
# wrapping it in copy.deepcopy added nothing; `x not in y` is the idiomatic
# spelling of `not x in y`.
# NOTE(review): this is a name-based filter -- presumably meant to pick the
# variables without a pressure-level dimension; confirm against the dataset.
vars_3d = [name for name in ds.data_vars if 'pressure' not in name]
vars_3d

['air_temperature_at_2m_max',
 'air_temperature_at_2m_mean',
 'air_temperature_at_2m_min',
 'cloud_area_fraction',
 'geopotential_height',
 'm01s05i217_max',
 'precipitation_flux',
 'relative_humidity_at_2m_max',
 'relative_humidity_at_2m_mean',
 'relative_humidity_at_2m_min',
 'specific_humidity',
 'surface_downwelling_longwave_flux_in_air',
 'surface_downwelling_shortwave_flux_in_air',
 'surface_temperature_max',
 'surface_temperature_mean',
 'surface_temperature_min',
 'x_wind_at_10m_mean',
 'y_wind_at_10m_mean']

In [9]:
# Drop 'geopotential_height' (the entry that sat at index 4 of the list shown
# above) by name rather than by position. Removing by name is idempotent:
# re-running this cell, or a change in variable order, can no longer remove a
# different variable by accident.
# NOTE(review): presumably excluded because it is not a 3D field even though
# its name lacks 'pressure' -- confirm.
vars_3d = [name for name in vars_3d if name != 'geopotential_height']
vars_3d

['air_temperature_at_2m_max',
 'air_temperature_at_2m_mean',
 'air_temperature_at_2m_min',
 'cloud_area_fraction',
 'm01s05i217_max',
 'precipitation_flux',
 'relative_humidity_at_2m_max',
 'relative_humidity_at_2m_mean',
 'relative_humidity_at_2m_min',
 'specific_humidity',
 'surface_downwelling_longwave_flux_in_air',
 'surface_downwelling_shortwave_flux_in_air',
 'surface_temperature_max',
 'surface_temperature_mean',
 'surface_temperature_min',
 'x_wind_at_10m_mean',
 'y_wind_at_10m_mean']

In [99]:
# Subset the full dataset to the variables listed in vars_3d and display it.
ds_3d = ds[vars_3d]
ds_3d

In [100]:
# Size of the vars_3d subset in gigabytes (bytes / 1e9).
ds_3d.nbytes / 1e9

13.80919302

In [101]:
# Rolling window of 10 steps along `time` (center=False) over the 3D subset.
# Presumably one step is one day (the source store is named "zarr_daily_..."),
# which gives the 10-day window -- confirm.
# Variant over the full dataset, left disabled in this cell:
# ds_roll = ds.rolling(time=10, center=False)
# NOTE: ds_roll is rebound in later cells before .mean() is called on it, so
# the object built here is only displayed.
ds_roll = ds_3d.rolling(time=10, center=False)
ds_roll

DatasetRolling [window->10,center->False,dim->time]

In [111]:
# Names of all data variables that mention 'pressure'.
# The comprehension already returns a fresh list of immutable strings, so the
# former copy.deepcopy wrapper was redundant and has been dropped.
# NOTE(review): this is a name-based filter; the result shown below also
# contains 'air_pressure_at_sea_level' and 'surface_air_pressure', which may
# not actually carry a pressure-level dimension -- confirm.
vars_4d = [name for name in ds.data_vars if 'pressure' in name]
vars_4d

['air_pressure_at_sea_level',
 'air_temperature_at_pressure_mean',
 'relative_humidity_at_pressure_mean',
 'surface_air_pressure',
 'x_wind_at_pressure_mean',
 'y_wind_at_pressure_mean']

In [115]:
# Subset the dataset to the first three names in vars_4d and display it.
# With the list shown above these are air_pressure_at_sea_level,
# air_temperature_at_pressure_mean and relative_humidity_at_pressure_mean.
ds_4d = ds[vars_4d[0:3]]
ds_4d

In [127]:
# Size of the ds_4d subset in gigabytes (bytes / 1e9).
ds_4d.nbytes / 1e9

23.56947874

In [116]:
# Same 10-step, center=False rolling window along `time`, now over the ds_4d
# subset. This rebinds ds_roll, replacing the window built from ds_3d.
ds_roll = ds_4d.rolling(time=10, center=False)
ds_roll

DatasetRolling [window->10,center->False,dim->time]

In [128]:
# 10-step, center=False rolling window along `time` over the full dataset.
# Rebinds ds_roll once more; this is the object whose .mean() is taken in the
# following cell.
ds_roll = ds.rolling(time=10, center=False)
ds_roll

DatasetRolling [window->10,center->False,dim->time]

In [129]:
%%time
# Mean over each rolling window for the variables in ds_roll.
# NOTE(review): this returns within seconds (see timing below) for a ~72 GB
# dataset, so the result is presumably still lazy and only evaluated when it
# is written or computed -- confirm.
# NOTE(review): with xarray's default min_periods the first 9 time steps are
# expected to be NaN -- confirm this is intended.
ds_rmean = ds_roll.mean()

CPU times: user 3.88 s, sys: 126 ms, total: 4.01 s
Wall time: 7.78 s


In [118]:
# Optional, left disabled: evaluate the rolling mean and bring the result back
# into this session. The full dataset is ~72 GB, so this is presumably too
# large to hold locally -- confirm before enabling.
# ds_mean_c = ds_rmean.compute()
# ds_mean_c

## Write the result to a Zarr store

In [130]:
# Output location for the rolling mean. The same prefix is used both for the
# blob store below and for the path under /data/cssp-china.
prefix = 'zarr_rolling_daily_mean_dask'
zarr_path = f'/data/cssp-china/{prefix}'

# Store object the result is written to; reuses the account key entered
# earlier in the notebook.
zarr_write = zarr.storage.ABSStore(
    'cssp-china',
    prefix=prefix,
    account_name='awsearth',
    account_key=account_key,
    blob_service_kwargs=None,
)

In [131]:
# Show the local output path and check whether a directory already exists
# there (True in the recorded run, i.e. something was present before writing).
print(zarr_path)
os.path.isdir(zarr_path)

/data/cssp-china/zarr_rolling_daily_mean_dask


True

In [132]:
# Remove any existing output directory before writing.
# shutil.rmtree replaces the original `!rm -rdf {zarr_path}` shell call: the
# path is no longer interpolated into a shell command line, it works without a
# POSIX shell, and ignore_errors=True keeps the "do not fail if it is already
# gone" behaviour of `rm -f`.
import shutil

shutil.rmtree(zarr_path, ignore_errors=True)

In [133]:
# Confirm the old output directory is gone (expected: False).
# Reuses zarr_path, which is defined together with `prefix`, instead of
# rebuilding the same f-string a second time.
os.path.isdir(zarr_path)

False

In [134]:
# Target chunking for the output: start from the full length of every
# dimension of ds_rmean and override `time` so each chunk spans 200 steps.
# NOTE(review): with the sizes shown below, a variable spanning
# pressure x grid_latitude x grid_longitude x time would get chunks of
# 14 * 219 * 286 * 200 = about 175 million values each. That may be too large
# for a single worker -- confirm against worker memory.
chunks = dict(ds_rmean.dims, time=200)
chunks

{'grid_latitude': 219,
 'grid_latitude_1': 218,
 'grid_longitude': 286,
 'grid_longitude_1': 286,
 'pressure': 14,
 'time': 200}

In [135]:
%%time
# Rechunk the rolling mean to the layout in `chunks`, then write it to the
# output store with mode='w' and consolidated=True.
# NOTE(review): the recorded run of this cell ended with a KilledWorker
# exception (see output below), so the write did not finish. The cause is not
# visible here; presumably worker memory or chunk size -- investigate before
# relying on the output.
# NOTE(review): the BlobNotFound (HTTP 404) messages are logged before the
# failure; presumably lookups of keys that do not exist yet -- confirm.
ds_rmean = ds_rmean.chunk(chunks=chunks)
ds_rmean.to_zarr(zarr_write, consolidated=True, mode='w')

Client-Request-ID=6d9376e0-5985-11ea-83a7-7ab1af4de61a Retry policy did not allow for a retry: Server-Timestamp=Thu, 27 Feb 2020 17:20:18 GMT, Server-Request-ID=adbd1b3b-c01e-008a-5292-edf35e000000, HTTP status code=404, Exception=The specified blob does not exist. ErrorCode: BlobNotFound<?xml version="1.0" encoding="utf-8"?><Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.RequestId:adbd1b3b-c01e-008a-5292-edf35e000000Time:2020-02-27T17:20:19.2165893Z</Message></Error>.
Client-Request-ID=6e4fe78a-5985-11ea-83a7-7ab1af4de61a Retry policy did not allow for a retry: Server-Timestamp=Thu, 27 Feb 2020 17:20:20 GMT, Server-Request-ID=adbd2251-c01e-008a-1892-edf35e000000, HTTP status code=404, Exception=The specified blob does not exist. ErrorCode: BlobNotFound<?xml version="1.0" encoding="utf-8"?><Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.RequestId:adbd2251-c01e-008a-1892-edf35e000000Time:2020-02-27T17:20:20.4524688Z</Message></Error>

KilledWorker: ("('zarr-4489c914c5a6d02d616996ad70f1d63d', 15, 10, 0, 0)", <Worker 'tcp://10.244.0.87:35585', name: 5, memory: 0, processing: 457>)

In [136]:
# Re-open the output store as an xarray Dataset and display it.
ds_z = xr.open_zarr(zarr_write)
ds_z

In [137]:
# Size of the re-opened output dataset in gigabytes (bytes / 1e9).
# NOTE(review): presumably nbytes is derived from array shapes and dtypes in
# the store metadata, so matching the source size does not by itself show that
# every chunk was written -- confirm by reading back some data.
ds_z.nbytes / 1e9

72.222473716

distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.244.9.47:46003', name: 9, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://10.244.9.47:46003
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.244.9.48:36917', name: 6, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://10.244.9.48:36917


In [138]:
# Number of data variables in the re-opened output dataset.
len(ds_z.data_vars)

24

In [139]:
# Number of data variables in the source dataset, for comparison with the
# count of the output dataset.
len(ds.data_vars)

24