In [1]:
import xarray as xr
from pathlib import Path
import numcodecs
import gcsfs
import rechunker
import zarr
from nowcasting_dataset.data_sources.nwp_data_source import open_nwp, NWP_VARIABLE_NAMES
import os
import numpy as np

In [9]:
# Locations inside the Google Cloud Storage bucket.  These are kept as
# pathlib.Path objects *without* the 'gs://' scheme, because Path would
# collapse the double slash in 'gs://'.
BUCKET = Path('solar-pv-nowcasting-data')
NWP_PATH = BUCKET / 'NWP/UK_Met_Office/'

# URL of the directory that holds the source Zarr stores.  as_posix() keeps
# the separators as '/' on every platform; str() of a Path emits backslashes
# on Windows, which would produce an invalid gs:// URL.
SOURCE_PATH = 'gs://' + (NWP_PATH / 'UKV_zarr').as_posix()

# Destination of the rechunked data.  The file name spells out the chunk
# length used along each dimension.
TARGET_PATH = NWP_PATH / 'UKV__2018-01_to_2019-12__chunks__variable10__init_time1__step1__x548__y704__.zarr'

# Scratch store handed to rechunker as its temp_store.
TEMP_STORE_FILENAME = NWP_PATH / 'temp.zarr'

In [1]:
def open_nwp(zarr_store: str) -> xr.Dataset:
    """Open one of the source UKV Zarr stores and tidy it up.

    NOTE: this definition shadows the ``open_nwp`` imported from
    ``nowcasting_dataset.data_sources.nwp_data_source`` at the top of the
    notebook; after this cell runs, ``open_nwp`` refers to this function.

    Args:
        zarr_store: Name of the Zarr store directly beneath ``SOURCE_PATH``,
            e.g. ``'2018_7-12'``.

    Returns:
        The opened dataset with ``time`` renamed to ``init_time``, the
        multi-level variables removed and, for the two stores with known
        index problems, ``init_time`` de-duplicated or sorted.
    """
    # SOURCE_PATH is a gs:// URL, so the components must always be joined
    # with '/'.  os.path.join would use the local OS separator instead.
    import posixpath
    full_dir = posixpath.join(SOURCE_PATH, zarr_store)

    # chunks={} asks xarray for dask-backed arrays instead of loading the
    # data eagerly.
    ds = xr.open_dataset(
        full_dir, engine='zarr', consolidated=True, mode='r', chunks={})
    ds = ds.rename({'time': 'init_time'})

    # The isobaricInhPa coordinates look messed up, especially in
    # the 2018_7-12 and 2019_7-12 Zarr stores.  So let's drop all
    # the variables with multiple vertical levels for now:
    ds = ds.drop_vars(
        ['isobaricInhPa', 'gh_p', 'r_p', 't_p', 'wdir_p', 'ws_p'])

    # There are a lot of doubled-up indices from 2018-07-18 00:00
    # to 2018-08-27 09:00.  De-duplicate the index. Code adapted
    # from https://stackoverflow.com/a/51077784/732596
    # np.unique returns the position of the first occurrence of each
    # distinct init_time, ordered by init_time.
    if zarr_store == '2018_7-12':
        _, unique_index = np.unique(ds.init_time, return_index=True)
        ds = ds.isel(init_time=unique_index)

    # 2019-02-01T21 is in the wrong place! It comes after
    # 2019-02-03T15.  Oops!  Put init_time back into ascending order.
    if zarr_store == '2019_1-6':
        ds = ds.sortby('init_time')

    return ds

In [2]:
# Open every source store, in the order listed, printing each store name as
# a simple progress indicator.
store_names = ('2018_1-6', '2018_7-12', '2019_1-6', '2019_7-12')
nwp_datasets = []
for zarr_store in store_names:
    print(zarr_store)
    nwp_datasets += [open_nwp(zarr_store)]

In [4]:
# Inspect how the first opened store ('2018_1-6') is chunked along each
# dimension.  NOTE(review): the repr lists one entry per chunk, so the
# output of this cell is very long.
nwp_datasets[0].chunks

Frozen(SortedKeysDict({'init_time': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

In [6]:
# Join the per-store datasets end to end along init_time, in list order, to
# form a single dataset.
nwp_concatenated = xr.concat(nwp_datasets, dim='init_time')

In [7]:
# Stack the selected variables into one DataArray (which adds a 'variable'
# dimension that can then be chunked), and wrap that array back up as a
# Dataset holding the single data variable 'UKV'.
# For a quick trial run, insert `.isel(init_time=slice(0, 2))` just before
# `.to_dataset(...)`.
source_dataset = (
    nwp_concatenated[list(NWP_VARIABLE_NAMES)]
    .to_array()
    .to_dataset(name='UKV')
)
print(source_dataset)

<xarray.Dataset>
Dimensions:    (init_time: 5442, step: 37, variable: 10, x: 548, y: 704)
Coordinates:
  * step       (step) timedelta64[ns] 00:00:00 01:00:00 ... 1 days 12:00:00
  * init_time  (init_time) datetime64[ns] 2018-01-01 ... 2019-12-31T21:00:00
  * x          (x) float64 -2.38e+05 -2.36e+05 -2.34e+05 ... 8.54e+05 8.56e+05
  * y          (y) float64 1.222e+06 1.22e+06 1.218e+06 ... -1.82e+05 -1.84e+05
  * variable   (variable) <U5 't' 'dswrf' 'prate' 'r' ... 'lcc' 'mcc' 'hcc'
Data variables:
    UKV        (variable, init_time, step, y, x) float32 dask.array<chunksize=(1, 1, 10, 704, 548), meta=np.ndarray>


In [10]:
gcs = gcsfs.GCSFileSystem()

# Bucket locations are addressed with plain '/'-separated strings, so
# convert the pathlib.Path constants explicitly instead of handing Path
# objects to get_mapper.
target_path = Path(TARGET_PATH).as_posix()
temp_path = Path(TEMP_STORE_FILENAME).as_posix()
target_store = gcs.get_mapper(target_path)
temp_store = gcs.get_mapper(temp_path)

# Chunk lengths for the rechunked 'UKV' variable: every variable and the
# full x/y grid in one chunk, for a single init_time and a single step.
target_chunks = {
    'UKV': {
        "variable": 10,
        "init_time": 1,
        "step": 1,
        "x": 548,
        "y": 704
    }
}

# Per-variable options passed to rechunker for the target array.
encoding = {
    'UKV': {
        'compressor': numcodecs.Blosc(cname="zstd", clevel=5),
        'dtype': 'float32'
    }
}

print('Rechunking...')
try:
    rechunk_plan = rechunker.rechunk(
        source=source_dataset,
        target_chunks=target_chunks,
        max_mem="2GB",
        target_store=target_store,
        target_options=encoding,
        temp_store=temp_store)

    rechunk_plan.execute()
finally:
    # The temp store is only scratch space.  Remove it whether the run
    # finished or was interrupted, so it does not pile up in the bucket.
    if gcs.exists(temp_path):
        gcs.rm(temp_path, recursive=True)

print('Consolidating...')
zarr.convenience.consolidate_metadata(target_store)

print('Done!')

Rechunking...
_copy_chunk((slice(0, 1, None), slice(0, 1, None), slice(0, 10, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(8, 9, None), slice(1850, 1851, None), slice(10, 20, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(6, 7, None), slice(2264, 2265, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(6, 7, None), slice(3868, 3869, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(4, 5, None), slice(4282, 4283, None), slice(30, 37, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(7, 8, None), slice(30, 31, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(5, 6, None), slice(444, 445, None), slice(30, 37, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(7, 8, None), slice(1634, 1635, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(5, 6, None),

KeyboardInterrupt: 

_copy_chunk((slice(7, 8, None), slice(520, 521, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(9, 10, None), slice(1710, 1711, None), slice(10, 20, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(7, 8, None), slice(2124, 2125, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(9, 10, None), slice(3314, 3315, None), slice(10, 20, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(7, 8, None), slice(3728, 3729, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(7, 8, None), slice(5332, 5333, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(6, 7, None), slice(304, 305, None), slice(30, 37, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(8, 9, None), slice(1494, 1495, None), slice(20, 30, None), slice(0, 704, None), slice(0, 548, None)))
_copy_chunk((slice(6, 7, None), sl