# LLC4320 Global Rotarty Spectrum

In [1]:
import dask

In [126]:
from dask.distributed import Client

client = Client("tcp://10.32.78.25:44015")
client

0,1
Client  Scheduler: tcp://10.32.78.25:44015  Dashboard: /user/0000-0001-5999-4917/proxy/8787/status,Cluster  Workers: 5  Cores: 10  Memory: 57.50 GB


In [2]:
dask.config.config

{'distributed': {'logging': {'bokeh': 'critical'},
  'dashboard': {'link': '/user/{JUPYTERHUB_USER}/proxy/{port}/status'},
  'admin': {'tick': {'limit': '5s'}}},
 'kubernetes': {'name': 'dask-{JUPYTERHUB_USER}-{uuid}',
  'count': {'max': 30},
  'worker-template': {'metadata': None,
   'spec': {'restartPolicy': 'Never',
    'containers': [{'args': ['dask-worker',
       '--nthreads',
       '2',
       '--no-bokeh',
       '--memory-limit',
       '11.5GB',
       '--death-timeout',
       '60'],
      'image': '${JUPYTER_IMAGE_SPEC}',
      'name': 'dask-${JUPYTERHUB_USER}',
      'resources': {'limits': {'cpu': 16, 'memory': '11.5G'},
       'requests': {'cpu': 1, 'memory': '11.5G'}}}]}}},
 'labextension': {'factory': {'module': 'dask_kubernetes',
   'class': 'KubeCluster',
   'args': [],
   'kwargs': {}}},
 'temporary-directory': None,
 'array': {'svg': {'size': 120}}}

In [4]:
import numpy as np
import xarray as xr
from matplotlib import pyplot as plt
import xgcm
import xrft
%matplotlib inline

### Create and Connect to Dask Cluster

I am using a very big cluster to make it go fast.

### Open and Merge Data

In [105]:
import intake
cat = intake.Catalog('https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean/llc4320.yaml')
ds_grid = cat.LLC4320_grid.to_dask().reset_coords()
ds_grid = ds_grid.drop(['k_p1','Zp1', 'Zl', 'PHrefF', 'drC'])
ds_U = cat.LLC4320_SSU(chunks=None).to_dask()
ds_V = cat.LLC4320_SSV(chunks=None).to_dask()

In [106]:
ds = xr.merge([ds_U, ds_V])
ds

<xarray.Dataset>
Dimensions:  (face: 13, i: 4320, i_g: 4320, j: 4320, j_g: 4320, time: 9030)
Coordinates:
  * face     (face) int64 0 1 2 3 4 5 6 7 8 9 10 11 12
  * i_g      (i_g) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * j        (j) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * time     (time) datetime64[ns] 2011-09-13 ... 2012-09-23T05:00:00
  * i        (i) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * j_g      (j_g) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
Data variables:
    U        (time, face, j, i_g) float32 ...
    V        (time, face, j_g, i) float32 ...

In [107]:
# slice in time first and then chunk in order to avoid enormous graph
NT = 2 # vary up to 9030 to make problem bigger
ds = ds.isel(time=slice(0, NT)).chunk({'time': 1, 'face': 1})
ds

<xarray.Dataset>
Dimensions:  (face: 13, i: 4320, i_g: 4320, j: 4320, j_g: 4320, time: 2)
Coordinates:
  * face     (face) int64 0 1 2 3 4 5 6 7 8 9 10 11 12
  * i_g      (i_g) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * j        (j) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * time     (time) datetime64[ns] 2011-09-13 2011-09-13T01:00:00
  * i        (i) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * j_g      (j_g) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
Data variables:
    U        (time, face, j, i_g) float32 dask.array<chunksize=(1, 1, 4320, 4320), meta=np.ndarray>
    V        (time, face, j_g, i) float32 dask.array<chunksize=(1, 1, 4320, 4320), meta=np.ndarray>

In [108]:
from xmitgcm.llcreader.llcmodel import faces_dataset_to_latlon

# rechunk to full latlon faces (huge!)
chunks = {'i': -1, 'j': -1, 'i_g': -1, 'j_g': -1}

ds_ll = faces_dataset_to_latlon(ds, metric_vector_pairs=[]).chunk(chunks)
ds_ll
ds_grid_ll = faces_dataset_to_latlon(ds_grid).chunk(chunks)

In [109]:
ds_ll.U.data

Unnamed: 0,Array,Chunk
Bytes,1.79 GB,895.80 MB
Shape,"(2, 12960, 17280)","(1, 12960, 17280)"
Count,149 Tasks,2 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.79 GB 895.80 MB Shape (2, 12960, 17280) (1, 12960, 17280) Count 149 Tasks 2 Chunks Type float32 numpy.ndarray",17280  12960  2,

Unnamed: 0,Array,Chunk
Bytes,1.79 GB,895.80 MB
Shape,"(2, 12960, 17280)","(1, 12960, 17280)"
Count,149 Tasks,2 Chunks
Type,float32,numpy.ndarray


In [110]:
from numba import jit, guvectorize

@jit(nopython=True)
def diff_x_right(f):
    ny, nx = f.shape
    g = np.ones_like(f)
    for j in range(ny):
        j_next = j
        for i in range(nx):
            i_next = (i + 1) % nx
            g[j, i] = -f[j, i] + f[j_next, i_next]
    return g

@jit(nopython=True)
def diff_y_right(f):
    ny, nx = f.shape
    g = np.ones_like(f)
    for j in range(ny):
        j_next = (j + 1) % ny
        for i in range(nx):
            i_next = i
            g[j, i] = -f[j, i] + f[j_next, i_next]
    return g

@jit(nopython=True)
def diff_x_left(f):
    ny, nx = f.shape
    g = np.ones_like(f)
    for j in range(ny):
        j_next = j
        for i in range(nx):
            i_next = (i - 1) % nx
            g[j, i] = f[j, i] - f[j_next, i_next]
    return g

@jit(nopython=True)
def diff_y_left(f):
    ny, nx = f.shape
    g = np.ones_like(f)
    for j in range(ny):
        j_next = (j - 1) % ny
        for i in range(nx):
            i_next = i
            g[j, i] = f[j, i] - f[j_next, i_next]
    return g

sig = "Tuple((float32[:,:], float32[:,:]))(float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],int64,float32[:,:],int64,int64)"
@jit(sig, nopython=True)
def smooth_numba(U, V, dxF, dxV, dyF, dyU, rAw, rAs, c11, c12, c21, c22, DT):

    tau_11 = c11 * dxF**-1 * diff_x_right(U)
    tau_12 = c12 * dyU**-1 * diff_y_left(U)
    tau_21 = c21 * dxV**-1 * diff_x_left(V)
    tau_22 = c22 * dxF**-1 * diff_y_right(V)

    # we neglect land completely, as Hussein recommends
    G_u = (diff_x_left(dyF * tau_11)
           + diff_x_right(dxV * tau_12)) / rAw
    G_v = (diff_y_right(dyU * tau_21)
           + diff_y_left(dxF * tau_22)) / rAs

    # step forward
    U_sm = U + DT * G_u
    V_sm = V + DT * G_v
    
    return U_sm, V_sm

@jit(nopython=True)
def iterate_numba(U, V, dxF, dxV, dyF, dyU, rAw, rAs, c11, c12, c21, c22, DT, niters):
    for n in range(niters):
        U, V = smooth_numba(U, V, dxF, dxV, dyF, dyU, rAw, rAs, c11, c12, c21, c22, DT)
    return U, V

In [120]:
# https://github.com/numba/numba/issues/4314   
    
def cp_guvectorize(*args, **kwargs):
    """Same as :func:`numba.guvectorize`, but can be used to decorate dynamically
    defined function and then pickle them with
    `cloudpickle <https://pypi.org/project/cloudpickle/>`_.
    On the other hand, it can't be called from another jit-compiled function.
    """

    def decorator(func):
        return _PickleableGUVectorized(func, args, kwargs)

    return decorator


class _PickleableGUVectorized:
    def __init__(self, func, guvectorize_args, guvectorize_kwargs):
        self.args = func, guvectorize_args, guvectorize_kwargs
        decorator = guvectorize(*guvectorize_args, **guvectorize_kwargs)
        self.ufunc = decorator(func)

    def __reduce__(self):
        return _PickleableGUVectorized, self.args

    def __call__(self, *args, **kwargs):
        return self.ufunc(*args, **kwargs)

In [121]:
# sig for smooth numba
ftylist = "(float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],float32[:,:],int64,float32[:,:],int64,int64,float32[:,:],float32[:,:])"

ufunc_sig = 8 * '(j, i), ' + '(j, i), (), (j, i), (), () -> (j, i), (j, i)'

@cp_guvectorize(ftylist, ufunc_sig)
def smooth_gu(U, V, dxF, dxV, dyF, dyU, rAw, rAs, c11, c12, c21, c22, DT, U_out, V_out):
    U_out[:], V_out[:] = smooth_numba(U, V, dxF, dxV, dyF, dyU, rAw, rAs, c11, c12, c21, c22, DT)

In [122]:
# construct missing intermediate metrics
dxC = ds_grid_ll.dxC
dxG = ds_grid_ll.dxG
dxF = grid.interp(dxC, 'X').chunk({'i': -1, 'j': -1})
dxV = grid.interp(dxG, 'X').chunk({'i_g': -1, 'j_g': -1})
dyC = ds_grid_ll.dyC
dyG = ds_grid_ll.dyG
dyF = grid.interp(dyC, 'Y', boundary='extend').chunk({'i': -1, 'j': -1})
dyU = grid.interp(dyG, 'Y', boundary='extend').chunk({'i_g': -1, 'j_g': -1})

# coefficients
Ah = 1
c11 = np.cos(np.deg2rad(ds_grid_ll.YC))**(3/2)
c21 = np.cos(np.deg2rad(ds_grid_ll.YG))**(3/2)
c12 = c22 = 1

In [123]:
L = ds_grid_ll.dyC.max().values.item()
DT = 0.05 * L**2

In [114]:
args = [a.data if isinstance(a, xr.DataArray) else a for a in
        [ds_ll.U, ds_ll.U, dxF, dxV, dyF, dyU,
         ds_grid_ll.rAw, ds_grid_ll.rAs,
         c11, c12, c21, c22, DT]
        ]
args

[dask.array<rechunk-merge, shape=(2, 12960, 17280), dtype=float32, chunksize=(1, 12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(2, 12960, 17280), dtype=float32, chunksize=(1, 12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(12960, 17280), dtype=float32, chunksize=(12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(12960, 17280), dtype=float32, chunksize=(12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(12960, 17280), dtype=float32, chunksize=(12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(12960, 17280), dtype=float32, chunksize=(12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(12960, 17280), dtype=float32, chunksize=(12960, 17280), chunktype=numpy.ndarray>,
 dask.array<rechunk-merge, shape=(12960, 17280), dtype=float32, chunksize=(12960, 17280), chunktype=numpy.ndarray>,
 dask.array<pow, shape=(12960, 17280), dtype=float32, chunks

In [127]:
U_sm, V_sm = smooth_gu(*args, output_dtypes=('f4', 'f4'))
U_sm

Unnamed: 0,Array,Chunk
Bytes,1.79 GB,895.80 MB
Shape,"(2, 12960, 17280)","(1, 12960, 17280)"
Count,980 Tasks,2 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.79 GB 895.80 MB Shape (2, 12960, 17280) (1, 12960, 17280) Count 980 Tasks 2 Chunks Type float32 numpy.ndarray",17280  12960  2,

Unnamed: 0,Array,Chunk
Bytes,1.79 GB,895.80 MB
Shape,"(2, 12960, 17280)","(1, 12960, 17280)"
Count,980 Tasks,2 Chunks
Type,float32,numpy.ndarray


In [128]:
U_sm.mean().compute()

KilledWorker: ("('smooth_gu-smooth_gu_0-mean_chunk-589841b92d1be9ce8d81b741477f1c6e', 1, 0, 0)", <Worker 'tcp://10.32.4.5:43307', memory: 0, processing: 1>)