# Store PDFs (acceleration bins only) and their statistics in zarr

In [1]:
import numpy as np
import pandas as pd
import xarray as xr
import pyproj
from rasterio.transform import Affine

import matplotlib.pyplot as plt
import matplotlib.colors as cl
from matplotlib.ticker import FormatStrFormatter

import cartopy.crs as ccrs
import cartopy.feature as cfeature
import cartopy.geodesic as cgeo
crs = ccrs.PlateCarree()
import cmocean.cm as cm

from xgcm import Grid
from xhistogram.xarray import histogram
import warnings
warnings.filterwarnings("ignore")

import os
from glob import glob

import m2lib22.box as box
import m2lib22.aviso as aviso
import m2lib22.cstes as cstes
import m2lib22.diagnosis as diag
import m2lib22.erastar as eras
import m2lib22.stress_to_windterm as stw

from m2lib22.cstes import labels, zarr_dir

In [3]:
from dask.distributed import Client

# Switch to False to run on a LocalCluster instead of PBS jobs.
use_pbs_cluster = True

if use_pbs_cluster:
    from dask_jobqueue import PBSCluster
    # Alternative sizings kept for reference:
    #cluster = PBSCluster(cores=56, processes=28, walltime='04:00:00')
    #cluster = PBSCluster(cores=7, processes=7, walltime='04:00:00')
    cluster = PBSCluster(cores=10, processes=10, walltime='04:00:00')
    w = cluster.scale(jobs=4)
else:
    from dask.distributed import LocalCluster
    cluster = LocalCluster()

# Attach a client to whichever cluster was created and display it.
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.148.0.109:48284/status,

0,1
Dashboard: http://10.148.0.109:48284/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.148.0.109:38033,Workers: 0
Dashboard: http://10.148.0.109:48284/status,Total threads: 0
Started: Just now,Total memory: 0 B


## IMPORT DATA, build dataset
### FOR ALL DATA

In [None]:
# Keep only the first two labels (per the original note: no PEACHY data).
labels = labels[:2]

# only erastar
#list_wd_srce_suffix = ['es']
#_stress_var = [v for v in diag._stress_var if 'es' in v]

# One filtered, persisted dataset per label.
DS = {}
for label in labels:
    # Open the four zarr stores associated with this label.
    ds_data = xr.open_zarr(f"{zarr_dir}/{label}.zarr")
    # TO CHANGE ONCE GOOD FILES GENERATED
    ds_stress = xr.open_zarr(f"{zarr_dir}/erastar/erastar_{label}.zarr")
    ds_corr = xr.open_zarr(f"{zarr_dir}/slacorrection_{label}.zarr")
    ds_aviso = xr.open_zarr(f"{zarr_dir}/aviso_{label}.zarr")

    # Merge everything returned by diag.datasets_for_pdfs into a single dataset.
    pieces = diag.datasets_for_pdfs(
        ds_data, ds_aviso, ds_stress, ds_corr, sum_=True, except_=True,
    )
    ds = xr.merge(pieces)

    # Original note: dt<0.5h, dl<1e5m. Apply the time filter first, then the distance filter.
    ds = ds.where(ds.alti___time_difference <= 1800, drop=True)
    ds = ds.where(ds.alti___distance <= 1e5, drop=True)

    DS[label] = ds.persist()
    print(label)

In [5]:
# Bin edges for the acceleration PDFs: 200 bins of width 1e-6 spanning [-1e-4, 1e-4].
# np.linspace is used instead of np.arange because a float step makes arange drop the
# upper edge (bins asymmetric about zero) and makes its length rounding-dependent.
acc_bins = np.linspace(-1e-4, 1e-4, 201)
# Compute one PDF dataset per entry of DS, then stack them along a new
# 'drifter_sat_year' dimension indexed by the DS keys.
pdf_per_label = [diag.compute_pdfs(DS[key], acc_bins) for key in DS]
label_index = pd.Index(DS.keys(), name='drifter_sat_year')
ds_pdf = xr.concat(pdf_per_label, dim=label_index)

Task exception was never retrieved
future: <Task finished name='Task-621382' coro=<Client._gather.<locals>.wait() done, defined at /home1/datahome/mdemol/.miniconda3/envs/m2env/lib/python3.8/site-packages/distributed/client.py:2003> exception=AllExit()>
Traceback (most recent call last):
  File "/home1/datahome/mdemol/.miniconda3/envs/m2env/lib/python3.8/site-packages/distributed/client.py", line 2008, in wait
    raise AllExit()
distributed.client.AllExit
Exception ignored in: <generator object sync.<locals>.f at 0x2aab31a9d7b0>
Traceback (most recent call last):
  File "/home1/datahome/mdemol/.miniconda3/envs/m2env/lib/python3.8/site-packages/distributed/utils.py", line 349, in f
    result = yield future
KeyboardInterrupt: 


KeyboardInterrupt: 

In [5]:
# Display the concatenated per-label PDF dataset.
ds_pdf

In [6]:
# Write the per-label PDF dataset to a zarr store under zarr_dir (mode="w").
pdf_store = os.path.join(zarr_dir, "pdf_acc_bin.zarr")
ds_pdf.to_zarr(pdf_store, mode="w")
print(f"pdf storred in {pdf_store}")

pdf storred in /home1/datawork/mdemol/m2/pdf_acc_bin.zarr


In [7]:
# PDF aggregated over all 'drifter_sat_year' entries (normalized, per the original note).
ds_pdf_all = diag.global_pdf(ds_pdf).persist()

every_label = ds_pdf.drifter_sat_year.values

# Subsets selected by the substring 'SASSA_Sentinel' / 'SARAL' in the label.
list_sentinel = [name for name in every_label if 'SASSA_Sentinel' in name]
list_saral = [name for name in every_label if 'SARAL' in name]
ds_pdf_sentinel = diag.global_pdf(ds_pdf, drifter_sat_year=list_sentinel).persist()
ds_pdf_saral = diag.global_pdf(ds_pdf, drifter_sat_year=list_saral).persist()

# Further restriction of the two subsets above.
list_sentinel_gps = [name for name in list_sentinel if 'gps' in name]
# NOTE(review): this list is named "*_gps" but keeps labels containing 'argos' — confirm this is intended.
list_saral_gps = [name for name in list_saral if 'argos' in name]
ds_pdf_sentinel_gps = diag.global_pdf(ds_pdf, drifter_sat_year=list_sentinel_gps).persist()
ds_pdf_saral_gps = diag.global_pdf(ds_pdf, drifter_sat_year=list_saral_gps).persist()

# Subsets selected by the substring 'argos' / 'gps' in the label.
list_argos = [name for name in every_label if 'argos' in name]
list_gps = [name for name in every_label if 'gps' in name]
ds_pdf_argos = diag.global_pdf(ds_pdf, drifter_sat_year=list_argos).persist()
ds_pdf_gps = diag.global_pdf(ds_pdf, drifter_sat_year=list_gps).persist()

In [8]:
# Full set of aggregated PDFs, kept for reference:
#DS_pdf = {'all':ds_pdf_all, 'sentinel':ds_pdf_sentinel,  'saral':ds_pdf_saral, 'gps':ds_pdf_gps, 'argos':ds_pdf_argos, 'sentinel_gps':ds_pdf_sentinel_gps, 'saral_gps':ds_pdf_saral_gps,}
DS_pdf = dict(sentinel_gps=ds_pdf_sentinel_gps, saral_gps=ds_pdf_saral_gps)

# One zarr store per subset, named after its key.
for subset, subset_pdf in DS_pdf.items():
    store_path = os.path.join(zarr_dir, f"pdf_acc_bin_{subset}.zarr")
    subset_pdf.to_zarr(store_path, mode="w")
    print(f"pdf storred in {store_path}")

pdf storred in /home1/datawork/mdemol/m2/pdf_acc_bin_sentinel_gps.zarr
pdf storred in /home1/datawork/mdemol/m2/pdf_acc_bin_saral_gps.zarr


In [9]:
# Display the id_comb entry of the aggregated PDF dataset.
ds_pdf_all.id_comb

In [None]:
# Statistics (mean, rms, variance, std) of the aggregated PDFs along 'acc_bin'.
# Both *_gps subsets are computed because DS_rms references ds_rms_sentinel_gps and
# ds_rms_saral_gps; leaving the former commented out raised a NameError on a fresh run.
# The other subsets (all, sentinel, saral, argos, gps) can be computed the same way
# from the matching ds_pdf_<subset> dataset.
stat_options = dict(mean=True, rms=True, var=True, std=True)
ds_rms_sentinel_gps = diag.ds_mean_var_std(ds_pdf_sentinel_gps, 'acc_bin', **stat_options)
ds_rms_saral_gps = diag.ds_mean_var_std(ds_pdf_saral_gps, 'acc_bin', **stat_options)

In [None]:
# Wider selection kept for reference:
#DS_rms = {'all':ds_rms_all, 'sentinel':ds_rms_sentinel,  'saral':ds_rms_saral, 'gps':ds_rms_gps, 'argos':ds_rms_argos}
# Statistics datasets to be written out, keyed by subset name.
DS_rms = dict(sentinel_gps=ds_rms_sentinel_gps, saral_gps=ds_rms_saral_gps)

In [17]:
# Display one of the statistics datasets. ds_rms_sentinel is never defined
# (its assignment is commented out), so show ds_rms_saral_gps instead.
ds_rms_saral_gps

In [None]:
# Write each statistics dataset to its own zarr store under zarr_dir (mode="w").
for key, ds_stat in DS_rms.items():
    zarr = os.path.join(zarr_dir, f"stat_acc_bin_{key}.zarr")
    ds_stat.to_zarr(zarr, mode="w")
    # These stores hold statistics, not PDFs, so the message says so.
    print(f"stats stored in {zarr}")

In [4]:
# Close the client before the cluster: closing only the cluster leaves the client
# trying to reconnect to a scheduler that no longer exists until it times out.
client.close()
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
