In [1]:
import configparser
import os
import shutil
import time
import traceback
import warnings
from datetime import datetime
from glob import glob
from importlib import reload

from pebble import ProcessPool, ProcessExpired
from tqdm import tqdm

from hailcore import lib_dp as dp
from hailcore import lib_grid as grid
from hailcore import lib_nwp as nwp
from hailcore import lib_optflow as optflow
from hailcore import lib_qc as qc
from hailcore import lib_file as file
from hailcore import lib_conf as conf_generator
from hailcore import lib_util as util
from hailcore import lib_geometry as geometry
from hailcore import lib_layers

from hailcore import retrieval_volume
from hailcore import retrieval_grid
from hailcore import retrieval_optflow
from hailcore import retrieval_accumulation
from hailcore import retrieval

# Ignore every Python warning raised in this process from here on (presumably to
# keep library warnings out of the cell output).
# NOTE(review): this blanket filter also hides warnings that may point at real problems.
warnings.simplefilter("ignore")




## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119





In [2]:
"""
PERILS events

3 19990414

71,3,4 20181220

23 20200419

2,40,3,4,66 20200119 20200121 

66,8,28 20201031
"""

""" SPLASH events
66 20180926

"""

' SPLASH events\n66 20180926\n\n'

In [5]:
#processing queue: one tuple per event, laid out as
#(radar_id_list, start_date_str, end_date_str) with dates formatted YYYYMMDD.
#The processing loop runs every radar in radar_id_list over the days that
#util.daterange builds from the start and end dates.

# queue = [([3], '19990414', '19990414'),
#         ([71,3,4], '20181220', '20181220'),
#         ([23],'20200419','20200419'),
#         ([2,40,3,4,66],'20200119','20200121'),
#         ([66,8,28],'20201031','20201031'),
#         ([66],'20180926','20180926')]

# queue = [([64],'20161111','20161111'),
#         ([66],'20191117','20191117')]
#active queue (the commented-out lists above and below are alternative event sets)
queue = [([66],'20191117','20191117')]


#(radar_id_list, start_date_str, end_date_str)
#queue = [([66],'20180926','20180926')]


### call workers

In [6]:
#TODO: run profiler on DP data processing - 30 seconds is too long.


#Batch Mode
#shared configuration object, built in 'nci' mode; the code in this cell reads
#conf.ncpu, conf.timeout and conf.odimh5_root from it and forwards it to every worker
conf = conf_generator.HailCore(mode='nci')

def multiproc(item_list, conf, function_name):
    """Run a worker over ``item_list`` in parallel, ``conf.ncpu`` items at a time.

    Every item is paired with ``conf`` into an ``(item, conf)`` tuple, which the
    worker receives as its single argument. The tuples are split into chunks of
    ``conf.ncpu`` and each chunk is mapped through its own ``ProcessPool``.

    Parameters
    ----------
    item_list : iterable
        Items to process.
    conf : object
        Configuration object. ``conf.ncpu`` sets the chunk size and
        ``conf.timeout`` is passed to ``pool.map`` as the task timeout; the
        object itself is forwarded to every worker call.
    function_name : str or callable
        Worker to run. A string is looked up among this module's global names
        (e.g. ``'_multiproc_grid'``); a callable is used as is.

    Returns
    -------
    list
        Results of the tasks that completed. A task that timed out, whose
        process expired, or that raised is reported and contributes no entry,
        so the list can be shorter than ``item_list``.

    Raises
    ------
    KeyError
        If ``function_name`` is a string that is not a global name.
    """
    # pebble reports timeouts with concurrent.futures.TimeoutError, which is only
    # an alias of the builtin TimeoutError from Python 3.11 onwards
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    #resolve the worker without eval()
    worker = function_name if callable(function_name) else globals()[function_name]

    #build arg list
    arg_list = [(item, conf) for item in item_list]
    #ceiling division so a final partial chunk is counted in the progress total
    n_chunks = (len(arg_list) + conf.ncpu - 1) // conf.ncpu
    output = []
    for arg_slice in tqdm(util.chunks(arg_list, conf.ncpu), total=n_chunks):
        with ProcessPool() as pool:
            future = pool.map(worker, arg_slice, timeout=conf.timeout)
            iterator = future.result()
            while True:
                try:
                    output.append(next(iterator))
                except StopIteration:
                    break
                except (FuturesTimeoutError, TimeoutError) as error:
                    print("function took longer than %d seconds" % error.args[1])
                except ProcessExpired as error:
                    print("%s. Exit code: %d" % (error, error.exitcode))
                except Exception as error:
                    # covers TypeError too: only ProcessExpired carries an exitcode
                    traceback.print_exc()
                    # pebble may attach the worker-side traceback to the exception
                    remote_traceback = getattr(error, 'traceback', None)
                    if remote_traceback:
                        print(remote_traceback)
    return output

def _multiproc_grid(args):
    """Pool task chaining the volume and grid retrieval workers for one input.

    ``args`` is a ``(vol_ffn, conf)`` pair. ``vol_ffn`` goes through
    ``retrieval_volume.worker`` and that result is handed to
    ``retrieval_grid.worker``, whose return value is returned unchanged
    (presumably the filename of the grid that was written - confirm in
    retrieval_grid).
    """
    volume_path, settings = args
    radar_volume = retrieval_volume.worker(volume_path, settings)
    return retrieval_grid.worker(radar_volume, settings)
    
def _multiproc_opt(args):
    """Pool task running the optical flow retrieval worker for one input.

    ``args`` is a ``(grid_ffn, conf)`` pair; the return value of
    ``retrieval_optflow.worker(grid_ffn, conf)`` is passed straight back.
    """
    grid_path, settings = args
    return retrieval_optflow.worker(grid_path, settings)

def _multiproc_acc(args):
    """Pool task running the accumulation retrieval worker for one input.

    ``args`` is an ``(opt_dict, conf)`` pair. The worker is called for its
    side effects only; this function always returns ``None``.
    """
    flow_result, settings = args
    retrieval_accumulation.worker(flow_result, settings)
        
#extract level1 vol files and run the retrieval chain for every radar and day in the queue
for radar_id_list, start_date_str, end_date_str in queue:
    start_dt = datetime.strptime(start_date_str, '%Y%m%d')
    end_dt = datetime.strptime(end_date_str, '%Y%m%d')
    #materialise the dates: if util.daterange returns a generator it would be
    #exhausted after the first radar and later radars would be skipped silently
    dt_list = list(util.daterange(start_dt, end_dt))
    for radar_id in radar_id_list:
        for dt in dt_list:

            #locate the daily volume archive; a missing archive must not abort the rest of the queue
            zip_ffn = f'{conf.odimh5_root}/{radar_id}/{dt.year}/vol/{radar_id}_{dt.strftime("%Y%m%d")}.pvol.zip'
            if not os.path.isfile(zip_ffn):
                print('missing volume archive, skipping:', zip_ffn)
                continue

            #unpack volume into temp folder
            temp_path = file.unpack_zip(zip_ffn)
            try:
                vol_ffn_list = sorted(glob(os.path.join(temp_path, '*.h5')))

                #setup multiprocessing here
                if conf.ncpu == 1:
                    #single process
                    for vol_ffn in tqdm(vol_ffn_list, total=len(vol_ffn_list)):
                        retrieval.manager(vol_ffn, conf)

                else:
                    #multiprocessing grids
                    print('starting grid multiprocessing for:', radar_id, dt)
                    grid_ffn_list = multiproc(vol_ffn_list, conf, '_multiproc_grid')

                    #multiprocessing opticalflow (must be separate to ensure grids have finished writing)
                    print('starting opticalflow multiprocessing for:', radar_id, dt)
                    opt_dict_list = multiproc(grid_ffn_list, conf, '_multiproc_opt')

                    #multiprocessing 5minacc (must be separate to ensure opticalflow has finished writing)
                    print('starting acc multiprocessing for:', radar_id, dt)
                    _ = multiproc(opt_dict_list, conf, '_multiproc_acc')
            finally:
                #clean up the temp folder even when processing fails; no shell involved
                shutil.rmtree(temp_path, ignore_errors=True)


starting grid multiprocessing for: 66 2019-11-17 00:00:00


100%|██████████| 30/30 [17:06<00:00, 34.22s/it]


starting opticalflow multiprocessing for: 66 2019-11-17 00:00:00


100%|██████████| 30/30 [01:38<00:00,  3.30s/it]


starting acc multiprocessing for: 66 2019-11-17 00:00:00


  0%|          | 0/30 [00:00<?, ?it/s]

None opt_dict, aborting
cannot find valid t0 grid for t1: 2019-11-17 00:01:00


100%|██████████| 30/30 [02:02<00:00,  4.09s/it]
