![logo](https://jupyter.informaticslab.co.uk/hub/logo)

Converting 100TB of data in the Cloud
==================

We've made 100TB of Met Office weather forecast data available in the AWS cloud. We benchmarked our distributed computation system, Jade, by converting all 1.8M files from Met Office PP format to compressed NetCDF format.

### Install dependencies

In [None]:
# Shell commands run from the notebook: install the graphviz system package
# and the Python package of the same name, then update distributed via conda.
!apt-get install -y graphviz
!pip install graphviz
!conda update -y distributed

### Some code to convert files and check the conversion

In [35]:
import os
import tempfile
import numpy as np

import boto3 as boto
import iris

import json
import itertools

import dask

from env import AWS_KEY_ID, AWS_SECRET_KEY_ID, USER_ID


# SQS endpoint for the eu-west-2 region and the names of the three queues used
# below: jobs waiting to run, jobs that failed, and jobs that completed.
SQS_ENDPOINT_URL = "https://sqs.eu-west-2.amazonaws.com/"
JOB_QUEUE_NAME = "mogreps-conversion"
FAILED_QUEUE_NAME = "mogreps-conversion-failed"
PASSED_QUEUE_NAME = "mogreps-conversion-completed"
# Job queue URL has the form <endpoint><USER_ID>/<queue name>.
JOB_QUEUE_URL = "".join([SQS_ENDPOINT_URL, USER_ID, "/", JOB_QUEUE_NAME])


# Switch on two iris FUTURE flags at import time, before any of the functions
# below load or save cubes.
# NOTE(review): per the iris documentation, netcdf_no_unlimited should stop the
# NetCDF saver creating an unlimited dimension by default, and netcdf_promote
# should promote NetCDF reference-surface variables to cubes on load --
# confirm against the iris version installed on the workers.
iris.FUTURE.netcdf_no_unlimited = True
iris.FUTURE.netcdf_promote = True


def init_aws():
    """Create the module-level ``sqs`` and ``s3`` boto3 resources.

    Both resources are built from one session in the eu-west-2 region that is
    authenticated with the key pair imported from ``env``.
    """
    global sqs, s3

    aws_session = boto.session.Session(
        region_name="eu-west-2",
        aws_access_key_id=AWS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_KEY_ID,
    )
    sqs = aws_session.resource('sqs')
    s3 = aws_session.resource('s3')
    

def tear_down_aws():
    """Close the HTTP sessions behind the module-level ``sqs`` and ``s3``.

    Reaches into the clients' private ``_endpoint`` attribute to find each
    HTTP session; the SQS session is closed first, then the S3 one.
    """
    sqs_http_session = sqs.meta.client._endpoint.http_session
    sqs_http_session.close()

    s3_http_session = s3.meta.client._endpoint.http_session
    s3_http_session.close()


def parse_s3_uri(s3_uri):
    """Split an S3 URI into its bucket, object key and file extension.

    The URI is treated as ``/``-separated fields: the third field is the
    bucket and everything after it is the key, so ``s3://bucket/a/b.pp``
    yields ``("bucket", "a/b.pp", ".pp")``.

    Returns:
        A ``(bucket, key, extension)`` tuple. ``extension`` is whatever
        ``os.path.splitext`` reports for the whole URI (leading dot included,
        or an empty string).

    Raises:
        IndexError: if the URI has fewer than three ``/``-separated fields.
    """
    # Split at most three times so the key comes back in one piece.
    fields = s3_uri.split("/", 3)
    extension = os.path.splitext(s3_uri)[1]
    bucket = fields[2]
    key = "/".join(fields[3:])
    return bucket, key, extension


def download_object(s3_uri):
    """Download the object at ``s3_uri`` into a named temporary file.

    The temporary file keeps the URI's extension as its suffix. The open file
    object is returned; the caller uses its ``.name`` to read the data, and
    the file is removed from disk when that object is closed.
    """
    bucket, key, extension = parse_s3_uri(s3_uri)
    local_copy = tempfile.NamedTemporaryFile(mode='w+b', suffix=extension)
    remote_object = s3.Object(bucket, key)
    remote_object.download_file(local_copy.name)
    return local_copy


def assert_cube_lists_equivalent(cubes_in, cubes_out):
    """Check that every named input cube has one identical output cube.

    For each cube in ``cubes_in`` whose name is not ``'unknown'``, exactly one
    cube is extracted from ``cubes_out`` with the same name, the same number
    of coordinates, the same shape and a matching first cell method (or no
    cell methods at all). The data of the two cubes is then compared.

    Args:
        cubes_in: iterable of cubes loaded from the original file.
        cubes_out: cube list (must support ``.extract``) loaded from the
            converted file.

    Raises:
        ValueError: if the constraints match zero or several output cubes.
        AssertionError: if the matched cubes hold different data.
    """
    for cube_in in cubes_in:
        # Compare by value: ``is not`` against a string literal tests object
        # identity, which equal strings are not guaranteed to share.
        if cube_in.name() == 'unknown':
            continue

        # The closures below read ``cube_in`` lazily; that is safe because
        # they are only evaluated by the ``extract`` call in this iteration.
        name_con = iris.Constraint(name=cube_in.name())
        coords_con = iris.Constraint(
            cube_func=lambda c: len(c.coords()) == len(cube_in.coords()))
        shape_con = iris.Constraint(
            cube_func=lambda c: c.shape == cube_in.shape)

        if len(cube_in.cell_methods) == 0:
            cell_method_con = iris.Constraint(
                cube_func=lambda c: len(c.cell_methods) == 0)
        else:
            def cmc_con_fn(c):
                # Only the first cell method of each cube is compared.
                if len(c.cell_methods) > 0:
                    return (c.cell_methods[0].method
                            == cube_in.cell_methods[0].method)
                return False
            cell_method_con = iris.Constraint(cube_func=cmc_con_fn)

        # Single-element unpacking doubles as the "exactly one match" check.
        [cube_out] = cubes_out.extract(
            name_con & coords_con & cell_method_con & shape_con)

        np.testing.assert_equal(cube_in.data, cube_out.data)
            
            
def pp_key_to_nc_key(pp_key):
    """Return the NetCDF object key for a PP object key.

    The key's directory part is dropped and a trailing ``.pp`` extension is
    swapped for ``.nc``. Only the extension is rewritten, so a "pp" that
    happens to appear elsewhere in the file name is left alone. File names
    that do not end in ``.pp`` are returned unchanged.

    Args:
        pp_key: object key, e.g. ``"2016/some_file.pp"``.

    Returns:
        The bare file name with ``.nc`` in place of ``.pp``.
    """
    file_name = pp_key.split("/")[-1]
    pp_suffix = '.pp'
    if file_name.endswith(pp_suffix):
        return file_name[:-len(pp_suffix)] + '.nc'
    return file_name
    
    
def replace_s3_obj(s3_uri, new_file_name):
    """Upload the converted file, then delete the original S3 object.

    The destination bucket is ``mogreps-g`` or ``mogreps-uk``, whichever name
    appears in ``s3_uri`` (``mogreps-g`` is tried first). The destination key
    is the original key passed through ``pp_key_to_nc_key``.

    Args:
        s3_uri: URI of the original object.
        new_file_name: path of the local file to upload.

    Raises:
        ValueError: if neither bucket name appears in ``s3_uri``.
    """
    old_bucket, old_s3_key, _ = parse_s3_uri(s3_uri)

    for candidate in ('mogreps-g', 'mogreps-uk'):
        if candidate in s3_uri:
            new_bucket = candidate
            break
    else:
        raise ValueError

    new_s3_key = pp_key_to_nc_key(old_s3_key)

    # Upload first so the original is only removed once its replacement exists.
    converted_object = s3.Object(new_bucket, new_s3_key)
    converted_object.upload_file(new_file_name)
    original_object = s3.Object(old_bucket, old_s3_key)
    original_object.delete()
    
    
def add_to_queue(s3_uri, queue):
    """Send ``s3_uri`` as a message body to the SQS queue named ``queue``."""
    target_queue = sqs.get_queue_by_name(QueueName=queue)
    target_queue.send_message(MessageBody=s3_uri)
    
    
def save_to_netcdf(s3_uri, complevel=1):
    """Download an object from S3 and re-save it as compressed NetCDF4.

    Args:
        s3_uri: URI of the object to convert.
        complevel: zlib compression level passed to ``iris.save``.

    Returns:
        ``(data_file_in, data_file_out)``: the open temporary files holding
        the downloaded original and the NetCDF copy. The caller must keep
        them open for as long as it needs the files on disk.

    Raises:
        AttributeError: if iris cannot load the downloaded file. The original
            error is attached as the cause.
    """
    data_file_in = download_object(s3_uri)
    try:
        cubes_in = iris.load(data_file_in.name)
    except Exception as err:
        # Catch Exception, not everything, so KeyboardInterrupt and SystemExit
        # still propagate; keep the AttributeError type for callers but chain
        # the real cause so the failure can be diagnosed.
        data_file_in.close()
        raise AttributeError(
            "iris could not load {}".format(s3_uri)) from err

    data_file_out = tempfile.NamedTemporaryFile(mode='w+b', suffix=".nc")
    try:
        iris.save(cubes_in, data_file_out.name, netcdf_format="NETCDF4",
                  zlib=True, complevel=complevel)
    except Exception:
        # Do not leave temporary files behind when the save fails.
        data_file_in.close()
        data_file_out.close()
        raise
    return data_file_in, data_file_out
    

def get_a_job():
    """Receive one message from the job queue.

    The message body is parsed as JSON and its ``"Message"`` field is
    appended to ``s3://mogreps/`` to form the object URI.

    Returns:
        ``(receipt_handle, s3_uri)`` for the received message.

    Raises:
        ValueError: if the receive call does not return exactly one message.
    """
    job_queue = sqs.Queue(JOB_QUEUE_URL)
    [job_message] = job_queue.receive_messages(MaxNumberOfMessages=1)
    payload = json.loads(job_message.body)
    s3_uri = "s3://mogreps/" + payload["Message"]
    return (job_message.receipt_handle, s3_uri)


def convert_next_object_from_s3(message_handle, s3_uri):
    """Convert one S3 object to NetCDF, verify it and report the outcome.

    The object is downloaded and saved as NetCDF, both files are loaded back
    and compared, and on success the converted file replaces the original in
    S3. The job message is deleted in every case, and the URI is sent to the
    failed queue or the completed queue accordingly.

    Args:
        message_handle: receipt handle of the job message on the job queue.
        s3_uri: URI of the object to convert.

    Raises:
        Exception: whatever the conversion or verification raised, re-raised
            after the failure has been recorded.
    """
    msg = sqs.Message(JOB_QUEUE_URL, message_handle)
    data_file_in = data_file_out = None

    try:
        try:
            data_file_in, data_file_out = save_to_netcdf(s3_uri)
            cubes_in = iris.load(data_file_in.name)
            cubes_out = iris.load(data_file_out.name)
            assert_cube_lists_equivalent(cubes_in, cubes_out)
        except Exception:
            # Conversion and verification failures are recorded the same way.
            msg.delete()
            add_to_queue(s3_uri, FAILED_QUEUE_NAME)
            raise

        replace_s3_obj(s3_uri, data_file_out.name)
        msg.delete()
        add_to_queue(s3_uri, PASSED_QUEUE_NAME)
    finally:
        # Runs on every path, including a failed conversion, so the AWS
        # connections are always closed and temporary files never linger.
        try:
            for temp_file in (data_file_in, data_file_out):
                if temp_file is not None:
                    temp_file.close()
        finally:
            tear_down_aws()
        

def old_file_exists(s3_uri):
    """Return True if the original object is still in the 'mogreps' bucket.

    The key parsed from ``s3_uri`` is used as a listing prefix; any match
    counts as the file existing.
    """
    # NOTE(review): ``mys3`` is a notebook-level global, not the ``s3``
    # resource set up by init_aws() -- confirm it exists wherever this runs.
    mogreps_bucket = mys3.Bucket('mogreps')

    _, pp_s3_key, _ = parse_s3_uri(s3_uri)
    # Query the bucket created above; the original code listed the unrelated
    # global ``mogrepsg_bucket`` and so never looked in 'mogreps'.
    objs = list(mogreps_bucket.objects.filter(Prefix=pp_s3_key))

    return len(objs) > 0
    
    
def new_file_exists(s3_uri):
    """Return True if a converted object for ``s3_uri`` is already in S3.

    The NetCDF key is derived from the original key with
    ``pp_key_to_nc_key`` and used as a listing prefix in ``mogreps-g`` when
    that name appears in ``s3_uri``, otherwise in ``mogreps-uk``.
    """
    buckets = {
        'mogreps-g': mys3.Bucket('mogreps-g'),
        'mogreps-uk': mys3.Bucket('mogreps-uk'),
    }

    _, pp_s3_key, _ = parse_s3_uri(s3_uri)
    nc_s3_key = pp_key_to_nc_key(pp_s3_key)

    chosen = 'mogreps-g' if 'mogreps-g' in s3_uri else 'mogreps-uk'
    matches = list(buckets[chosen].objects.filter(Prefix=nc_s3_key))

    return bool(matches)
    
    
def delete_old_file(s3_uri):
    """Delete the S3 object that ``s3_uri`` points at."""
    bucket, key, _ = parse_s3_uri(s3_uri)
    stale_object = s3.Object(bucket, key)
    stale_object.delete()
    
        
def deal_with_job(idno):
    """Take one job from the queue and bring its object to a finished state.

    Depending on which of the original and converted objects exist in S3,
    the job is converted, or simply acknowledged.

    Args:
        idno: unused; present so the function can be mapped over a range.

    Raises:
        RuntimeError: if neither the original nor the converted object exists.
    """
    init_aws()
    message_handle, s3_uri = get_a_job()

    # These locals must not reuse the helper function names: assigning to
    # ``old_file_exists`` here would make it local to this function and the
    # call itself would fail with UnboundLocalError.
    has_old_file = old_file_exists(s3_uri)
    has_new_file = new_file_exists(s3_uri)

    if has_old_file and has_new_file:
        print("a")
        # NOTE(review): the original is deleted before the conversion, which
        # then tries to download it -- confirm whether this branch should
        # only delete the original and acknowledge the message instead.
        delete_old_file(s3_uri)
        convert_next_object_from_s3(message_handle, s3_uri)
    elif has_old_file:
        print("b")
        convert_next_object_from_s3(message_handle, s3_uri)
    elif has_new_file:
        print("c")
        # Already converted: just acknowledge the job message.
        msg = sqs.Message(JOB_QUEUE_URL, message_handle)
        msg.delete()
    else:
        raise RuntimeError(
            "neither the original nor the converted object exists for "
            + s3_uri)

### Hello compute cluster!

In [2]:
import dask
import distributed
# ``Executor`` is the deprecated former name of ``Client`` (the repr printed
# below already reports a Client), so connect with ``Client`` directly.
e = distributed.Client("ec2-52-56-232-146.eu-west-2.compute.amazonaws.com:8786")
e

<Client: scheduler='tcp://ec2-52-56-232-146.eu-west-2.compute.amazonaws.com:8786' processes=2990 cores=2990>

In [None]:
# Smoke test with a single job. deal_with_job fetches its own message from the
# queue; convert_next_object_from_s3 needs a message handle and a URI and
# cannot be submitted without arguments.
e.submit(deal_with_job, 0, pure=False)

In [None]:
# Keep every future referenced: the scheduler may drop work whose futures
# have all been released.
trial_futures = []
for job_no in range(10):
    f = e.submit(deal_with_job, job_no, pure=False)
    trial_futures.append(f)

In [3]:
# Submit the conversion in 500 batches of 1000 jobs. deal_with_job takes the
# single id that map() supplies; convert_next_object_from_s3 expects two
# arguments. The futures are accumulated so that earlier batches are not
# released (and possibly cancelled) when the next batch is submitted.
bulk_futures = []
for i in range(500):
    bulk_futures.extend(e.map(deal_with_job, range(1000), pure=False))

## Check failed jobs

In [10]:
# Notebook-side AWS session and SQS resource, used by the cells below to
# inspect the queues by hand.
mysession = boto.session.Session(
    region_name="eu-west-2",
    aws_access_key_id=AWS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_KEY_ID,
)
mysqs = mysession.resource('sqs')

In [13]:
# Handle on the queue that collects the URIs of failed conversions.
failed_queue = mysqs.Queue('https://sqs.eu-west-2.amazonaws.com/021908831235/mogreps-conversion-failed')

In [16]:
# Notebook-side S3 resource built from the same session as ``mysqs``.
mys3 = mysession.resource('s3')

In [33]:
# Bucket handles for the two buckets that converted files are uploaded to.
mogrepsg_bucket = mys3.Bucket('mogreps-g')
mogrepsuk_bucket = mys3.Bucket('mogreps-uk')

In [31]:
# Take one message from the failed queue and check whether its converted file
# exists after all.
[msg] = failed_queue.receive_messages(MaxNumberOfMessages=1)
objpath = msg.body
print(objpath)
# Converted files are uploaded under pp_key_to_nc_key(<original key>), i.e.
# the bare file name with an .nc extension, so search for that key; the
# original .pp key (with its directory) never appears in these buckets.
nc_key = pp_key_to_nc_key(parse_s3_uri(objpath)[1])
if 'mogreps-g' in objpath:
    print('g')
    objs = list(mogrepsg_bucket.objects.filter(Prefix=nc_key))
else:
    print('uk')
    objs = list(mogrepsuk_bucket.objects.filter(Prefix=nc_key))

if len(objs) > 0:
    print("deleting")
    # jobs worked
    msg.delete()

s3://mogreps/2016/prods_op_mogreps-g_20160103_06_19_162.pp
g


In [32]:
objs

[]

In [14]:
# Repeat the check above for every message the failed queue hands out.
while True:
    messages = failed_queue.receive_messages(MaxNumberOfMessages=1)
    if not messages:
        # Nothing left to receive: stop cleanly, not via a ValueError.
        break
    [msg] = messages
    objpath = msg.body
    # Converted files are stored under the bare file name with an .nc
    # extension, so that is the key to look for.
    nc_key = pp_key_to_nc_key(parse_s3_uri(objpath)[1])
    if 'mogreps-g' in objpath:
        objs = list(mogrepsg_bucket.objects.filter(Prefix=nc_key))
    else:
        objs = list(mogrepsuk_bucket.objects.filter(Prefix=nc_key))

    if len(objs) > 0:
        # jobs worked
        msg.delete()
    # NOTE(review): messages that are not deleted stay on the queue and can
    # presumably be received again later -- confirm the loop terminates.

Put all failed jobs back on the to-do queue

In [None]:
while True:
    [msg] = failed_queue.receive_messages(MaxNumberOfMessages=1)
    