In [None]:
!ls /s3/mogreps-uk/prods_op_mogreps-uk_20130301_09_00

# What files/data:

We will try make a cube that represents all of the mogreps-uk data for 2013. The data is stored on a public S3 bucket `mogreps-uk`. We have mounted that bucket using a FUSE file systems that 'mounts' S3. The data is accessable via the filesystem path `/s3/mogreps-uk/`.

Inspecting the data set reveals:

Models run at 03 and 09

Is a lagged ensamble:
03 run has realisations/members 00 - 11
09 run has realisations/members 00 and 12 - 22

Forecast period:
+0 to +36
Split in to files with three one hour time steps per file except the first which has four one hour time steps.


model run date:
20130101 - 20131231

## Time units

for var_name == 'air_temperature'
standard_name='time', units=Unit('hours since 1970-01-01 00:00:00', calendar='gregorian'), var_name='time_0' 

However we don't need you worry about calendar calcualtions if we redefine the axis 'time' as 'forecast period'.
Give the file `prods_op_mogreps-uk_20131231_03_00_003.nc` we simply extract the 'offset' from the file name (003) and the points is `range(offset, len(shape.data[0]) +1, step)` where `step` is one in this case




In [1]:
import iris
import os
import warnings
base = '/s3/mogreps-uk/'
some_files = ["prods_op_mogreps-uk_20130101_03_00_027.nc", "prods_op_mogreps-uk_20131231_03_00_003.nc", "prods_op_mogreps-uk_20130301_09_00_030.nc", "prods_op_mogreps-uk_20130301_03_00_036.nc"]

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    cubes = iris.load(os.path.join(base,some_files[0]))
cubes

[<iris 'Cube' of cloud_base_altitude_assuming_only_consider_cloud_area_fraction_greater_than_2p5_oktas / (kft) (time: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of wet_bulb_potential_temperature / (K) (time: 3; pressure: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of stratiform_snowfall_rate / (kg m-2 s-1) (time: 36; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of wet_bulb_freezing_level_altitude / (m) (time: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of cloud_area_fraction_assuming_maximum_random_overlap / (1) (time: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of air_pressure_at_sea_level / (Pa) (time: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of air_temperature / (K) (time: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of air_temperature / (K) (time: 3; grid_latitude: 548; grid_longitude: 421)>,
<iris 'Cube' of air_temperature / (K) (time: 3; grid_latitude: 548; grid_longitude: 421)

In [3]:
air_temperature = [c for c in cubes.extract('air_temperature') if c.var_name == 'air_temperature']
assert len(air_temperature) == 1
air_temperature = air_temperature[0]

print(air_temperature)

air_temperature / (K)               (time: 3; grid_latitude: 548; grid_longitude: 421)
     Dimension coordinates:
          time                           x                 -                    -
          grid_latitude                  -                 x                    -
          grid_longitude                 -                 -                    x
     Auxiliary coordinates:
          forecast_period                x                 -                    -
     Scalar coordinates:
          forecast_reference_time: 2013-01-01 03:00:00
          height: 1.5 m
     Attributes:
          Conventions: CF-1.5
          STASH: m01s03i236
          source: Data from Met Office Unified Model
          um_version: 7.9


Orignially planed to use air temp but this var is a mess with multiple vars with similar names air_temperature0, air_temperature1, air_temperature, etc. These are all slightly different some are on levels some are min/max. The var name used changes between files so we need to inspect if we want to use this var.

lets try `surface_air_pressure`

In [4]:
surface_air_pressure = [c for c in cubes.extract('surface_air_pressure')]
assert len(surface_air_pressure) == 1
surface_air_pressure = surface_air_pressure[0]

print(surface_air_pressure)

surface_air_pressure / (Pa)         (time: 3; grid_latitude: 548; grid_longitude: 421)
     Dimension coordinates:
          time                           x                 -                    -
          grid_latitude                  -                 x                    -
          grid_longitude                 -                 -                    x
     Auxiliary coordinates:
          forecast_period                x                 -                    -
     Scalar coordinates:
          forecast_reference_time: 2013-01-01 03:00:00
     Attributes:
          Conventions: CF-1.5
          STASH: m01s00i409
          source: Data from Met Office Unified Model
          um_version: 7.9


In [5]:
import cf_units

start = surface_air_pressure.coord('time').points[0]
stop = surface_air_pressure.coord('time').points[-1]

points = surface_air_pressure.coord('time').points
print("steps in time coord", set(p - points[i] for i,p in enumerate(points[1:])))
surface_air_pressure.coord('time')[-1]

cf_units.num2date([surface_air_pressure.coord('time').points[0]], 'hours since 1970-01-01 00:00:00', calendar='gregorian')


steps in time coord {1.0}


array([datetime.datetime(2013, 1, 2, 4, 0)], dtype=object)

In [None]:
for step in range(3,36+1,3):
    file = "prods_op_mogreps-uk_20130101_03_00_{:03d}.nc".format(step)
    file = os.path.join(base, file)
    print ("Step %03d" % step)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        cubes = iris.load(os.path.join(base, file))
    surface_air_pressure = [c for c in cubes.extract('surface_air_pressure') if c.var_name == 'surface_air_pressure']
    assert len(surface_air_pressure) == 1
    surface_air_pressure = surface_air_pressure[0]
         
    for i in range(len(surface_air_pressure.coord('time').points)):
        print(cf_units.num2date([surface_air_pressure.coord('time').points[i]], 'hours since 1970-01-01 00:00:00', calendar='gregorian'))

In [6]:
import datetime
from calendar import monthrange
from collections import namedtuple
MogrepsFile = namedtuple("MogrepsFile", ['filepath', 'ref_time','forecast_periods', 'realisation'])

def files_gen():
    file_tpl = "/s3/mogreps-uk/prods_op_mogreps-uk_{year:d}{month:02d}{day:02d}_{run:02d}_{realisation:02d}_{step:03d}.nc"
    year = 2013
    for month in range(1,13):
        for day in range(1,monthrange(year,month)[1] +1):
            for run in (3,9):
                realisations = range(0,11+1) if run == 9 else [0] + list(range(12, 22+1))
                for realisation in realisations:
                    for step in range(3,36+1,3):
                        ref_time = (year,month,day,run)
                        forecast_periods = list(range(step,step+3))
                        filepath = file_tpl.format(year=year, month=month, day=day, run=run, realisation=realisation, step=step)
                        yield MogrepsFile(filepath, ref_time, forecast_periods, realisation)
                    

In [7]:
import itertools

for file in itertools.islice(files_gen(), 0,1000, 101):
    print (file, "exists?", os.path.exists(file.filepath))
    
sum(1 for _ in files_gen()) # length of our generator

MogrepsFile(filepath='/s3/mogreps-uk/prods_op_mogreps-uk_20130101_03_00_003.nc', ref_time=(2013, 1, 1, 3), forecast_periods=[3, 4, 5], realisation=0) exists? True
MogrepsFile(filepath='/s3/mogreps-uk/prods_op_mogreps-uk_20130101_03_19_018.nc', ref_time=(2013, 1, 1, 3), forecast_periods=[18, 19, 20], realisation=19) exists? True
MogrepsFile(filepath='/s3/mogreps-uk/prods_op_mogreps-uk_20130101_09_04_033.nc', ref_time=(2013, 1, 1, 9), forecast_periods=[33, 34, 35], realisation=4) exists? True
MogrepsFile(filepath='/s3/mogreps-uk/prods_op_mogreps-uk_20130102_03_12_012.nc', ref_time=(2013, 1, 2, 3), forecast_periods=[12, 13, 14], realisation=12) exists? True
MogrepsFile(filepath='/s3/mogreps-uk/prods_op_mogreps-uk_20130102_03_20_027.nc', ref_time=(2013, 1, 2, 3), forecast_periods=[27, 28, 29], realisation=20) exists? True
MogrepsFile(filepath='/s3/mogreps-uk/prods_op_mogreps-uk_20130102_09_06_006.nc', ref_time=(2013, 1, 2, 9), forecast_periods=[6, 7, 8], realisation=6) exists? True
Mogreps

105120

Whilst the above is quite pleasing (a generator for all the posiable files) it's not all that helpful for making the cube. For that we need to sample all of the potential space from inside to out building and aggrigating our data

In [13]:
%%prun
import datetime
from calendar import monthrange
from collections import namedtuple
from ncwrap import CheckingNetCDFDataProxy
import dask.array as da

VAR_NAME = 'surface_air_pressure'
grid_latitude_len = 548
grid_longitude_len = 421

RunDate = namedtuple('RunDate',['year','month','day','hour'])
def runs(year):
    for month in range(1,12):
        for day in range(1,monthrange(year,month)[1] +1):
            for hour in (3,9):
                yield RunDate(year,month,day,hour)
        
def files_gen():
    file_tpl = "/s3/mogreps-uk/prods_op_mogreps-uk_{year:d}{month:02d}{day:02d}_{run:02d}_{realisation:02d}_{step:03d}.nc"
    realisations = range(0,22+1)
    
    realisation_collection = []
    for realisation in realisations:
        
        runs_collection = []
        for year, month, day, run in list(runs(2013))[:10]:
            
            timestep_collection = []
            for step in range(3,36+1,3):
                ref_time = (year,month,day,run)
                forecast_periods = list(range(step,step+3))
                if step == 3:
                    forecast_periods = list(range(step,step+4)) # First file in a run has an extra timestep

                filepath = file_tpl.format(year=year, month=month, day=day, run=run, realisation=realisation, step=step)

                shape=(len(forecast_periods), grid_latitude_len, grid_longitude_len)
                fill = 1e20
                proxy = da.ma.masked_where(
                    fill,
                    da.from_array(
                        CheckingNetCDFDataProxy(
                            shape=shape,
                            dtype='float32', # TODO: stop assuming this?!
                            path=filepath,
                            variable_name=VAR_NAME,
                            fill_value = fill
                        ), 
                        shape
                    )
                )
                timestep_collection.append(proxy)

            run_data = da.concatenate(timestep_collection,0) 
            runs_collection.append(run_data)
                
    
        realisation_data = da.stack(runs_collection, 0)
        realisation_collection.append(realisation_data)
    
    return da.stack(realisation_collection, 0)
                    
data = files_gen()
data

 

         1905430 function calls (1813636 primitive calls) in 2.070 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     2760    0.115    0.000    0.474    0.000 core.py:306(top)
     5750    0.092    0.000    0.194    0.000 core.py:251(broadcast_dimensions)
    19574    0.090    0.000    0.360    0.000 core.py:1874(normalize_chunks)
     8534    0.087    0.000    0.338    0.000 base.py:600(tokenize)
     8280    0.084    0.000    0.129    0.000 inspect.py:2832(_bind)
     8280    0.079    0.000    0.134    0.000 core.py:959(blockdims_from_blockshape)
     3014    0.068    0.000    0.356    0.000 core.py:2120(unify_chunks)
69761/22588    0.059    0.000    0.229    0.000 utils.py:409(__call__)
     2760    0.058    0.000    1.049    0.000 core.py:2193(atop)
     8534    0.056    0.000    0.177    0.000 core.py:1032(__new__)
    33880    0.048    0.000    0.053    0.000 core.py:1101(shape)
25094/11294    0.047    0.000    0.09

In [11]:
# Build a cube from our data array
from cf_units import Unit

def points_to_coord(var_name, points, units=None, long_name=None, standard_name=None):
    long_name = long_name if long_name else var_name
    return iris.coords.DimCoord(
        points=points,
        standard_name=standard_name,
        long_name=long_name if long_name else var_name,
        var_name=var_name, units=units)



realisation = points_to_coord('realisation', list(range(data.shape[0])))
forecast_refrance = points_to_coord('forecast_refrance_time', list(range(3, data.shape[1]*12, 12)), units=Unit('hours since 2013-01-01', calendar='gregorian'))
forecast_period = points_to_coord('forecast_period', list(range(0,37)), units=Unit('hours'))
grid_latitude = surface_air_pressure.coord('grid_latitude')
grid_longitude = surface_air_pressure.coord('grid_longitude')


dim_coords = [
    realisation,
    forecast_refrance,
    forecast_period,
    grid_latitude,
    grid_longitude
]

cube = iris.cube.Cube(
        data=data,
        standard_name=VAR_NAME,
        long_name=VAR_NAME,
        var_name=VAR_NAME,
        units = Unit('Pa'),
        dim_coords_and_dims=[(coord, i)for i, coord in enumerate(dim_coords)])
print(cube)

surface_air_pressure / (Pa)         (realisation: 2; forecast_refrance_time: 62; forecast_period: 37; grid_latitude: 548; grid_longitude: 421)
     Dimension coordinates:
          realisation                           x                          -                    -                  -                    -
          forecast_refrance_time                -                          x                    -                  -                    -
          forecast_period                       -                          -                    x                  -                    -
          grid_latitude                         -                          -                    -                  x                    -
          grid_longitude                        -                          -                    -                  -                    x


In [19]:
cube[:,1,9,10,:].data

__getitem__ (prods_op_mogreps-uk_20130101_09_15_009.nc:surface_air_pressure) (slice(0, 3, None), slice(0, 548, None), slice(0, 421, None))
__getitem__ (prods_op_mogreps-uk_20130101_09_01_009.nc:surface_air_pressure) (slice(0, 3, None), slice(0, 548, None), slice(0, 421, None))
Returning null data (prods_op_mogreps-uk_20130101_09_15_009.nc:surface_air_pressure):(slice(0, 3, None), slice(0, 548, None), slice(0, 421, None)) because no such file /s3/mogreps-uk/prods_op_mogreps-uk_20130101_09_15_009.nc


masked_array(
  data=[[--, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --],
        [--, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --, --, --, --, --, --, --, --, --, --, --, --, --,
         --, --, --, --]],
  mask=[[ True,  True,  True,  True,  True,  True,  True,

In [None]:
!ls /s3/mogreps-uk/prods_op_mogreps-uk_20130101_09_16_003.nc

In [16]:
from functools import reduce

def human_bytes(num, suffix='B'):
    for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)

def estimate_cube_size(cube):
    import functools
    import operator
    num_points = functools.reduce(operator.mul, cube.shape, 1)
#     if cube[(0,) * len(cube.shape)].data.dtype != 'float32':
#         return False
    return human_bytes((num_points * 32) / 8)

estimate_cube_size(data)

'488.6GiB'

In [None]:
!ls /s3/mogreps-uk/prods_op_mogreps-uk_20130102_09_0

In [None]:
cubes = iris.load(os.path.join(base, 'prods_op_mogreps-uk_20130101_09_01_003.nc'), 'air_temperature')

In [None]:
[print(c.var_name,c) for c in cubes]

In [None]:
print(cubes[3].coord('pressure'))

In [None]:
import numpy as np

class Data():
    def __init__(self,shape):
        self.shape = shape
        self.dtype = 'float32'
        
    def __getitem__(self,keys):
        print('get data')
        return np.zeros(self.shape)[keys]
    
d = Data((1,2,3))

In [None]:
da.ma.masked_equal(d, 1e20)