Dask Approach to Non-SQL PTM Queries
--

Animate the overall field

First go with lightly smoothed, subtidal?

In [1]:
import postproc_dask as post



In [2]:
try:
    client.close()
except NameError:
    pass

In [3]:
import multiprocessing.popen_spawn_posix #  https://github.com/dask/distributed/issues/4168
import dask
import dask.dataframe as dd
import dask.bag as db
import numpy as np

In [4]:
from dask.distributed import Client
client=Client(n_workers=16,threads_per_worker=1)
client.cluster

VBox(children=(HTML(value='<h2>LocalCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [5]:
# Configure malloc on each worker, too
import os
def client_config(_):
    p=os.getpid()
    post.config_malloc()
    return p

pids=[f.result() for f in client.map(lambda x: os.getpid(), client.ncores(),pure=False)]
assert len(pids)==len(np.unique(pids))


In [6]:
import matplotlib.pyplot as plt
import conc_figure
import six
import stompy.plot.cmap as scmap
from stompy.spatial import proj_utils
from matplotlib import cm
cmap=cm.CMRmap_r
cmap=scmap.cmap_clip(cmap,0.03,1.0)

%matplotlib notebook

In [7]:
import os
import glob
import numpy as np
import pandas as pd
import xarray as xr
import re

from stompy.grid import unstructured_grid
from stompy import utils, memoize
from stompy.model.fish_ptm import ptm_config, ptm_tools
from stompy.model.suntans import sun_driver
from scipy.stats import spearmanr

import stompy.plot.cmap as scmap
from scipy import stats
import seaborn as sns
turbo=scmap.load_gradient('turbo.cpt')

Overall Process
===

1. SUNTANS hydro runs
2. SUNTANS average output
3. ptm-formatted average output
4. PTM runs
5. Load data

The top-level query is something like *generate a map of concentrations for...*

filter on:
 - sources $x$
 - settling classes $y$
 - vertical positions $z$
 - horizontal positions $h$

weighted by

 - loading data 
 - age
 
mapped by one of ...

 - bounding box
 - put on hydro grid
 - put on regular grid

and possibly smoothed.

In [17]:
# Experiment level configuration -- small, all python native data.
# The 'new' run
cfg=dict(
    ptm_base_dir="/opt2/sfb_ocean/ptm/all_source_022b",
    sun_base_dir="/opt2/sfb_ocean/suntans/runs",
    ptm_output_interval=np.timedelta64(1,'h')
)
cfg['ptm_run_patt']=os.path.join(cfg['ptm_base_dir'],"chunk??","20??????")
cfg['sun_patt']=os.path.join(cfg['sun_base_dir'],"merged_022_20??????")

ptm_run_paths=glob.glob(cfg['ptm_run_patt'])
ptm_run_paths.sort()
cfg['ptm_run_paths']=ptm_run_paths

sun_paths=glob.glob(cfg['sun_patt'])
sun_paths.sort()
cfg['sun_paths']=sun_paths

In [18]:
# Load the grid into... grid
hydro_path=sun_paths[0]
ptm_ds=xr.open_dataset(os.path.join(hydro_path,"ptm_average.nc_0000.nc"))
grid=unstructured_grid.UnstructuredGrid.read_ugrid(ptm_ds,dialect='fishptm')
ptm_ds.close()   

# distribute to workers ahead of time.
grid_d=client.scatter(grid)
cfg['grid_d']=grid_d # too far?

In [19]:
# So far this is only used locally.  Slow to compute (15s)
Msmooth=grid.smooth_matrix()

INFO:utils:59033/99089


In [20]:
# Or could make this delayed and have it execute on each client?
load_data_d=client.scatter(post.get_load_data())
cfg['load_data_d']=load_data_d


In [21]:
#def total_area(g):
#    return g.cells_area().sum()

areas=grid.cells_area()
# total_area_d=dask.delayed(total_area)(grid_d)
# cfg['total_area']=areas.sum()

In [22]:
# Hydro timestamps
cfg['hydro_timestamps']=post.load_hydro_timestamps(sun_paths)


In [23]:
bc_ds_d=client.scatter(post.bc_ds(cfg=cfg))
cfg['bc_ds_d']=bc_ds_d

Generate summary datasets
--

1. Hourly snapshots of surface concentration with a nominal storm_factor and
   tau_d

2. Summary of wet season, variation in age.

Hourly surface concentrations
--

In [24]:
out_dir="hourly-out-v01"
# v01: fix error in surface filter

if not os.path.exists(out_dir):
    os.makedirs(out_dir)

storm_factor=0.02
tau_d=30

# The hour/minutes are important to cast correctly.
intervals=np.arange(np.datetime64("2017-08-01T00:00"),
                    np.datetime64("2018-07-04T00:00"),
                    np.timedelta64(1,'D'))

for t_min,t_max in zip(intervals[:-1],intervals[1:]):
    hours=np.arange(t_min,t_max,np.timedelta64(3600,'s'))
    targets=[]
    for h_min,h_max in zip(hours[:-1],hours[1:]):
        dt=utils.to_datetime(h_min)
        date_s=dt.strftime("%Y%m%dT%H%M")
        conc_fn=os.path.join(out_dir,f"surface-{date_s}.nc")
        if not os.path.exists(conc_fn):
            targets.append([h_min,h_max,conc_fn])
    if not targets:
        print(f"Skip {t_min} - {t_max}")
        continue # nothing to do
        
    print(f"Processing {t_min} - {t_max}")
        
    criteria=dict(t_min=t_min,t_max=t_max,
                  category='nonfiber',
                  z_below_surface_max=0.095,
                  age_max=np.timedelta64(60,'D'))
    part_d=post.query_particles(criteria,cfg=cfg)
    df=part_d.compute()
    print("Dicing...")

    # Weights computed on the whole group.
    age=df['time'] - df['rel_time']
    tau=np.timedelta64(tau_d,'D')
    decay=np.exp( -age/tau ) 

    group_weight=post.group_weights(df,storm_factor)

    df['count']=group_weight* decay * df['mp_per_particle']

    for h_min,h_max,conc_fn in targets:
        sel=(df['time']>=h_min).values & (df['time']<h_max).values
        df_sel=df[sel]
        ds_conc=post.particles_to_conc(df_sel,grid,'count')
        ds_conc['time']=(),h_min
        ds_conc['time_max']=(),h_max
        ds_conc['storm_factor']=(),storm_factor
        ds_conc['tau_d']=(),tau_d
        ds_conc.to_netcdf(conc_fn)        

Skip 2017-08-01T00:00 - 2017-08-02T00:00
Skip 2017-08-02T00:00 - 2017-08-03T00:00
Skip 2017-08-03T00:00 - 2017-08-04T00:00
Skip 2017-08-04T00:00 - 2017-08-05T00:00
Skip 2017-08-05T00:00 - 2017-08-06T00:00
Skip 2017-08-06T00:00 - 2017-08-07T00:00
Skip 2017-08-07T00:00 - 2017-08-08T00:00
Skip 2017-08-08T00:00 - 2017-08-09T00:00
Skip 2017-08-09T00:00 - 2017-08-10T00:00
Skip 2017-08-10T00:00 - 2017-08-11T00:00
Skip 2017-08-11T00:00 - 2017-08-12T00:00
Skip 2017-08-12T00:00 - 2017-08-13T00:00
Skip 2017-08-13T00:00 - 2017-08-14T00:00
Skip 2017-08-14T00:00 - 2017-08-15T00:00
Skip 2017-08-15T00:00 - 2017-08-16T00:00
Skip 2017-08-16T00:00 - 2017-08-17T00:00
Skip 2017-08-17T00:00 - 2017-08-18T00:00
Skip 2017-08-18T00:00 - 2017-08-19T00:00
Skip 2017-08-19T00:00 - 2017-08-20T00:00
Skip 2017-08-20T00:00 - 2017-08-21T00:00
Skip 2017-08-21T00:00 - 2017-08-22T00:00
Skip 2017-08-22T00:00 - 2017-08-23T00:00
Skip 2017-08-23T00:00 - 2017-08-24T00:00
Skip 2017-08-24T00:00 - 2017-08-25T00:00
Skip 2017-08-25T

Will repartition with 66 partitions
Dicing...
Processing 2018-04-26T00:00 - 2018-04-27T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-04-27T00:00 - 2018-04-28T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-04-28T00:00 - 2018-04-29T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-04-29T00:00 - 2018-04-30T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-04-30T00:00 - 2018-05-01T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-05-01T00:00 - 2018-05-02T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-05-02T00:00 - 2018-05-03T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-05-03T00:00 - 2018-05-04T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-05-04T00:00 - 2018-05-05T00:00
Will repartition with 57 partitions
Dicing...
Processing 2018-05-05T00:00 - 2018-05-06T00:00
Will repartition with 66 partitions
Dicing...
Processing 2018-05-06T00

In [15]:

def sum_to_grid(df,ages=[2,5,15,30,60]):
    """
    Aggregate along the way to cell concentrations
    """
    # Weights computed on the whole group.
    # Create a dataset with a concentration field for each 
    # age bracket
    group_weight=post.group_weights(df,storm_factor)
    age=df['time'] - df['rel_time']

    outs=[] # DataFrames of output
    
    for age_max_d in ages:
        print("Dicing...")
        age_max=np.timedelta64(age_max_d,'D')
        decay=age<age_max
        df['count']=group_weight* decay * df['mp_per_particle']

        ds_conc=post.particles_to_conc(df,grid,'count')
        cells=np.arange(ds_conc.dims['cell'])
        out=pd.DataFrame({'cell':cells,'conc':ds_conc.conc.values})
        out['age_max']=age_max_d
        outs.append(out)
    out=pd.concat(outs)
    # Might as well summarize at this level:
    return out.groupby(['age_max','cell'])['conc'].sum().reset_index()
sum_to_grid.meta={'age_max':np.int64,
                  'cell':np.float64,
                  'conc':np.int64}

# -----

out_dir="snapshots-v00"
if not os.path.exists(out_dir):
    os.makedirs(out_dir)

storm_factor=0.02
tau_d=None

seasons=dict(tst=dict(t_min=np.datetime64('2018-03-30 00:00'),
                      t_max=np.datetime64('2018-04-01 00:00')),
             wet=dict(t_min=np.datetime64('2018-03-30 00:00'),
                      t_max=np.datetime64('2018-04-14 00:00')),
             dry=dict(t_min=np.datetime64('2017-08-30 00:00'),
                      t_max=np.datetime64('2017-09-14 00:00')) 
            )
compartments=dict(surf=dict(z_below_surface_max=0.095),
                  bed=dict(z_above_bed_max=0.095))

defaults=dict(category='nonfiber',
              age_max=np.timedelta64(60,'D'))

# Seems to hit memory issues when processed all in one go.
for season in seasons:
    for compartment in compartments: 
        conc_fn=os.path.join(out_dir,f'conc-{season}-{compartment}.nc')
        print("Working on ",conc_fn)
        
        criteria=dict(defaults)
        criteria.update(seasons[season])
        criteria.update(compartments[compartment])
        part_d=post.query_particles(criteria,cfg=cfg)
        
        # Within partitions map to cell concentrations
        conc_by_age_d=part_d.map_partitions(sum_to_grid,meta=sum_to_grid.meta)
        
        # Then aggregate again at the top level
        conc_sum_d=conc_by_age_d.groupby(['age_max','cell'])['conc'].sum()
        conc_sum=conc_sum_d.compute()
        
        ds=xr.Dataset.from_dataframe(conc_sum.to_frame())
        Nsteps=(criteria['t_max']-criteria['t_min'])/np.timedelta64(3600,'s')
        ds['conc']=ds.conc / Nsteps
        ds['t_min']=(),criteria['t_min']
        ds['t_max']=(),criteria['t_max']
        ds['season']=(),season
        ds['compartment']=(),compartment
        ds['category']=(),criteria['category']
        ds['storm_factor']=(),storm_factor
    
        if os.path.exists(conc_fn):
            os.unlink(conc_fn)
        ds.to_netcdf(conc_fn)

Working on  snapshots-v00/conc-tst-surf.nc
Will repartition with 115 partitions
Working on  snapshots-v00/conc-tst-bed.nc
Will repartition with 115 partitions
Working on  snapshots-v00/conc-wet-surf.nc
Will repartition with 995 partitions
Working on  snapshots-v00/conc-wet-bed.nc
Will repartition with 995 partitions
Working on  snapshots-v00/conc-dry-surf.nc
Will repartition with 1028 partitions
Working on  snapshots-v00/conc-dry-bed.nc
Will repartition with 1028 partitions


Sanity check on particle concentrations
--

Some plots seem to show a lot more floating particles than sinking. Probably
just stuck particles, but odd...

In [26]:
# Choose a few days in April
criteria=dict(t_min=np.datetime64("2018-04-01 00:00"),
              t_max=np.datetime64("2018-04-02 00:00"),
              age_max=np.timedelta64(20,'D'),
              category='nonfiber')
part_d=post.query_particles(criteria=criteria,cfg=cfg)
parts=part_d.compute()

Will repartition with 23 partitions


In [35]:
# len(parts) # 40M
groups=parts['group'].unique()

source_map={}
behave_map={}

for grp in groups:
    source,behavior,release_date=post.parse_group_path(grp)
    source_map[grp]=source
    behave_map[grp]=behavior
parts['source']=parts['group'].map(source_map)
parts['behavior']=parts['group'].map(behave_map)

In [38]:
# This seems very wrong!!!!!!
by_behavior=parts.groupby('behavior')['mp_per_particle'].sum().sort_values()
by_behavior

behavior
none         7.941669e+09
down500      1.395158e+11
up500        1.395158e+11
down50000    4.284291e+11
up50000      4.284291e+11
down5000     5.116528e+12
up5000       5.116528e+12
Name: mp_per_particle, dtype: float64

In [41]:
by_behavior.values[1]-by_behavior.values[2]

0.0

This seems very wrong.

The up and down mp_per_particle values are symmetric.

Are they exactly symmetric? Or just close?  **Exactly.**



In [45]:
# postproc_dask.py:
# line 470: mp_per_particle=mp_per_liter * 1e3 * m3_per_particle
# so mp_per_liter is symmetric, too.
# HERE: trace down what's up in post.load_conc
# Yes, w_s_map was missing signs!!!!

alameda=parts[ parts['source']=='Alameda_Creek']

In [46]:
alameda.groupby(['source','behavior'])[ ['mp_per_liter','m3_per_particle','mp_per_particle'] ].mean()

Unnamed: 0_level_0,Unnamed: 1_level_0,mp_per_liter,m3_per_particle,mp_per_particle
source,behavior,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Alameda_Creek,down500,0.027445,707.909353,19428.382976
Alameda_Creek,down5000,1.006118,707.909353,712240.288134
Alameda_Creek,down50000,0.084204,707.909353,59608.658888
Alameda_Creek,none,0.001504,707.909353,1064.454678
Alameda_Creek,up500,0.027445,707.909353,19428.382976
Alameda_Creek,up5000,1.006118,707.909353,712240.288134
Alameda_Creek,up50000,0.084204,707.909353,59608.658888


In [51]:
load_data=cfg['load_data_d'].result()

In [52]:

load_data.w_s