In [1]:
import xarray as xr
import numpy as np
import dask
import dask.array as da
from dask.diagnostics import ProgressBar
import glob
import matplotlib.pyplot as plt

from linearsim import time_domain_ras, distribution_seeds

# Generate a dataset of input spectra 

In [2]:
# Run configuration for the generated statistics files
n_seed = 10_000                  # seeds simulated per checkpoint file
suffix = '_percentiles_221112'   # tag appended to every output filename

In [3]:
# Number of waves to evaluate for each record (counted relative to the peak period),
# held along the 'n' dimension
duration = xr.DataArray(
    np.asarray([10, 20, 50, 100, 250, 500], dtype=int),
    dims=('n',),
)
duration

In [4]:
def set_coords(ds):
    """Lower-case the input variable names and promote the inputs to coordinates.

    Steps, in order:
      1. rename ``N`` -> ``n``, ``Gamma`` -> ``gamma``, ``Seed`` -> ``seed``;
      2. assign ``n`` and ``gamma`` as coordinates and replace ``seed`` with a
         0-based running index of the same length;
      3. mark ``hs``, ``tp``, ``dt`` and ``fft_min_duration`` as coordinates.

    Parameters
    ----------
    ds : dataset-like object exposing ``rename``, ``assign_coords``,
        ``set_coords`` and item access (an ``xr.Dataset`` in this notebook).

    Returns
    -------
    The object returned by the final ``set_coords`` call.
    """
    lowercase_names = {'N': 'n', 'Gamma': 'gamma', 'Seed': 'seed'}
    renamed = ds.rename(lowercase_names)

    # The seed coordinate is a plain position index, not the random seed value itself
    seed_index = range(len(renamed['seed']))
    indexed = renamed.assign_coords(
        {'n': renamed['n'], 'gamma': renamed['gamma'], 'seed': seed_index}
    )

    return indexed.set_coords(['hs', 'tp', 'dt', 'fft_min_duration'])

# Generate the time-series statistics in checkpoints: each iteration simulates n_seed
# seeds for every (gamma, n) combination and writes them to its own NetCDF file.
# The per-checkpoint files are concatenated later when they are read back.
for checkpoint in range(100):
    # NOTE: generation is switched off. This unconditional `break` leaves the loop on the
    # first iteration, so nothing below runs; remove it to regenerate the files.
    break

    # Input parameter grid; names are lower-cased afterwards by set_coords()
    ds = xr.Dataset()
    ds['Gamma'] = xr.DataArray(np.array([1.0,3.3,7]),dims='gamma')
    ds['N'] = duration
    # Constant 512 for every n; its meaning is defined by time_domain_ras (not visible here)
    ds['fft_min_duration'] = xr.ones_like(ds['N'])*512
    ds['Seed'] = xr.DataArray(da.arange(n_seed,chunks=(n_seed//5,)).astype('int'),dims='seed') # Specifies the granularity for calculation with dask using chunks
    # One random integer below 1E9 per (gamma, n, seed) element; passed to time_domain_ras
    # as its sixth argument (presumably the RNG seed of each realisation — confirm)
    ds['this_seed'] = xr.DataArray(da.random.choice(int(1E9), size=(len(ds['Gamma']), len(ds['N']), n_seed),chunks=(1,1,n_seed//5)).astype('int'), dims=('gamma','n','seed'))
    # Scalar inputs, identical for every simulation
    ds['dt']=1/32
    ds['hs']=1.0
    ds['tp']=1.0
    # One dask chunk per (gamma, n) pair and per fifth of the seed axis
    ds = ds.chunk({'gamma':1,'n':1,'seed':n_seed//5})

    ds = set_coords(ds)

    # Names of output variables returned from time_domain_ras function
    # (16 names, matching the 16 entries of output_core_dims / output_dtypes below)
    output_variables = 'Tz, Tm01, Hm0, Hs, H13, H13_unbiased, Hmax, HmaxT, Cmax, CmaxT2, r_spectra, r_sample, r_unbiased, k3, k4, seed'
    # All core dims are empty and vectorize=True, so time_domain_ras receives scalar
    # arguments and is evaluated element-wise over the broadcast (gamma, n, seed) grid;
    # dask='parallelized' keeps the evaluation lazy until compute() is called.
    outputs = xr.apply_ufunc(time_domain_ras,
                            ds['hs'],
                            ds['tp'],
                            ds['gamma'],
                            ds['n'],
                            ds['dt'],
                            ds['this_seed'],
                            ds['fft_min_duration'],
                            input_core_dims=[[],[],[],[],[],[],[]],
                            output_core_dims=[[],]*16,
                            vectorize=True,
                            dask='parallelized',
                            output_dtypes=['float',]*16
                            )
                            
    # Assign the outputs to the xr.Dataset
    for v, o in zip(output_variables.split(',')[:-1],outputs[:-1]): # ignore the returned seed - same as input
        ds[v.strip()] = o
    

    # Use multiple processes to calculate timeseries and write to disk
    with dask.config.set(scheduler='processes'):
        with ProgressBar():
            ds = ds.compute()

    # Re-chunk before writing: one chunk per gamma, with the n and seed axes unsplit
    ds_output = ds.chunk({'gamma':1,'n':-1,'seed':-1})
    # The checkpoint index is zero-padded to five digits in the filename (e.g. ..._00000.nc)
    ds_output.to_netcdf(f'data/timeseries_stats/RAS_n{n_seed}{suffix}_{checkpoint:05.0f}.nc',mode='w')

In [5]:
# Collect the per-checkpoint NetCDF files in checkpoint order.
# The pattern is built from the same n_seed / suffix values used when the files are
# written, so the reader and the writer cannot drift apart if the configuration changes.
files = sorted(glob.glob(f'data/timeseries_stats/RAS_n{n_seed}{suffix}_*.nc'))
files[0:5]

['data/timeseries_stats\\RAS_n10000_percentiles_221112_00000.nc',
 'data/timeseries_stats\\RAS_n10000_percentiles_221112_00001.nc',
 'data/timeseries_stats\\RAS_n10000_percentiles_221112_00002.nc',
 'data/timeseries_stats\\RAS_n10000_percentiles_221112_00003.nc',
 'data/timeseries_stats\\RAS_n10000_percentiles_221112_00004.nc']

In [6]:
# Total number of checkpoint files matched by the glob pattern
len(files)

1000

In [7]:
# Open the checkpoint files as a single dataset, concatenated along 'seed' in list order.
# NOTE(review): only files[:200] are opened although more files were found — confirm that
# restricting the analysis to the first 200 checkpoints is intentional.
with dask.config.set(scheduler='processes'):
    ds = xr.open_mfdataset(
        files[:200],
        combine='nested',
        concat_dim='seed',
        parallel=True,
        coords='minimal',
        compat='override',
        join='override',
    )


In [8]:
# Re-chunk so each (n, gamma) pair holds its whole seed axis in a single chunk
ds = ds.chunk({'n': 1, 'gamma': 1, 'seed': -1})

# NOTE(review): the name 'HmHs' reads like Hmax/Hs, but the stored value is Hs/Hmax —
# confirm which ratio the downstream code expects.
ds['HmHs'] = ds['Hs'] / ds['Hmax']

In [9]:
# Quantile levels of interest, from the 1st to the 99th percentile
quantiles = np.asarray(
    [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99],
    dtype=float,
)
# d_quantiles = sorted((quantiles+0.005).tolist() + (quantiles-0.005).tolist())

In [10]:
# Collapse the 'n' and 'gamma' dimensions into one stacked dimension named 'set'
ds_stacked = ds.stack({"set": ["n", "gamma"]})

In [11]:
from dask import delayed

# Lazy wrapper: calling it records a task instead of running distribution_seeds immediately
d_distribution_seeds = delayed(distribution_seeds)

# One delayed task per (n, gamma) combination, keyed by that pair
futures = {
    (n, g): d_distribution_seeds(ds.sel(n=n, gamma=g))
    for n in ds.n.values
    for g in ds.gamma.values
}


In [12]:
# with dask.config.set(scheduler="processes"):
#     with ProgressBar():
#         results = dask.compute(futures)

In [13]:
# Start a local dask.distributed cluster and connect a client to it.
# NOTE(review): a newly created Client normally registers itself as the default scheduler,
# so the dask.compute call in the following cell should run on this cluster — confirm.
from distributed import LocalCluster, Client
cluster = LocalCluster()
client = Client(cluster)
# Bare expression so the notebook renders the client / cluster summary
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 12,Total memory: 15.78 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:65470,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 15.78 GiB

0,1
Comm: tcp://127.0.0.1:65510,Total threads: 3
Dashboard: http://127.0.0.1:65511/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:65476,
Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-0mr12510,Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-0mr12510

0,1
Comm: tcp://127.0.0.1:65501,Total threads: 3
Dashboard: http://127.0.0.1:65502/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:65474,
Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-a9pjcr3m,Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-a9pjcr3m

0,1
Comm: tcp://127.0.0.1:65504,Total threads: 3
Dashboard: http://127.0.0.1:65505/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:65475,
Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-od_5qqai,Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-od_5qqai

0,1
Comm: tcp://127.0.0.1:65507,Total threads: 3
Dashboard: http://127.0.0.1:65508/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:65473,
Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-onq5v6ir,Local directory: C:\Users\bransonp\AppData\Local\Temp\dask-worker-space\worker-onq5v6ir


In [14]:
# Run every delayed distribution_seeds task. dask.compute returns a tuple with one entry
# per positional argument, so the computed {(n, gamma): result} dict is results[0].
results = dask.compute(futures)

In [None]:
# Give each per-combination result explicit length-1 'n' and 'gamma' dimensions so the
# pieces can be merged back into a single dataset spanning the full (n, gamma) grid
res_dsets = [
    subset.expand_dims({'n': [n], 'gamma': [g]})
    for (n, g), subset in results[0].items()
]

ds_seeds = xr.merge(res_dsets)

In [31]:
# Downcast the selected seeds and values to 32-bit types before they are written to disk.
#
# Bug fix: 'multivariate_seeds' used to be assigned from 'univariate_seeds', which silently
# replaced the multivariate seeds with a copy of the univariate ones. Driving the casts
# from a single name -> dtype table makes every variable be cast from itself.
#
# NOTE(review): int32 holds values up to 2**31 - 1; confirm the stored seeds stay below that.
seed_output_dtypes = {
    'univariate_seeds': 'int32',
    'multivariate_seeds': 'int32',
    'univariate_values': 'float32',
    'multivariate_values': 'float32',
}
for var_name, target_dtype in seed_output_dtypes.items():
    ds_seeds[var_name] = ds_seeds[var_name].astype(target_dtype)

In [32]:
# Persist the merged seed / value dataset to a NetCDF file
ds_seeds.to_netcdf('data/seeds/quantile_seeds_221112.nc')