In [1]:
import os, sys
from glob import glob
import numpy as np
import dask
import xarray as xr
import cartopy.crs as ccrs
from matplotlib import pyplot as plt
%matplotlib inline

from mitequinox.utils import *
from mitequinox.sigp import *

In [2]:
from dask_jobqueue import PBSCluster
# for heavy processing:
#cluster = PBSCluster(cores=6, processes=6, walltime='06:00:00')
#w = cluster.scale(6*10)

cluster = PBSCluster(cores=6, processes=6,  walltime='06:00:00')
w = cluster.scale(12*10)

In [3]:
# get dask handles and check dask server status
from dask.distributed import Client
client = Client(cluster)

In [4]:
client

0,1
Client  Scheduler: tcp://10.135.39.27:41390  Dashboard: http://10.135.39.27:8787/status,Cluster  Workers: 120  Cores: 120  Memory: 2.00 TB


____________
# Calculating total-uv, ageo-uv and geo-uv

## Notes on 2020.03.24; added on 2020.09.20
### 1. Total-uv is the modelled uv.
### 2. Geo-uv is the one estimated from geostrophic balance (in terms of Eta gradients and Coriolis force).
### 3. Ageo-uv is the difference between modelled uv and geo uv.
### 4. These results (u,u_g,u_a; v,v_g,v_a) (1080*1080*13) are stored at /work/ALT/swot/aval/syn/xy/comparison
### 5. each face may need 3-5 mins to calculate and store, under cluster = PBSCluster(cores=6, processes=6,  walltime='06:00:00'); w = cluster.scale(12*10)

In [6]:
#grd = load_grd(ftype='nc').reset_coords()
grd = load_grd().reset_coords()
mask = ((grd.hFacW.rename({'i_g': 'i'}) == 1) &
        (grd.hFacS.rename({'j_g': 'j'}) == 1) 
       ).rename('mask').reset_coords(drop=True)
grd_rspec = xr.merge([mask, grd.XC, grd.YC, grd.Depth])
print(grd_rspec)

# !! chunking is coarse for the netcdf grid

<xarray.Dataset>
Dimensions:  (face: 13, i: 4320, j: 4320)
Coordinates:
  * face     (face) int64 0 1 2 3 4 5 6 7 8 9 10 11 12
  * i        (i) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * j        (j) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
Data variables:
    mask     (face, j, i) bool dask.array<chunksize=(1, 4320, 4320), meta=np.ndarray>
    XC       (face, j, i) float32 dask.array<chunksize=(1, 4320, 4320), meta=np.ndarray>
    YC       (face, j, i) float32 dask.array<chunksize=(1, 4320, 4320), meta=np.ndarray>
    Depth    (face, j, i) float32 dask.array<chunksize=(1, 4320, 4320), meta=np.ndarray>


In [7]:
# coriolis term
lat = grd_rspec['YC']
omega = 7.3/100000
f_ij = 2*omega*np.sin(np.deg2rad(lat))
f_ij

<xarray.DataArray 'YC' (face: 13, j: 4320, i: 4320)>
dask.array<mul, shape=(13, 4320, 4320), dtype=float32, chunksize=(1, 4320, 4320), chunktype=numpy.ndarray>
Coordinates:
  * face     (face) int64 0 1 2 3 4 5 6 7 8 9 10 11 12
  * i        (i) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319
  * j        (j) int64 0 1 2 3 4 5 6 7 ... 4313 4314 4315 4316 4317 4318 4319

In [8]:
# u, geo_u, ageo_u

dij=4
overwrite=True
    
#for face in range(13):
for face in [1]:

    Efile = work_data_dir+'xy/comparison/u_try_f%02d.zarr'%(face)

    if not os.path.isdir(Efile) or overwrite:

        # load data
        dsu = ( xr.open_zarr(work_data_dir+'rechunked/%s_mbal_xy_f%02d.zarr'%('u',face))
                .isel(i_g=slice(0,None,dij), j=slice(0,None,dij)) )
        dsv = (xr.open_zarr(work_data_dir+'rechunked/%s_mbal_xy_f%02d.zarr'%('v',face))
               .isel(i=slice(0,None,dij), j_g=slice(0,None,dij)) )
        fs = f_ij.isel(face=face, i=slice(0,None,dij), j=slice(0,None,dij))
        
        ds = xr.merge([dsu.rename({'i_g': 'i'}), dsv.rename({'j_g': 'j'})], 
                      compat='equals').assign_coords(**grd_rspec.sel(face=face))

        # !!! check signs (checked on 08/30), will need to be normalized by Coriolis frequency !!!
        
        # compute ageostrophic velocities
        ds['u_ageo'] =  (ds['v_gradp'] -  ds['v_coriolis_linear'])/fs
        ds['u_geo'] =  (-ds['v_gradp'])/fs
        ds['u'] =  -ds['v_coriolis_linear']/fs
        E = xr.merge([ds['u_ageo'],ds['u_geo'],ds['u']])
        print(E)
        #ds['v_ageo'] =  (-ds['u_gradp'] +  ds['u_coriolis_linear'])/fs 
        #ds['v_geo'] =  (ds['u_gradp'])/fs
        #ds['v'] =  ds['u_coriolis_linear']/fs 
        #E = xr.merge([ds['v_ageo'],ds['v_geo'],ds['v']])
        
        # store
        for c in E.coords:
            try:
                del E[c].encoding['chunks']
            except:
                print(c)

        E = E.chunk({'i': 24*8, 'j':47*4})
        print(E)
        %time E.to_zarr(Efile, mode='w')
        print('--- face %d done'%face)

    else:
        print('--- face %d allready computed'%face)

<xarray.Dataset>
Dimensions:  (i: 1080, j: 1080, time: 8640)
Coordinates:
    dxC      (j, i) float32 dask.array<chunksize=(24, 47), meta=np.ndarray>
    dyG      (j, i) float32 dask.array<chunksize=(24, 47), meta=np.ndarray>
    f_j      (j, i) float32 dask.array<chunksize=(24, 47), meta=np.ndarray>
    face     int64 1
  * i        (i) int64 0 4 8 12 16 20 24 ... 4292 4296 4300 4304 4308 4312 4316
  * j        (j) int64 0 4 8 12 16 20 24 ... 4292 4296 4300 4304 4308 4312 4316
  * time     (time) float64 5.702e+06 5.706e+06 5.71e+06 ... 3.68e+07 3.68e+07
    dxG      (j, i) float32 dask.array<chunksize=(24, 47), meta=np.ndarray>
    dyC      (j, i) float32 dask.array<chunksize=(24, 47), meta=np.ndarray>
    f_i      (j, i) float32 dask.array<chunksize=(24, 47), meta=np.ndarray>
    Depth    (j, i) float32 dask.array<chunksize=(1080, 1080), meta=np.ndarray>
    XC       (j, i) float32 dask.array<chunksize=(1080, 1080), meta=np.ndarray>
    YC       (j, i) float32 dask.array<chunksize=(

In [37]:
# v, geo_v, ageo_v

dij=4
overwrite=True
    
for face in range(13):
#for face in [1]:

    Efile = work_data_dir+'xy/comparison/v_f%02d.zarr'%(face)

    if not os.path.isdir(Efile) or overwrite:

        # load data
        dsu = ( xr.open_zarr(work_data_dir+'rechunked/%s_mbal_xy_f%02d.zarr'%('u',face))
                .isel(i_g=slice(0,None,dij), j=slice(0,None,dij)) )
        dsv = (xr.open_zarr(work_data_dir+'rechunked/%s_mbal_xy_f%02d.zarr'%('v',face))
               .isel(i=slice(0,None,dij), j_g=slice(0,None,dij)) )
        fs = f_ij.isel(face=face, i=slice(0,None,dij), j=slice(0,None,dij))
        
        ds = xr.merge([dsu.rename({'i_g': 'i'}), dsv.rename({'j_g': 'j'})], 
                      compat='equals').assign_coords(**grd_rspec.sel(face=face))

        # !!! check signs (checked on 08/30), will need to be normalized by Coriolis frequency !!!

        ds['v_ageo'] =  (-ds['u_gradp'] +  ds['u_coriolis_linear'])/fs
        ds['v_geo'] =  (ds['u_gradp'])/fs
        ds['v'] =  ds['u_coriolis_linear']/fs 
        E = xr.merge([ds['v_ageo'],ds['v_geo'],ds['v']])

        #ds['u_ageo'] =  (ds['v_gradp'] -  ds['v_coriolis_linear'])/fs
        #ds['u_geo'] =  (-ds['v_gradp'])/fs
        #ds['u'] =  -ds['v_coriolis_linear']/fs        
        #E = xr.merge([ds['u_ageo'],ds['u_geo'],ds['u']])

        # store
        for c in E.coords:
            try:
                del E[c].encoding['chunks']
            except:
                print(c)        
        E = E.chunk({'i': 24*8, 'j':47*4})
        %time E.to_zarr(Efile, mode='w')

        print('--- face %d done'%face)

    else:
        print('--- face %d allready computed'%face)

dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 1min 15s, sys: 3.79 s, total: 1min 19s
Wall time: 1min 33s
--- face 0 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 1min 55s, sys: 8.03 s, total: 2min 3s
Wall time: 4min 59s
--- face 1 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 1min 15s, sys: 6.43 s, total: 1min 22s
Wall time: 1min 51s
--- face 2 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask




CPU times: user 1min 3s, sys: 4.25 s, total: 1min 8s
Wall time: 1min 13s
--- face 3 done




dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask




CPU times: user 1min 54s, sys: 7.43 s, total: 2min 1s
Wall time: 3min 2s
--- face 4 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 49.3 s, sys: 2.71 s, total: 52 s
Wall time: 55.9 s
--- face 5 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 1min 12s, sys: 4.42 s, total: 1min 17s
Wall time: 1min 26s
--- face 6 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask




CPU times: user 2min 2s, sys: 7.8 s, total: 2min 10s
Wall time: 3min 16s
--- face 7 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 2min 1s, sys: 9.09 s, total: 2min 10s
Wall time: 4min 5s
--- face 8 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 2min 5s, sys: 7.49 s, total: 2min 13s
Wall time: 2min 49s
--- face 9 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 2min 5s, sys: 9.04 s, total: 2min 14s
Wall time: 2min 47s
--- face 10 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 3min 8s, sys: 10.8 s, total: 3min 19s
Wall time: 4min 51s
--- face 11 done
dxC
dyG
f_j
face
dxG
dyC
f_i
Depth
XC
YC
mask
CPU times: user 2min 13s, sys: 7.84 s, total: 2min 21s
Wall time: 3min 25s
--- face 12 done




In [29]:
# Eta
time_length = 8640
dij=4
overwrite=True
    
#for face in range(13):
for face in [1]:

    Efile = work_data_dir+'xy/SSH/Eta_f%02d.zarr'%(face)

    if not os.path.isdir(Efile) or overwrite:

        # load data
        dsE = xr.open_zarr(root_data_dir+'zarr/%s.zarr'%('Eta')).isel(time=slice(1512,1512+time_length),face=face, i=slice(0,None,dij), j=slice(0,None,dij))
        
        del dsE['Eta'].encoding['chunks']
        del dsE['time'].encoding['chunks']
        #dsE = dsE.chunk({'time':time_length, 'i': 24*8, 'j':47*4})    
        
        # store
        for c in dsE.coords:
            try:
                del dsE[c].encoding['chunks']
            except:
                print(c)

        dsE = dsE.chunk({'time':time_length, 'i': 24*8, 'j':47*4}) 
        %time dsE.to_zarr(Efile, mode='w')
        print('--- face %d done'%face)

    else:
        print('--- face %d allready computed'%face)

time
CPU times: user 10.8 s, sys: 803 ms, total: 11.6 s
Wall time: 15.2 s
--- face 1 done


In [40]:
ds =  xr.open_zarr(work_data_dir+'xy/SSH/Eta_f%02d.zarr'%(face))
ds

<xarray.Dataset>
Dimensions:  (i: 1080, j: 1080, time: 8640)
Coordinates:
    dtime    (time) datetime64[ns] dask.array<chunksize=(8640,), meta=np.ndarray>
    face     int64 ...
  * i        (i) int64 0 4 8 12 16 20 24 ... 4292 4296 4300 4304 4308 4312 4316
    iters    (time) int64 dask.array<chunksize=(8640,), meta=np.ndarray>
  * j        (j) int64 0 4 8 12 16 20 24 ... 4292 4296 4300 4304 4308 4312 4316
  * time     (time) float64 5.702e+06 5.706e+06 5.71e+06 ... 3.68e+07 3.68e+07
Data variables:
    Eta      (time, j, i) float32 dask.array<chunksize=(8640, 188, 192), meta=np.ndarray>

In [42]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/home/uz/yux/.conda/envs/equinox/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/home/uz/yux/.conda/envs/equinox/lib/python3.7/site-packages/distributed/client.py", line 1276, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/home/uz/yux/.conda/envs/equinox/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/home/uz/yux/.conda/envs/equinox/lib/python3.7/site-packages/distributed/client.py", line 1005, in _reconnect
    await self._close()
  File "/home/uz/yux/.conda/envs/equinox/lib/python3.7/site-packages/distributed/client.py", line 1276, in _close
    await gen.with_timeout(timedelta(seconds=