In [1]:
import pandas as pd
import os

from pygridder import pygridder as pgrid
import pyproj
import pathlib

import multiprocessing.popen_spawn_posix
import dask
from dask.distributed import Client, progress

import skimage.morphology as skmorph
import datetime as dt
import numpy as np
import scipy.ndimage as ndimage

from glob import glob

In [2]:
from scipy import stats

In [3]:
import gzip
import pygrib as pg

In [4]:
import dclasses as dc

In [71]:
client = Client(n_workers=2,threads_per_worker=1)

In [103]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError


In [17]:
# Global Variables & Pre-Processing for PP forecast
dx = 5 # delta x
selem = skmorph.disk(40 / dx) # morphology disk

ndfd_path = pathlib.Path('../scripts/pas-input-data/ndfd.npz').resolve()
with np.load(ndfd_path) as NPZ:
    lons = NPZ['lons']
    lats = NPZ['lats']
    
G = pgrid.Gridder(tx=lons, ty=lats, dx=dx/100)

dateparser = lambda x: dt.datetime.strptime(x, '%Y-%m-%d %H:%M:%S') + dt.timedelta(hours=6)
data_path = pathlib.Path('../../../gen-assets/1950-2019_tors_CONUS.csv')

df = pd.read_csv(data_path, parse_dates=[['date','time']], date_parser=dateparser, index_col=0, keep_date_col=True)
df = df.reset_index()

all_di = {0: 0.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 5: 1.0}
sig_di = {0: 0.0, 1: 0.0, 2: 1.0, 3: 1.0, 4: 1.0, 5: 1.0}
df['all_weight'] = df['mag'].map(all_di).fillna(df['mag'])
df['sig_weight'] = df['mag'].map(sig_di).fillna(df['mag'])

outlook_time = '1200'

# Create date array
my_hour = dt.datetime.strptime(outlook_time, '%H%M').hour
my_minute = dt.datetime.strptime(outlook_time, '%H%M').minute

bdt = dt.datetime(1950,1,1,my_hour,my_minute)
edt = dt.datetime(2019,12,31,my_hour,my_minute)

# Create list index of datetimes with a frequency of one per day
dts = pd.date_range(bdt, edt, freq='D')
bdts, edts = dts[:-1],dts[1:]

In [18]:
# PAS Global Variables and Pre-Processing
### Parse CLI Arguments ###

ndfd_area = 25
nsims = 10000
tornado_direction_distribution = stats.norm(50, 15)
coolseason = [1, 2, 3, 4, 11, 12]

impacts_data_root = pathlib.Path('../scripts/pas-input-data/')
outdir = pathlib.Path('./PAS-climo/',"output").resolve()
outdir.mkdir(exist_ok=True)

In [None]:
%%time
## Baseline loop

for bdt, edt in zip(bdts[:100],edts[:100]):
    _df = df[(df['date_time'] >= bdt) & (df['date_time'] < edt) & (df['all_weight'] == 1.0)]
    if _df.empty:
        continue
    else:
        processPP(_df,bdt)

- loop through one tor
- add days to a das delayed if they include tornadoes
- once delayed arr is of length 12, run a compute...
    - Create tor and sigtor prob arrays
    - run PAS on these prob arrays and save psv.gz files

### Current Implementation of Dask to parallelize via grouping PP creation in chunks

Code to find missing dates and process them

In [101]:
bdt = dt.datetime(2019,1,1,my_hour,my_minute)
edt = dt.datetime(2019,12,31,my_hour,my_minute)

# Create list index of datetimes with a frequency of one per day
dts = pd.date_range(bdt, edt, freq='D')
bdts, edts = dts[:-1],dts[1:]

In [102]:
da_list = []

for bdt, edt in zip(bdts[:],edts[:]):
    
    file_path = './PAS-climo/output/'
    
    to_down = f"{file_path}{bdt.strftime('%Y%m%d%H%M%S')}_ts.psv.gz"
    
    _df = df[(df['date_time'] >= bdt) & (df['date_time'] < edt) & (df['all_weight'] == 1.0)]
    if _df.empty:
        continue
    else:
        if not bool(glob(to_down)):
            
            da = dask.delayed(processPP)(_df,bdt)
            da_list.append(da)
        
            if len(da_list) == 2:
               dask.compute(da_list)

               da_list = []

Code to reset the beginning date when dask processing breaks

In [75]:
bdt = dt.datetime(2011,4,27,my_hour,my_minute)

In [77]:
stuck = bdt
edt = dt.datetime(2019,12,31,my_hour,my_minute)

# Create list index of datetimes with a frequency of one per day
dts = pd.date_range(stuck, edt, freq='D')
bdts, edts = dts[:-1],dts[1:]

<hr>

In [78]:
%%time
# Loop through times and use dask to run in parallel chunks of days at a time

da_list = []

for bdt, edt in zip(bdts[:],edts[:]):
    _df = df[(df['date_time'] >= bdt) & (df['date_time'] < edt) & (df['all_weight'] == 1.0)]
    if _df.empty:
        continue
    else:
        da = dask.delayed(processPP)(_df,bdt)
        da_list.append(da)
        
        if len(da_list) == 4:
            das = dask.compute(da_list)
            
            da_list = []
    

CPU times: user 31min 40s, sys: 5min 1s, total: 36min 41s
Wall time: 1h 50min 27s


In [9]:
def runPAS(all_fcst, sig_fcst, date_in_name):
    
    outfile = outdir.joinpath(f"{date_in_name.strftime('%Y%m%d%H%M%S')}_ts.psv.gz")
    
    torn = all_fcst*100
    continuous_torn = dc.make_continuous(torn)
    sigtorn = sig_fcst*100
    
    sigtorn[sigtorn > 0] = 1
    if (torn.max() >= 30) and (sigtorn.max() > 0):
        sigtorn[torn >= 15] += 1
    sigtorn_1d = sigtorn.ravel()
    usesig = True if (date_in_name.month in coolseason) or (sigtorn.max() > 0) else False
    
    tornado_dists = dc.TornadoDistributions()
    counts = np.zeros((5, nsims), dtype=int)
    counts[0, :] = (tornado_dists.f02.rvs(nsims) * ndfd_area * (torn == 2).sum()).astype(int)
    counts[1, :] = (tornado_dists.f05.rvs(nsims) * ndfd_area * (torn == 5).sum()).astype(int)
    counts[2, :] = (tornado_dists.f10.rvs(nsims) * ndfd_area * (torn == 10).sum()).astype(int)
    counts[3, :] = (tornado_dists.f15.rvs(nsims) * ndfd_area * (torn == 15).sum()).astype(int)
    counts[4, :] = (tornado_dists.f30.rvs(nsims) * ndfd_area * (torn >= 30).sum()).astype(int)
    
    ### Setup Impact Simulation ###
    igrids = dc.ImpactGrids(impacts_data_root)
    
    scounts = counts.sum(axis=1)
    inds02 = dc.weighted_choice(prob=2, probs=torn, cprobs=continuous_torn, size=scounts[0])
    inds05 = dc.weighted_choice(prob=5, probs=torn, cprobs=continuous_torn, size=scounts[1])
    inds10 = dc.weighted_choice(prob=10, probs=torn, cprobs=continuous_torn, size=scounts[2])
    inds15 = dc.weighted_choice(prob=15, probs=torn, cprobs=continuous_torn, size=scounts[3])
    inds30 = dc.weighted_choice(prob=30, probs=torn, cprobs=continuous_torn, size=scounts[4])
    inds = dc.flatten_list([inds02, inds05, inds10, inds15, inds30])
    
    non_sig_inds = sigtorn_1d[inds] == 0
    single_sig_inds = sigtorn_1d[inds] == 1
    double_sig_inds = sigtorn_1d[inds] == 2
    
    if usesig:
        single_sig_inds += non_sig_inds
        non_sig_inds[:] = False
        
    # Handle Locations
    non_sig_loc_inds = inds[non_sig_inds]
    single_sig_loc_inds = inds[single_sig_inds]
    double_sig_loc_inds = inds[double_sig_inds]
    
    # Handle Ratings
    _mags=[0, 1, 2, 3, 4, 5]
    non_sig_ratings = np.random.choice(_mags, size=non_sig_inds.sum(),
                                        replace=True, p=tornado_dists.r_nonsig)
    single_sig_ratings = np.random.choice(_mags, size=single_sig_inds.sum(),
                                            replace=True, p=tornado_dists.r_singlesig)
    double_sig_ratings = np.random.choice(_mags, size=double_sig_inds.sum(),
                                                replace=True, p=tornado_dists.r_doublesig)
    
    # Handle Distances
    non_sig_distances = dc.get_distances(non_sig_ratings, tornado_dists)
    single_sig_distances = dc.get_distances(single_sig_ratings, tornado_dists)
    double_sig_distances = dc.get_distances(double_sig_ratings, tornado_dists)
    
    #print("Running simulations...")
    #print("    Non Sig...")
    non_sig = dc.simulate(non_sig_loc_inds, non_sig_distances,
                            non_sig_ratings, tornado_direction_distribution, igrids)
    #print("    Single Sig...")
    single_sig = dc.simulate(single_sig_loc_inds, single_sig_distances,
                                single_sig_ratings, tornado_direction_distribution, igrids)
    #print("    Double Sig...")
    double_sig = dc.simulate(double_sig_loc_inds, double_sig_distances,
                                double_sig_ratings, tornado_direction_distribution, igrids)
    
    simulated_tornadoes = dc.flatten_list([non_sig, single_sig, double_sig])
    np.random.shuffle(simulated_tornadoes)
    _sims = np.split(simulated_tornadoes, counts.sum(axis=0).cumsum())[:-1]
    realizations = dc.Realizations([dc.SyntheticTornadoRealization(_sim, i+1) for i, _sim in enumerate(_sims)])
    
    with gzip.GzipFile(outfile, "w") as OUT:
        OUT.write(realizations.as_psv.encode())

In [10]:
def processPP(_df,date_in_name):
    
    lon1 = _df.slon.values
    lat1 = _df.slat.values
    lon2 = _df.elon.values
    lat2 = _df.elat.values

    # Find/remove/replace missing data
    keep = ~np.logical_or(lon1 == 0, lat1 == 0)
    lon1 = lon1[keep]
    lat1 = lat1[keep]
    lon2 = lon2[keep]
    lat2 = lat2[keep]
    lon2[lon2 == 0] = lon1[lon2 == 0]
    lat2[lat2 == 0] = lat1[lat2 == 0]

    # Grid tornadoes
    tornlines = G.grid_lines(sxs=lon1, sys=lat1, exs=lon2, eys=lat2)
    all_mags = _df['all_weight']
    sig_mags = _df['sig_weight']
    all_fcst = G.make_empty_grid(dtype='float')
    sig_fcst = G.make_empty_grid(dtype='float')
    
    for tornline, all_mag, sig_mag in zip(tornlines, all_mags, sig_mags):
        all_fcst[tornline] = all_mag
        sig_fcst[tornline] = sig_mag
    
    # Make practically perfect forecast
    all_fcst = skmorph.binary_dilation(all_fcst, selem).astype(float)
    sig_fcst = skmorph.binary_dilation(sig_fcst, selem).astype(float)

    #print(np.max(fcst))
    all_fcst = ndimage.gaussian_filter(all_fcst, 120/dx)
    sig_fcst = ndimage.gaussian_filter(sig_fcst, 120/dx)
    
    # Degrade continuous probs into SPC prob
    # Uncomment if regular probs
    all_fcst[all_fcst < 0.02] = 0
    all_fcst[np.logical_and(all_fcst < 0.05, all_fcst >= 0.02)] = 0.02
    all_fcst[np.logical_and(all_fcst < 0.10, all_fcst >= 0.05)] = 0.05
    all_fcst[np.logical_and(all_fcst < 0.15, all_fcst >= 0.10)] = 0.10
    all_fcst[np.logical_and(all_fcst < 0.30, all_fcst >= 0.15)] = 0.15
    all_fcst[np.logical_and(all_fcst < 0.45, all_fcst >= 0.30)] = 0.30
    all_fcst[all_fcst >= 0.45] = 0.45

    # Uncomment if sigtor probs
    sig_fcst[sig_fcst < 0.10] = 0
    sig_fcst[sig_fcst >= 0.10] = 0.10
    
    # Have this function call PAS function here
    runPAS(all_fcst, sig_fcst, date_in_name)
    
    #return all_fcst, sig_fcst

### Trying different ways to chunk data for dask parallelizing

In [None]:
%%time

finished = dask.compute(anom(bdts,edts))

In [None]:
def anom(bdts,edts):
    
    results = []
    
    for bdt, edt in zip(bdts[:100],edts[:100]):
        _df = df[(df['date_time'] >= bdt) & (df['date_time'] < edt) & (df['all_weight'] == 1.0)]
        if _df.empty:
            continue
        else:
            result = processPP_da(_df,bdt)
            
        results.append(result)
            
    return results

In [None]:
@dask.delayed
def processPP_da(_df,date_in_name):
    
    dtime = date_in_name.strftime('%Y%m%d%H%M%S')
    
    lon1 = _df.slon.values
    lat1 = _df.slat.values
    lon2 = _df.elon.values
    lat2 = _df.elat.values

    # Find/remove/replace missing data
    keep = ~np.logical_or(lon1 == 0, lat1 == 0)
    lon1 = lon1[keep]
    lat1 = lat1[keep]
    lon2 = lon2[keep]
    lat2 = lat2[keep]
    lon2[lon2 == 0] = lon1[lon2 == 0]
    lat2[lat2 == 0] = lat1[lat2 == 0]

    # Grid tornadoes
    tornlines = G.grid_lines(sxs=lon1, sys=lat1, exs=lon2, eys=lat2)
    all_mags = _df['all_weight']
    sig_mags = _df['sig_weight']
    all_fcst = G.make_empty_grid(dtype='float')
    sig_fcst = G.make_empty_grid(dtype='float')
    
    for tornline, all_mag, sig_mag in zip(tornlines, all_mags, sig_mags):
        all_fcst[tornline] = all_mag
        sig_fcst[tornline] = sig_mag
    
    # Make practically perfect forecast
    all_fcst = skmorph.binary_dilation(all_fcst, selem).astype(float)
    sig_fcst = skmorph.binary_dilation(sig_fcst, selem).astype(float)

    #print(np.max(fcst))
    all_fcst = ndimage.gaussian_filter(all_fcst, 120/dx)
    sig_fcst = ndimage.gaussian_filter(sig_fcst, 120/dx)
    
    # Degrade continuous probs into SPC prob
    # Uncomment if regular probs
    all_fcst[all_fcst < 0.02] = 0
    all_fcst[np.logical_and(all_fcst < 0.05, all_fcst >= 0.02)] = 0.02
    all_fcst[np.logical_and(all_fcst < 0.10, all_fcst >= 0.05)] = 0.05
    all_fcst[np.logical_and(all_fcst < 0.15, all_fcst >= 0.10)] = 0.10
    all_fcst[np.logical_and(all_fcst < 0.30, all_fcst >= 0.15)] = 0.15
    all_fcst[np.logical_and(all_fcst < 0.45, all_fcst >= 0.30)] = 0.30
    all_fcst[all_fcst >= 0.45] = 0.45

    # Uncomment if sigtor probs
    sig_fcst[sig_fcst < 0.10] = 0
    sig_fcst[sig_fcst >= 0.10] = 0.10
    
    # Have this function call PAS function here
    
    return [all_fcst, sig_fcst]