## Load common functions, libs and vars

In [1]:
# Record which revision of this notebook produced the outputs below.
print("version - 15/02/19 14:19")

version - 15/02/19 14:19


In [2]:
# %load common.py
import iris
import xarray
import os
from numcodecs import Blosc
import s3fs
import zarr
import intake
from datetime import datetime, timezone, timedelta
import cf_units
import json
import  dask_kubernetes
import distributed
import boto3
from iris.experimental.equalise_cubes import equalise_attributes
import pandas as pd
import sys

sys.path.append(os.path.normpath(os.getcwd()))
from offsetmap import OffSetS3Map

# Destination bucket for all rolling zarr stores (see get_zar_path).
BUCKET = "metoffice-aws-earth-zarr"

# strftime/strptime pattern for the ISO-8601 UTC timestamps used in messages.
AWS_EARTH_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

# Queue delivering new-file notifications to this notebook.
# Previously: 'https://sqs.eu-west-2.amazonaws.com/536099501702/aws-earth-test'
SQS_QUEUE_URL = 'https://sqs.eu-west-2.amazonaws.com/536099501702/rolling_zarr_test_queue'

# Shared SQS client used by get_messages / del_msg.
sqs = boto3.client('sqs')

def del_msg(msg):
    """Delete ``msg`` from the SQS queue.

    ``msg`` must carry the ``'receipt_handle'`` entry that ``get_messages``
    attaches to every message it returns.
    """
    handle = msg['receipt_handle']
    sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=handle)
    
def get_messages(max_num=10):
    """Receive up to ``max_num`` messages from the SQS queue.

    Each message body is decoded from JSON, and the message's receipt handle is
    stored under ``'receipt_handle'`` so it can later be passed to ``del_msg``.
    Received messages are given a 10 minute visibility timeout.

    Returns:
        A list of decoded message dicts. The list is empty when the queue had
        nothing to deliver: SQS omits the ``Messages`` key from the response in
        that case, so it is read with a default rather than indexed.
    """
    res = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=max_num,
        VisibilityTimeout=60 * 10,
    )
    messages = []
    for message in res.get('Messages', []):
        msg = json.loads(message['Body'])
        msg['receipt_handle'] = message['ReceiptHandle']
        messages.append(msg)
    return messages

def get_zar_path(meta):
    """Return the ``<bucket>/<name>.zarr`` path of the store holding ``meta``'s data.

    The store name is ``<model>-<name>``. It is then suffixed as follows:

    * ``-<cell_methods>`` when ``cell_methods`` is present and truthy;
    * ``-at_heights`` when ``height`` lists more than one space-separated level;
    * ``-at_pressures`` when ``pressure`` lists more than one level.
    """
    def has_several_levels(field):
        levels = meta.get(field, False)
        return bool(levels) and len(levels.strip().split(' ')) > 1

    parts = [f"{BUCKET}/{meta['model']}-{meta['name']}"]
    cell_methods = meta.get('cell_methods', False)
    if cell_methods:
        parts.append(f"-{cell_methods}")
    if has_several_levels('height'):
        parts.append('-at_heights')
    if has_several_levels('pressure'):
        parts.append('-at_pressures')
    return ''.join(parts) + '.zarr'
    

def zarr_store(meta):
    """Return an ``OffSetS3Map`` rooted at ``get_zar_path(meta)``.

    ``meta['name']`` is passed as the map's ``temp_chunk_path``.
    """
    root = get_zar_path(meta)
    return OffSetS3Map(root=root, temp_chunk_path=meta['name'], check=False)
    
def msg_to_path(msg):
    """Return ``/s3/<bucket>/<key>`` for a message's ``bucket`` and ``key``.

    NOTE(review): presumably ``/s3`` is a filesystem mount of S3 — confirm.
    """
    return "/s3/{}/{}".format(msg["bucket"], msg["key"])

def reshape_to_dest_cube(cube):
    """Promote ``forecast_period`` and ``forecast_reference_time`` to new axes.

    ``forecast_period`` is added first, then ``forecast_reference_time`` is put
    in front of it. The result's leading dimensions are therefore
    (forecast_reference_time, forecast_period, ...).
    """
    with_period = iris.util.new_axis(cube, 'forecast_period')
    return iris.util.new_axis(with_period, 'forecast_reference_time')


def get_proto_zarr_array(meta):
    """Open the zarr array ``<store path>/<meta['name']>`` for ``meta``.

    NOTE(review): a store-level ``OffSetS3Map`` is also constructed and
    discarded. It is kept in case its constructor has side effects, but it
    looks removable.
    """
    store_root = get_zar_path(meta)
    OffSetS3Map(root=store_root, temp_chunk_path=meta['name'], check=False)
    array_root = '/'.join((get_zar_path(meta), meta['name']))
    array_store = OffSetS3Map(root=array_root, temp_chunk_path='', check=False)
    return zarr.open(array_store)


# def get_messages_like(max_try=500,**filters):
#     """filters: Value of True means has key and is not False. value of None means doesn't have key or value is None. Anyother value is a straight mathch"""
    
#     def match(msg):
#         for key, value in filters.items():
#             if callable(value):
#                 if not value(msg.get(key,None)):
#                     return False
#             elif not msg.get(key,None) == value:
#                 return False
#         return True

#     attempt = 0
#     msg = None
#     while attempt < max_try:
#         attempt +=1
#         for msg in [m['Message'] for m in get_messages(10)[1]]:
#             msg = json.loads(msg)
#             if match(msg):
#                 yield msg


# sample_sns_message = {
#     'model': 'mo-atmospheric-mogreps-g-prd',
#     'ttl': 1544268873,
#     'time': '2018-12-13T09:00:00Z',
#     'created_time': '2018-12-06T11:22:27Z',
#     'name': 'air_temperature',
#     'object_size': 711463962,
#     'forecast_period': 615600,
#     'forecast_reference_time': '2018-12-06T06:00:00Z',
#     'pressure': '100000.0 97500.0 95000.0 92500.0 90000.0 85000.0 80000.0 75000.0 70000.0 65000.0 60000.0 55000.0 50000.0 45000.0 40000.0 37500.0 35000.0 32500.0 30000.0 27500.0 25000.0 22500.0 20000.0 17500.0 15000.0 12500.0 10000.0 7000.0 5000.0 4000.0 3000.0 2000.0 1000.0',
#     'forecast_period_units': 'seconds',
#     'pressure_units': 'Pa',
#     'bucket': 'aws-earth-mo-examples',
#     'key': 'cafef7005477edb001aa7dc50eab78c5ef89d420.nc',
#     'realization': '0 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34'
# }


    /opt/conda/lib/python3.6/site-packages/intake_iris/netcdf.py
and
    /opt/conda/lib/python3.6/site-packages/intake_xarray/netcdf.py
Keeping plugin from first location.
  % (plugin_name, orig_path, new_path))


## Process new incoming file

In [3]:
# Pull a single notification off the queue; stop the run if none arrived.
msgs = get_messages(1)
assert msgs, "No messages recived"
msg = msgs[0]
msg

{'created_time': '2019-02-14T06:59:35Z',
 'forecast_period': '10800',
 'forecast_period_units': 'seconds',
 'height_units': 'm',
 'ttl': '1550369304',
 'realization': '0 1 2 3 4 5 6 7 8 9 10 11',
 'bucket': 'informatics-aws-earth-staging',
 'forecast_reference_time': '2019-02-14T03:00:00Z',
 'name': 'air_temperature',
 'model': 'mo-atmospheric-mogreps-uk-prd',
 'time': '2019-02-14T06:00:00Z',
 'key': '71ef822facc9efd00829c570fa6cfc822791797d.nc',
 'object_size': '323943448',
 'height': '5.0 10.0 20.0 30.0 50.0 75.0 100.0 150.0 200.0 250.0 300.0 400.0 500.0 600.0 700.0 800.0 1000.0 1250.0 1500.0 1750.0 2000.0 2250.0 2500.0 2750.0 3000.0 3250.0 3500.0 3750.0 4000.0 4500.0 5000.0 5500.0 6000.0',
 'receipt_handle': 'AQEBvODEplTVODCi1vvYBb/ivzmChFEy+13eG43Olfs2JIg1h2UcSYuZjnIfHhLlDleyqfgPJbjSZjpVm6iQxs22RVpMW+Gc5q7UKL39I6RqXsQ5dmtp+mcc7dI17BFzGcLkPTytb839ED6+3h2RjbRZNQgPG7AxEtGY+TPbgREG5K4WZUoz7cmE1ERz8a77gYow0E+5ZPHeD0DAXYwrgxEMgixRsO8WfHlNkbjtWj8MVfH1f1Ft9crqUHiwxDqTK9iXUlbPKhz3Ef/pxtcNYn

In [4]:
# True: copy the S3 object to a local file first; False: read it in place via msg_to_path.
do_download = True

In [5]:
import tempfile
import uuid

def download(msg):
    """Download the S3 object named by ``msg`` to ``./<random hex>.nc``.

    ``msg`` must provide ``'bucket'`` and ``'key'``. The local path is printed
    and returned; the caller is responsible for deleting the file.

    The object is streamed to disk with boto3's managed transfer instead of
    being read into memory in one go, since these objects can be hundreds of MB.
    If the transfer fails, any file left at the destination is removed before
    the error is re-raised.
    """
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(msg['bucket'])
    key = msg['key']
    dest = f'./{uuid.uuid4().hex}.nc'
    try:
        bucket.download_file(key, dest)
    except Exception:
        if os.path.exists(dest):
            os.remove(dest)
        raise
    print(dest)
    return dest

In [6]:
# Either fetch a local copy of the file or use its /s3/<bucket>/<key> path directly.
file_path = download(msg) if do_download else msg_to_path(msg)
print(f"Work with file at {file_path}")

./33ce2b55dbb441c4a48818f1e079343a.nc
Work with file at ./33ce2b55dbb441c4a48818f1e079343a.nc


In [7]:
# Load the file and give it leading forecast_reference_time / forecast_period axes.
cube_to_add = reshape_to_dest_cube(iris.load_cube(file_path))
run_point = cube_to_add.coord('forecast_reference_time').cell(0).point
print(f"Run is {run_point}")
period_point = cube_to_add.coord('forecast_period').cell(0).point
print(f"forecast_period is {period_point}")
cube_to_add

Run is 2019-02-14 03:00:00
forecast_period is 10800


Air Temperature (K),forecast_reference_time,forecast_period,realization,height,projection_y_coordinate,projection_x_coordinate
Shape,1,1,12,33,970,1042
Dimension coordinates,,,,,,
forecast_reference_time,x,-,-,-,-,-
forecast_period,-,x,-,-,-,-
realization,-,-,x,-,-,-
height,-,-,-,x,-,-
projection_y_coordinate,-,-,-,-,x,-
projection_x_coordinate,-,-,-,-,-,x
Scalar coordinates,,,,,,
time,2019-02-14 06:00:00,2019-02-14 06:00:00,2019-02-14 06:00:00,2019-02-14 06:00:00,2019-02-14 06:00:00,2019-02-14 06:00:00


## Work out offset
Look at the `_origin` of the zarr and calculate how far the current file is from that origin, in whole chunks along each dimension. These offsets are added to each block's chunk location when the chunks are written.

In [8]:
import math

# For each dimension in the zarr's `_origin`, work out how many whole chunks the
# incoming cube lies from the origin. Dimensions with no `step` get offset 0.
dest_zarr = get_proto_zarr_array(msg)
origin = dest_zarr.attrs['_origin']
offsets = []
try:
    for i, dim in enumerate(origin):
        coord = cube_to_add.coord(dim['name'])
        step = dim.get('step', None)
        if not step:
            offsets.append(0)
            continue
        # Explicit raises instead of `assert` so these checks survive `python -O`.
        # AssertionError is kept so the handler below behaves exactly as before.
        if dim['unit'] != str(coord.units):
            raise AssertionError("units are not same %s != %s" % (dim['unit'], coord.units))
        diff = coord.points[0] - dim['at']
        # zarr cannot index negatively, so the file must not precede the origin.
        if not diff >= 0:
            raise AssertionError("index would be negative: origin: %s, point: %s" % (dim['at'], coord.points[0]))
        offset = diff / step / dest_zarr.chunks[i]
        # The file must start on a chunk boundary. Compare with a tolerance
        # rather than an exact `% 1 == 0` so float rounding in the division
        # does not reject an aligned file.
        if not (math.isfinite(offset) and math.isclose(offset, round(offset), rel_tol=0.0, abs_tol=1e-9)):
            raise AssertionError("offset is not a whole number of chunks: %s" % offset)
        print("for", coord.name(), "offset is ", offset)
        offsets.append(int(round(offset)))
except AssertionError:
    # This file can't be placed in the store; drop its message before failing.
    del_msg(msg)
    raise


offsets

for forecast_reference_time offset is  120.0
for forecast_period offset is  3.0


[120, 3, 0, 0, 0, 0]

In [9]:
# Re-chunk the lazy data so each dask block corresponds to one destination zarr chunk.
data = cube_to_add.lazy_data().rechunk(dest_zarr.chunks)
data

dask.array<rechunk-merge, shape=(1, 1, 12, 33, 970, 1042), dtype=float32, chunksize=(1, 1, 3, 11, 970, 1042)>

## Write new chunks
'Loop' over all blocks in the file and save each one at its new chunk index in the rolling zarr array.

In [10]:
# # Create a cluster. Need to ensure it has write access to the write dest of the Zarr, hence using the template which has AWS keys in as Env vars
# import dask_kubernetes
# import distributed
# dask_worker_template = os.path.expanduser("~/ota/small-worker-template.yaml")
# cluster = dask_kubernetes.KubeCluster.from_yaml(dask_worker_template)
# cluster.adapt(minimum=0,maximum=40)
# c = distributed.Client(cluster)
# cluster

In [11]:
# start_write = datetime.now()
# def process_block(block, block_info=None):
#     print(block_info)
#     cloc = block_info[0]['chunk-location']
#     to_cloc = [o+i for o, i in zip(offsets, cloc)]
#     key = '.'.join(f"{int(i):d}" for i in to_cloc )
#     dest_zarr.store[key] = dest_zarr._encode_chunk(block)
#     print("Done block")
#     return block
        
# data.map_blocks(process_block, dtype=data.dtype).compute()
# end_write = datetime.now()
# f"Blocks write took {end_write - start_write}"

In [12]:
"""This cell is more complicated than the original (commented out above) because
the original was failing after processing all chunks when run under papermill as
a Kubernetes cron job. The hypothesis was that it was running out of memory, so
this version shrinks the output right down.

It turned out the error was actually caused by a container version mismatch:
0.5.14 was in use but it should have been informaticslab/pangeo-notebook:0.5.13.

This version has been kept anyway, as it "feels" like it makes things quicker.
"""

import numpy as np


start_write = datetime.now()
def process_block(block, block_info=None, out_shape=None):
    """Write one dask block directly into the destination zarr store.

    The block's chunk location is shifted by the global ``offsets`` to give its
    chunk key in ``dest_zarr``. The block is encoded with zarr's private
    ``_encode_chunk`` and stored under that key. A tiny zero array of shape
    ``out_shape`` is returned instead of the data so the computed result
    stays small.

    NOTE(review): blocks are written unpadded, so this assumes every block is a
    full zarr chunk (data was rechunked to ``dest_zarr.chunks``) — confirm for
    shapes that aren't chunk multiples.
    """
    print(block_info)
    cloc = block_info[0]['chunk-location']
    to_cloc = [o + i for o, i in zip(offsets, cloc)]
    key = '.'.join(f"{int(i):d}" for i in to_cloc)
    dest_zarr.store[key] = dest_zarr._encode_chunk(block)
    print("Done block")
    # Match the dtype declared to map_blocks below (np.zeros defaults to float64).
    return np.zeros(out_shape, dtype=block.dtype)


# Axes that consist of a single chunk are dropped from the output. Every
# remaining axis gets a length-1 output chunk per block.
drop_axis = [index for index, sizes in enumerate(data.chunks) if len(sizes) == 1]
out_shape = [1] * (len(data.shape) - len(drop_axis))
new_chunks = [1] * len(out_shape)

print(f"drop axis: {drop_axis}. New output shape: {out_shape}, in chunks {new_chunks}")
dummy = data.map_blocks(process_block,
                        dtype=data.dtype,
                        chunks=new_chunks,
                        out_shape=out_shape,
                        drop_axis=drop_axis).compute()
end_write = datetime.now()
f"Blocks write took {end_write - start_write}"

drop axis: [0, 1, 4, 5]. New output shape: [1, 1], in chunks [1, 1]
{0: {'shape': (1, 1, 12, 33, 970, 1042), 'num-chunks': (1, 1, 4, 3, 1, 1), 'array-location': [(0, 1), (0, 1), (0, 3), (22, 33), (0, 970), (0, 1042)], 'chunk-location': (0, 0, 0, 2, 0, 0)}}
Done block
{0: {'shape': (1, 1, 12, 33, 970, 1042), 'num-chunks': (1, 1, 4, 3, 1, 1), 'array-location': [(0, 1), (0, 1), (9, 12), (0, 11), (0, 970), (0, 1042)], 'chunk-location': (0, 0, 3, 0, 0, 0)}}
Done block
{0: {'shape': (1, 1, 12, 33, 970, 1042), 'num-chunks': (1, 1, 4, 3, 1, 1), 'array-location': [(0, 1), (0, 1), (6, 9), (0, 11), (0, 970), (0, 1042)], 'chunk-location': (0, 0, 2, 0, 0, 0)}}
Done block
{0: {'shape': (1, 1, 12, 33, 970, 1042), 'num-chunks': (1, 1, 4, 3, 1, 1), 'array-location': [(0, 1), (0, 1), (6, 9), (22, 33), (0, 970), (0, 1042)], 'chunk-location': (0, 0, 2, 2, 0, 0)}}
Done block
{0: {'shape': (1, 1, 12, 33, 970, 1042), 'num-chunks': (1, 1, 4, 3, 1, 1), 'array-location': [(0, 1), (0, 1), (3, 6), (0, 11), (0, 97

'Blocks write took 0:02:17.187724'

In [13]:
# del c
# del cluster
# dask.config.set(scheduler=None)

In [15]:
# Remove the local copy made by download(); nothing to do when reading in place.
print("Blocks written")
if do_download:
    try:
        print("Try delete file")
        os.remove(file_path)
    except OSError as err:
        # Best effort: a leftover file must not fail the run, but report it
        # instead of silently ignoring it (IOError is an alias of OSError).
        print(f"Could not delete {file_path}: {err}")

Blocks written
Try delete file


## Chunk adding done - delete the message
We've now added all the chunks from that file. We could repeat the above for all files. However, until the metadata is updated these extra chunks won't be represented.

In [16]:
# The message is deleted only after every chunk has been written. If anything
# above failed, the message was not deleted and presumably reappears on the
# queue once the visibility timeout set in get_messages expires.
print("Processing done. Delete message")
del_msg(msg)

Processing done. Delete message


In [17]:
# Final marker cell: its value is displayed once the notebook has run to completion.
"End"

'End'