### Compute and store filtered and demodulated velocity fields at each grid point

In [1]:
import numpy as np
import geopandas as gpd

import xarray as xr
from matplotlib import pyplot as plt
%matplotlib inline

from xhistogram.xarray import histogram
import dask.dataframe as dd
#import cartopy.crs as ccrs
#import cartopy.feature as cfeature

import mitequinox.utils as ut
from mitequinox.plot import *
import mitequinox.parcels as pa
from xmitgcm import llcreader

from scipy import signal
import scipy.ndimage as im

from sympy import Symbol, pi, atan, factor, lambdify

import mitequinox.plot as pl
import mitequinox.sigp as sp

from fsspec.implementations.local import LocalFileSystem

In [2]:
from dask.distributed import Client, LocalCluster
#
#cluster = LocalCluster()
#
from dask_jobqueue import PBSCluster
cluster = PBSCluster(processes=5, cores=5) #processes=7, cores=7
w = cluster.scale(jobs=10
                 )
client = Client(cluster)

  from distributed.utils import tmpfile


In [3]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.148.0.28:8787/status,

0,1
Dashboard: http://10.148.0.28:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.148.0.28:52088,Workers: 0
Dashboard: http://10.148.0.28:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [3]:
def convolve(x, h=None, hilbert=False):
    """ Convolve an input signal with a kernel
    Optionaly compute the Hilbert transform of the resulting time series
    
    Parameters
    x : input signal
    h : filter 
    hilbert : True for Hilbert transform to be applied to the filtered signal
    
    Returns
    x_f : filtered signal or hilbert transform of the filtered signal
    """
    x_f = signal.filtfilt(h, [1], x, axis=-1,padlen=0)#
    if hilbert:
        return signal.hilbert(x_f)
    else:
        return x_f

def filt(v, h, hilbert=False):
    
    output_dtype = complex if hilbert else float
    gufunc_kwargs = dict(output_sizes={'time': len(v.time)})
    return xr.apply_ufunc(convolve, v, kwargs={'h': h, 'hilbert': hilbert},
                    dask='parallelized', output_dtypes=[output_dtype],
                    input_core_dims=[['time']],
                    output_core_dims=[['time']],
                    dask_gufunc_kwargs = gufunc_kwargs,
                         )

##### First step : Filter and demodulate for each face

In [4]:
# Load Eulerian fields
ds = xr.open_zarr(ut.work_data_dir+'rechunked/SSV_rot.zarr')
grd = ut.load_grd()[['XC', 'YC', 'Depth']]#.persist()

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = xr.open_zarr(ut.work_data_dir+'rechunked/SSV_rot.zarr')
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = xr.open_zarr(ref_data_dir + "grid.zarr", **kwargs)


In [5]:
#isel=dict(face=12) #face
#V = ['SSV_rot']#'zonal_velocity','meridional_velocity'
path = '/home1/datawork/zcaspar/mit4320/filtered_itide/'
dsel = 200

In [6]:
dt = 1/24 # time step in days

tidal_omega = sp.get_tidal_frequencies("M2", "K2","S2","N2")
omega_M2,omega_S2,omega_N2,omega_K2, domega, name = tidal_omega["M2"],tidal_omega["S2"],tidal_omega["N2"],tidal_omega["K2"], .2, "semidiurnal"
omega = (omega_M2+omega_S2)/2#center frequency
Tw = 30 #filter length
dband = 0.2 # half-bandwidth
V = ['SSV_rot']

In [59]:
#dt = 1. # in hours
#T = 20
#omega = 1/30
#h = signal.firwin(T*24, cutoff=[omega], pass_zero=True, nyq=1./2/dt, scale=True)
#V = 'SSU_rot'

In [7]:
import dask
def wrap_filter(_ds,dt=dt):
    h = sp.generate_filter(om, T=Tw, dt=dt, bandwidth=dband, normalized_bandwidth=None)
    time = np.arange(0,_ds['time'].values.size*dt,dt)
    exp = np.exp(-1j*om*2*np.pi*time)
#    _ds = _ds.chunk(dict(time=16,i=100,j=100))
    ds_hat={}
    for v in V:
        ds_hat[v+'_hat'] = filt(_ds[v], h,hilbert=False)#.persist()
#        ds_hat[v+'_hat'] = ds_hat[v+'_hat'].assign_coords({'lon':ds_hat[v+'_hat'].lon,'lat':ds_hat[v+'_hat'].lat})
        ds_hat[v+'_demodulated'] = ds_hat[v+'_hat']*exp
    ds_hat = xr.merge([ds_hat[v].rename(v) for v in ds_hat.keys()])
#    ds_hat = ds_hat.chunk(dict(i=4320//4,j=4320//4))
#    ds_hat = ut._reset_chunk_encoding(ds_hat)
    return ds_hat#.drop(['XC','YC'])

def wrap_filter_low(_ds,dt=dt):
    h = signal.firwin(T*24, cutoff=[omega], pass_zero=True, nyq=1./2/dt, scale=True)
    ds_hat = filt(_ds[v], h,hilbert=False)#.persist()
    ds_hat = ds_hat.rename(v+'_hat').to_dataset()
#    ds_hat = ds_hat.chunk(dict(i=4320//4,j=4320//4))
#    ds_hat = ut._reset_chunk_encoding(ds_hat)
    return ds_hat

In [8]:
#Select and rechunk (empirical) original dataset
isel = dict(j=slice(0,500))
dsp = ds.isel(i=slice(0,None,4),j=slice(0,None,4))#.isel(face=0)#.sel(**sel)#.persist()
dsp = dsp.chunk({"time": -1})#.persist()#,'i':100,'j':70

In [9]:
om,Tw,dband,V = omega,Tw,dband,V #define parameters needed in wrap_filter

In [84]:
v = 'SSU_rot'
dsp = dsp.isel(face=0)

In [10]:
# Apply wrap_filter on the dataset and store the result for each face (seperated)
import os
#zarr_main = os.path.join(ut.root_data_dir, "filtered_itide/SSU_filtered_face4.zarr")
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ds_out, zarr = ut.custom_distribute(dsp, 
                                        wrap_filter,
                                        overwrite=True,
                                        suffix="SSV_filtered.zarr",
                                        tmp_dir=os.path.join(ut.root_data_dir, "filtered_itide"),
                                        append=True,
                                        face=1
#                                        j=200
                                       )

13it [27:45, 128.15s/it]


In [38]:
ds_out

Unnamed: 0,Array,Chunk
Bytes,2.35 TiB,26.37 MiB
Shape,"(4320, 4320, 8640)","(100, 1080, 16)"
Count,95041 Tasks,95040 Chunks
Type,complex128,numpy.ndarray
"Array Chunk Bytes 2.35 TiB 26.37 MiB Shape (4320, 4320, 8640) (100, 1080, 16) Count 95041 Tasks 95040 Chunks Type complex128 numpy.ndarray",8640  4320  4320,

Unnamed: 0,Array,Chunk
Bytes,2.35 TiB,26.37 MiB
Shape,"(4320, 4320, 8640)","(100, 1080, 16)"
Count,95041 Tasks,95040 Chunks
Type,complex128,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.35 TiB,26.37 MiB
Shape,"(4320, 4320, 8640)","(100, 1080, 16)"
Count,95041 Tasks,95040 Chunks
Type,complex128,numpy.ndarray
"Array Chunk Bytes 2.35 TiB 26.37 MiB Shape (4320, 4320, 8640) (100, 1080, 16) Count 95041 Tasks 95040 Chunks Type complex128 numpy.ndarray",8640  4320  4320,

Unnamed: 0,Array,Chunk
Bytes,2.35 TiB,26.37 MiB
Shape,"(4320, 4320, 8640)","(100, 1080, 16)"
Count,95041 Tasks,95040 Chunks
Type,complex128,numpy.ndarray


In [44]:
#client.restart()
cluster.close()

##### Second step : Concat filtered and demodulated fields for each face

In [30]:
from dask.distributed import Client, LocalCluster
#
#cluster = LocalCluster()

#
from dask_jobqueue import PBSCluster
cluster = PBSCluster(walltime='04:00:00') #processes=7, cores=7 ,walltime='03:00:00'
w = cluster.scale(jobs=10
                 )
client = Client(cluster)

In [31]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.148.0.32:8787/status,

0,1
Dashboard: http://10.148.0.32:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.148.0.32:38094,Workers: 0
Dashboard: http://10.148.0.32:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [32]:
path = '/home/datawork-lops-osi/equinox/mit4320/filtered_itide/'

#ds = xr.concat([xr.open_zarr(ut.work_data_dir+'filtered_itide/SSV_filtered_face%s.zarr'%i
#                           ) for i in np.arange(11,12)],dim='face')#.persist()
ds = xr.concat([xr.open_zarr(ut.work_data_dir+'filtered_itide/SSV_filtered_%s.zarr'%i
                           ) for i in np.arange(0,5)],dim='face')
#ds = xr.concat([xr.open_zarr(ut.work_data_dir+'filtered_itide/SSU_filtered_%s.zarr'%i
#                           ) for i in ['00','01']],dim='face')

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = xr.concat([xr.open_zarr(ut.work_data_dir+'filtered_itide/SSV_filtered_%s.zarr'%i
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  ds = xr.concat([xr.open_zarr(ut.work_data_dir+'filtered_itide/SSV_filtered_%s.zarr'%i
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting cons

In [33]:
ds

Unnamed: 0,Array,Chunk
Bytes,28.16 TiB,18.46 MiB
Shape,"(12, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,3214085 Tasks,1607040 Chunks
Type,complex128,numpy.ndarray
"Array Chunk Bytes 28.16 TiB 18.46 MiB Shape (12, 4320, 4320, 8640) (1, 70, 1080, 16) Count 3214085 Tasks 1607040 Chunks Type complex128 numpy.ndarray",12  1  8640  4320  4320,

Unnamed: 0,Array,Chunk
Bytes,28.16 TiB,18.46 MiB
Shape,"(12, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,3214085 Tasks,1607040 Chunks
Type,complex128,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.16 TiB,18.46 MiB
Shape,"(12, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,3214085 Tasks,1607040 Chunks
Type,complex128,numpy.ndarray
"Array Chunk Bytes 28.16 TiB 18.46 MiB Shape (12, 4320, 4320, 8640) (1, 70, 1080, 16) Count 3214085 Tasks 1607040 Chunks Type complex128 numpy.ndarray",12  1  8640  4320  4320,

Unnamed: 0,Array,Chunk
Bytes,28.16 TiB,18.46 MiB
Shape,"(12, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,3214085 Tasks,1607040 Chunks
Type,complex128,numpy.ndarray


In [34]:
def concat(ds):
#    _ds = xr.concat([xr.open_zarr(ut.work_data_dir+'filtered_itide/SSV_filtered_%s.zarr'%i
#                           ) for i in np.arange(0,5)],dim='face')
#    _ds = ds#xr.concat(ds,dim='face')
    _ds = ds.chunk(dict(time=16,i=4320//4
                        )) #,j=4320//4
#    _ds = ut._reset_chunk_encoding(_ds)
    return _ds

In [6]:
ds = ds.persist()

In [35]:
#Apply concat function on ds
face = 1
import os, dask
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ds_out, zarr = ut.custom_distribute(ds, 
                                        concat,
                                        overwrite=True,
                                        suffix="SSV_filtered.zarr",
                                        tmp_dir=os.path.join(ut.root_data_dir, "filtered_itide"),
                                        append=True,
#                                        face=face
                                        j=20
                                       )

1it [06:30, 390.17s/it]Exception ignored in: <bound method GCDiagnosis._gc_callback of <distributed.utils_perf.GCDiagnosis object at 0x2aabc4b21a30>>
Traceback (most recent call last):
  File "/home1/datahome/zcaspar/miniconda3/envs/croco/lib/python3.8/site-packages/distributed/utils_perf.py", line 182, in _gc_callback
    def _gc_callback(self, phase, info):
KeyboardInterrupt: 
Exception ignored in: <bound method GCDiagnosis._gc_callback of <distributed.utils_perf.GCDiagnosis object at 0x2aabc4b21a30>>
Traceback (most recent call last):
  File "/home1/datahome/zcaspar/miniconda3/envs/croco/lib/python3.8/site-packages/distributed/utils_perf.py", line 192, in _gc_callback
    self._fractional_timer.start_timing()
  File "/home1/datahome/zcaspar/miniconda3/envs/croco/lib/python3.8/site-packages/distributed/utils_perf.py", line 115, in start_timing
    assert self._cur_start is None
AssertionError: 
1it [06:55, 415.72s/it]


KeyboardInterrupt: 

In [7]:
ds_out

Unnamed: 0,Array,Chunk
Bytes,7.04 TiB,18.46 MiB
Shape,"(3, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,401761 Tasks,401760 Chunks
Type,complex128,numpy.ndarray
"Array Chunk Bytes 7.04 TiB 18.46 MiB Shape (3, 4320, 4320, 8640) (1, 70, 1080, 16) Count 401761 Tasks 401760 Chunks Type complex128 numpy.ndarray",3  1  8640  4320  4320,

Unnamed: 0,Array,Chunk
Bytes,7.04 TiB,18.46 MiB
Shape,"(3, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,401761 Tasks,401760 Chunks
Type,complex128,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.04 TiB,18.46 MiB
Shape,"(3, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,401761 Tasks,401760 Chunks
Type,complex128,numpy.ndarray
"Array Chunk Bytes 7.04 TiB 18.46 MiB Shape (3, 4320, 4320, 8640) (1, 70, 1080, 16) Count 401761 Tasks 401760 Chunks Type complex128 numpy.ndarray",3  1  8640  4320  4320,

Unnamed: 0,Array,Chunk
Bytes,7.04 TiB,18.46 MiB
Shape,"(3, 4320, 4320, 8640)","(1, 70, 1080, 16)"
Count,401761 Tasks,401760 Chunks
Type,complex128,numpy.ndarray


In [11]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
