In [1]:
import numpy as np
import xray
from matplotlib import pyplot as plt
import os
import pandas as pd
from astropy.convolution import Gaussian2DKernel, convolve

%matplotlib inline

plt.rcParams['figure.figsize'] = (12,7)
plt.rcParams['image.origin'] = 'bottom'

print xray

<module 'xray' from '/home/rpa/xray/xray/__init__.pyc'>


In [2]:
ddir = '/data/scratch/rpa/aviso/ftp.aviso.altimetry.fr/global/delayed-time/grids/msla/all-sat-merged'
ds = xray.open_mfdataset(os.path.join(ddir,'uv','all-monthly','*.nc'), engine='scipy')
ds

<xray.Dataset>
Dimensions:   (lat: 720, lon: 1440, nv: 2, time: 8031)
Coordinates:
  * lon       (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
  * lat       (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
  * nv        (nv) int32 0 1
  * time      (time) datetime64[ns] 1993-01-01 1993-01-02 1993-01-03 ...
Data variables:
    crs       (time) int32 -2147483647 -2147483647 -2147483647 -2147483647 ...
    lat_bnds  (time, lat, nv) float32 -90.0 -89.75 -89.75 -89.5 -89.5 -89.25 ...
    v         (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...
    lon_bnds  (time, lon, nv) float32 0.0 0.25 0.25 0.5 0.5 0.75 0.75 1.0 ...
    u         (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...
Attributes:
    comment: Surface product; Geostrophic Velocities referenced to the [1993, 2012] period
    time_coverage_resolution: P1D
    geospatial_vertical_max: 0.0
    product_version: 5.0
    geospatial_lat_units: degrees_north
    geos

In [3]:
ds.nbytes / 1e9

133.3635335

## Description of Calcuation ##

The original datset ds contains daily values of two velocity variables, $u$ and $v$. The calculation I want to perform has the following steps

* Apply a high pass filter to $u$ and $v$ at each time point:
    * Convolve each variable with a 2d gaussian kernel to produce a spatially smoothed field using the [astropy convolution routines](http://astropy.readthedocs.org/en/latest/convolution/), which have proper treatment of missing values.
    * Subtract the smoothed field from the original field
* calculate the kinetic energy $(u^2 + v^2)/$
* resample the kinetic energy field at monthly resolution
* save the result to a new netcdf file
 
The problem is that the astropy convolution function only operates on numpy arrays. So I tried accessing the underlying dask arrays using [dask.map_blocks](http://dask.pydata.org/en/latest/array-api.html?highlight=map_blocks#dask.array.core.Array.map_blocks). The code below works, but evidently it does not multithread (CPU usage for the process never exceeds 100%). The convolution is a very CPU-intensive operation, so I would really like to take advantage of dask multithreading.


In [3]:
FILTER_STDDEV=2

def high_pass_filter(np_ar):
    gaussian_kernel = Gaussian2DKernel(stddev=FILTER_STDDEV)
    if (np_ar.ndim > 2) and (np_ar.shape[0]>1):
        out = np.zeros_like(np_ar)
        for k in xrange(np_ar.shape[0]):
            out[k] = convolve(np_ar[k], gaussian_kernel, boundary='wrap')
        return np_ar - out
    elif (np_ar.ndim > 2):
        return np_ar - convolve(np_ar.squeeze(), gaussian_kernel, boundary='wrap')[np.newaxis,:,:]
    else:
        return np_ar - convolve(np_ar, gaussian_kernel, boundary='wrap')    
    

In [7]:
TIME_SUBSAMPLE = 5 # don't bother doing every day--there's no variance
TIME_LIMIT = 12*12

u = ds['u'][::TIME_SUBSAMPLE]
v = ds['v'][::TIME_SUBSAMPLE]

ufilt = u.data.map_blocks(high_pass_filter, dtype=np.float64)
vfilt = v.data.map_blocks(high_pass_filter, dtype=np.float64)

filtered_ds = xray.Dataset({'u':(u.dims, ufilt),
                            'v':(u.dims, vfilt)},
                              coords=u.coords )
filtered_ds

<xray.Dataset>
Dimensions:  (lat: 720, lon: 1440, time: 1607)
Coordinates:
  * lon      (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
  * time     (time) datetime64[ns] 1993-01-01 1993-01-06 1993-01-11 ...
  * lat      (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
Data variables:
    u        (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...
    v        (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...

In [9]:
# still contains the seasonal cycle eke
# (e.g. large-scale gyre shifts)
uv_clim = filtered_ds.groupby('time.month').mean('time')
uv_anom = filtered_ds.groupby('time.month') - uv_clim
uv_anom

<xray.Dataset>
Dimensions:  (lat: 720, lon: 1440, time: 1607)
Coordinates:
  * lat      (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
  * lon      (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
  * time     (time) datetime64[ns] 1993-01-01 1993-01-06 1993-01-11 ...
    month    (time) int32 1 1 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 3 4 4 4 4 4 4 5 ...
Data variables:
    v        (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...
    u        (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...

In [10]:
# 
eke = 0.5*(uv_anom.u**2 + uv_anom.v**2)
eke_monthly = eke.resample('MS', 'time')

In [16]:
eke_monthly_ds = eke_monthly.to_dataset(name='eke')
%time eke_monthly_ds.to_netcdf(os.path.join(ddir,'uv','ekea_monthly_mean_filtered-%g.nc' % FILTER_STDDEV))

CPU times: user 2h 1min 44s, sys: 1min 43s, total: 2h 3min 28s
Wall time: 4min 2s
