In [7]:
import os
import glob
import pyart
import numpy as np
from pyhail import hsda, hdr, mesh, common
from cpol_processing import processing as cpol_prc
from datetime import datetime
from multiprocessing import Pool

import warnings
warnings.filterwarnings('ignore')


In [8]:
#paths
# Root folders for input volumes, processed output and sounding files, plus
# the SRTM file handed to calc_beam_blocking.
vol_path         = '/g/data/kl02/jss548/hail-research/radar_data'
out_path         = '/g/data/kl02/jss548/hail-research/processed_data'
sonde_path       = '/g/data/kl02/jss548/hail-research/snding_data'
# NOTE(review): this path starts with /g/data1a while the others use /g/data - confirm this is intended
srtm_ffn         = '/g/data1a/kl02/jss548/hail-research/srtm/srtm_67_18_Brisbane/srtm_67_18.tif'


# Each entry is [sub-folder name, sounding file name]. The sub-folder name is
# appended to both vol_path (input) and out_path (output); the sounding file
# name is appended to sonde_path.
target_data      = [['66_20171010', 'YBBN_20171010_00.nc'],
                    ['66_20171026', 'YBBN_20171026_00.nc'],
                    ['66_20171029', 'YBBN_20171029_00.nc'],
                    ['66_20171030', 'YBBN_20171030_00.nc'],
                    ['66_20171107', 'YBBN_20171107_00.nc'],
                    ['66_20171209', 'YBBN_20171209_00.nc'],
                    ['66_20171231', 'YBBN_20171231_00.nc'],
                    ['66_20180101', 'YBBN_20180101_00.nc'],
                    ['66_20180103', 'YBBN_20180103_00.nc']]
# Disabled cases, kept for reference:
#                     ['66_20170922', 'YBBN_20170922_00.nc'],
#                     ['CP2_20141127', 'YBBN_20141127_00.nc'],
#                     ['CP2_20081116', 'YBBN_20081116_00.nc']]

#field names (used to map to radar object fields)
# Keys are the short names used throughout this notebook; values are the field
# names stored on the radar object. The whole dict is also passed to
# hsda.main, hdr.main and mesh.main.
fieldn      = {'dbzh':'DBZH',
               'dbzh_corr':'DBZH_CORR',
               'zdr':'ZDR',
               'zdr_corr':'ZDR_CORR',
               'phi':'PHIDP',
               'phi_unfold':'PHI_UNF',
               'phi_bringi':'PHIDP_BRINGI',
               'kdp':'KDP',
               'kdp_bringi':'KDP_BRINGI',
               'rhv':'RHOHV',
               'ncp':'NCP',
               'a_dbz':'SPEC_ATT_REFL',
               'a_zdr':'SPEC_ATT_DIFF',
               'rhv_corr':'RHOHV_CORR',
               'temp':'TEMPERATURE',
               'alt':'HEIGHT',
               'snr':'SNR',
               'cbb':'CBB',
               'hca':'HCA',
               'hail_ke':'HAIL_KE',
               'shi':'SHI',
               'posh':'POSH',
               'mesh':'MESH',
               'hdr':'HDR',
               'hsda': 'HSDA'}

#hsda vars (both are passed straight to hsda.main)
hca_hail_idx = [9] #list of hail class indices in the HCA field
dzdr         = 0   #passed to hsda.main; see the "Implement dzdr" TODO below

#grid (passed to pyart.map.grid_from_radars)
# NOTE(review): presumably (z, y, x) ordering with limits and roi in metres - confirm against Py-ART
grid_shape  = (41, 301, 301)
grid_limits = ((0, 20000), (-150000.0, 150000.0), (-150000.0, 150000.0))
grid_roi    = 2000

#multiprocessing
# Used both as the Pool size and as the number of files per chunk.
ncpu         = 8

#TODO
#Implement dzdr
#Implement beam blockage mapping

In [17]:
def calc_beam_blocking(radar_ffn, srtm_ffn):
    """
    Read one radar volume and compute its beam blocking field.

    Parameters
    ----------
    radar_ffn : str
        Full path of the radar volume. Paths containing ".h5" are read with
        pyart.aux_io.read_odim_h5; every other path is read with
        pyart.io.read.
    srtm_ffn : str
        Full path of the SRTM file, forwarded to common.beam_blocking.

    Returns
    -------
    The value returned by common.beam_blocking(radar, srtm_ffn), or None
    when the volume could not be read.
    """
    #load radar object
    try:
        if ".h5" in radar_ffn:
            radar = pyart.aux_io.read_odim_h5(radar_ffn)
        else:
            # The previous test (`".nc" or ".mdv" in radar_ffn`) was always
            # true because ".nc" is a truthy string, so every non-h5 file
            # already took this branch; the plain else states that openly.
            radar = pyart.io.read(radar_ffn)
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print('file failed: ',radar_ffn)
        return None

    cbb_meta = common.beam_blocking(radar, srtm_ffn)

    return cbb_meta
    

In [18]:
def chunks(l, n):
    """
    Lazily split l into consecutive slices of length n.

    The last slice is shorter when len(l) is not a multiple of n.
    Adapted from http://stackoverflow.com/a/312464
    """
    offsets = range(0, len(l), n)
    yield from (l[start:start + n] for start in offsets)

In [19]:
def worker(radar_file_name,out_path,sonde_ffn):
    """
    Process one radar volume and write a CF/Radial file plus a MESH grid file.

    The volume is read, its fields are renamed to the names in `fieldn`,
    noise/phase/attenuation corrections from cpol_processing are applied,
    the HCA, HSDA and HDR classifications are added, and the result is
    written to `<out_path>/<radar>_<YYYYmmdd_HHMMSS>_processed.nc`. The volume
    is then gridded and handed to mesh.main together with the path
    `<out_path>/<radar>_<YYYYmmdd_HHMMSS>_meshgrids.nc`.

    Besides its arguments this function reads the module-level names
    `fieldn`, `cbb_meta`, `hca_hail_idx`, `dzdr`, `grid_shape`, `grid_limits`
    and `grid_roi`. `cbb_meta` in particular must have been assigned before
    the function is called.

    Parameters
    ----------
    radar_file_name : str
        Full path of the radar volume. Paths containing ".h5" are read with
        pyart.aux_io.read_odim_h5, everything else with pyart.io.read.
    out_path : str
        Folder that receives both output files. Existing files with the same
        names are removed first.
    sonde_ffn : str
        Full path of the sounding file passed to snr_and_sounding, hsda.main
        and mesh.main.

    Returns
    -------
    None
        Also returned early (after printing a message) when the volume
        cannot be read.
    """
    ###########################################################
    # Load file
    ###########################################################
    #load radar object
    try:
        if ".h5" in radar_file_name:
            radar = pyart.aux_io.read_odim_h5(radar_file_name)
            radar_name = radar.metadata['source'][6:8]
        else:
            # The previous `elif ".nc" or ".mdv" in ...` was always true, so
            # every non-h5 file already ended up here.
            radar      = pyart.io.read(radar_file_name)
            radar_name = radar.metadata['instrument_name'][0:3]
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print('file failed: ',radar_file_name)
        return None

    #extract date from the last 20 characters of the time units string
    date_str = radar.time['units'][-20:]
    dt       = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ')

    #fix field names
    radar.add_field(fieldn['dbzh'], radar.fields.pop('reflectivity'))
    radar.add_field(fieldn['zdr'], radar.fields.pop('differential_reflectivity'))
    radar.add_field(fieldn['phi'], radar.fields.pop('differential_phase'))
    radar.add_field(fieldn['kdp'], radar.fields.pop('specific_differential_phase'))
    radar.add_field(fieldn['rhv'], radar.fields.pop('cross_correlation_ratio'))
    try:
        radar.add_field(fieldn['ncp'], radar.fields.pop('normalized_coherent_power'))
    except KeyError:
        # NCP is optional; a stand-in is created further down when it is absent.
        pass

    #add cbb data (cbb_meta is a module-level global, see docstring)
    radar.add_field(fieldn['cbb'], cbb_meta, replace_existing=True)

    ###########################################################
    # Filtering
    ###########################################################

    #rhohv gatefilter
    gatefilter = pyart.filters.GateFilter(radar)
    gatefilter.exclude_below(fieldn['rhv'], 0.7)

    #rhohv texture filtering is not applied
    #(a pyart.filters.moment_and_texture_based_gate_filter call was left disabled here)

    ###########################################################
    # Correction
    ###########################################################

    #build temp information
    height, temperature, snr = cpol_prc.radar_codes.snr_and_sounding(
        radar, sonde_ffn, refl_field_name=fieldn['dbzh'],
        temp_field_name = 'temp') #temp from radiosonde nc
    radar.add_field(fieldn['temp'], temperature, replace_existing=True)
    radar.add_field(fieldn['alt'], height, replace_existing=True)
    radar.add_field(fieldn['snr'], snr, replace_existing=True)

    #add NCP if it doesn't exist
    fake_ncp = fieldn['ncp'] not in radar.fields
    if fake_ncp:
        # Creating a fake NCP field: 1 where SNR > 7.5, 0 elsewhere.
        ncp = pyart.config.get_metadata('normalized_coherent_power')
        emr2 = np.zeros_like(snr['data'])
        emr2[snr['data'] > 7.5] = 1
        ncp['data'] = emr2
        ncp['description'] = "THIS FIELD IS FAKE. SHOULD BE REMOVED!"
        radar.add_field(fieldn['ncp'], ncp)

    #RHOHV Noise correct
    rho_corr = cpol_prc.radar_codes.correct_rhohv(radar, rhohv_name=fieldn['rhv'], snr_name=fieldn['snr'])
    radar.add_field_like(fieldn['rhv'], fieldn['rhv_corr'], rho_corr, replace_existing=True)

    #ZDR Noise Correct
    corr_zdr = cpol_prc.radar_codes.correct_zdr(radar, zdr_name=fieldn['zdr'], snr_name=fieldn['snr'])
    radar.add_field_like(fieldn['zdr'], fieldn['zdr_corr'], corr_zdr, replace_existing=True)

    #unfold phidp
    phi_unfold = cpol_prc.phase.unfold_raw_phidp(
        radar, refl_field=fieldn['dbzh'], ncp_field=fieldn['ncp'],
        rhv_field=fieldn['rhv_corr'], phi_name=fieldn['phi'])
    radar.add_field(fieldn['phi_unfold'], phi_unfold, replace_existing=True)

    #recalculate phidp
    phimeta, kdpmeta = cpol_prc.phase.phidp_bringi(
        radar, gatefilter, refl_field=fieldn['dbzh'], ncp_name=fieldn['ncp'],
        rhohv_name=fieldn['rhv_corr'], unfold_phidp_name=fieldn['phi_unfold'])
    radar.add_field(fieldn['phi_bringi'], phimeta, replace_existing=True)
    radar.add_field(fieldn['kdp_bringi'], kdpmeta, replace_existing=True)
    radar.fields[fieldn['phi_bringi']]['long_name'] = "corrected_differential_phase"
    radar.fields[fieldn['kdp_bringi']]['long_name'] = "corrected_specific_differential_phase"

    ###########################################################
    # Attenuation
    ###########################################################

    #ZH attenuation correction
    # phidp_field must name a differential phase field; the KDP field was
    # being passed here before, so the Bringi PHIDP field is used instead.
    atten_spec, zh_corr = cpol_prc.attenuation.correct_attenuation_zh_pyart(
        radar, refl_field=fieldn['dbzh'], ncp_field=fieldn['ncp'],
        rhv_field=fieldn['rhv_corr'], phidp_field=fieldn['phi_bringi'])
    radar.add_field(fieldn['dbzh_corr'], zh_corr, replace_existing=True)
    radar.add_field(fieldn['a_dbz'], atten_spec, replace_existing=True)

    #ZDR attenuation correction
    atten_spec_zdr, zdr_corr = cpol_prc.attenuation.correct_attenuation_zdr(
        radar, zdr_name=fieldn['zdr_corr'], kdp_name=fieldn['kdp_bringi'],
        alpha=0.016)
    radar.add_field_like(fieldn['zdr'], fieldn['zdr_corr'], zdr_corr, replace_existing=True)
    radar.add_field(fieldn['a_zdr'], atten_spec_zdr,
                    replace_existing=True)

    ###########################################################
    # Apply filter
    ###########################################################

    #apply rhohv filter to every corrected field used by the classifiers
    for key in ('dbzh_corr', 'zdr_corr', 'kdp_bringi', 'rhv_corr'):
        field = radar.fields[fieldn[key]]
        field['data'] = cpol_prc.filtering.filter_hardcoding(field['data'], gatefilter)

    ###########################################################
    # Classifications
    ###########################################################

    #CSU HCA
    hydro_class = cpol_prc.hydrometeors.hydrometeor_classification(
        radar, refl_name=fieldn['dbzh_corr'], zdr_name=fieldn['zdr_corr'],
        kdp_name=fieldn['kdp_bringi'], rhohv_name=fieldn['rhv_corr'],
        height_name=fieldn['alt'], temperature_name=fieldn['temp'])
    radar.add_field(fieldn['hca'], hydro_class, replace_existing=True)

    #HSDA
    hsda_meta = hsda.main(radar,sonde_ffn,fieldn,hca_hail_idx,dzdr)
    radar.add_field(fieldn['hsda'], hsda_meta, replace_existing=True)

    #HDR
    hdr_meta = hdr.main(radar,fieldn)
    radar.add_field(fieldn['hdr'], hdr_meta, replace_existing=True)

    ###########################################################
    # CFradial output
    ###########################################################

    # Removing fake and useless fields.
    if fake_ncp:
        radar.fields.pop(fieldn['ncp'])

    #write radar object to file
    time_tag = dt.strftime('%Y%m%d_%H%M%S')
    out_fn  = '_'.join([radar_name, time_tag, 'processed']) + '.nc'
    out_ffn = os.path.join(out_path, out_fn)
    try:
        os.remove(out_ffn)
    except OSError:
        # nothing to remove (or not removable); the write below reports real problems
        pass
    pyart.io.write_cfradial(out_ffn, radar)

    print('completed volume ' + out_ffn)

    ###########################################################
    # Gridded Processing and Output
    ###########################################################

    out_fn  = '_'.join([radar_name, time_tag, 'meshgrids']) + '.nc'
    out_ffn = os.path.join(out_path, out_fn)
    try:
        os.remove(out_ffn)
    except OSError:
        pass

    #generate grid object
    grid = pyart.map.grid_from_radars(
        radar,
        grid_shape = grid_shape,
        grid_limits = grid_limits,
        roi_func='constant', constant_roi = grid_roi)
    #MESH (mesh.main receives the output path)
    mesh.main(grid, fieldn, out_ffn, sonde_ffn)

    print('completed grid ' + out_ffn)

    return None

In [None]:
#loop through target folders
# For every [folder, sounding] pair: compute beam blocking once from the first
# volume, then process all volumes of that folder in parallel, ncpu at a time.
for data_pair in target_data:
    #radar in/out data
    data_in_path   = os.path.join(vol_path, data_pair[0])
    data_out_path  = os.path.join(out_path, data_pair[0])
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs
    os.makedirs(data_out_path, exist_ok=True)

    #sonde file name
    sonde_ffn      = os.path.join(sonde_path, data_pair[1])

    #index vol files
    vol_filelist = sorted(glob.glob(os.path.join(data_in_path, '*')))
    n_files      = len(vol_filelist)
    if n_files == 0:
        # vol_filelist[0] below would raise IndexError on an empty folder
        print('no volumes found in: ', data_in_path)
        continue

    #calculate beam blocking using first radar file
    # cbb_meta is read as a global inside worker, so it has to be assigned
    # before the Pool below is created.
    # NOTE(review): this relies on worker processes inheriting the parent's
    # globals (fork start method) - confirm on the target platform.
    cbb_meta     = calc_beam_blocking(vol_filelist[0], srtm_ffn)
    if cbb_meta is None:
        # first volume unreadable: worker cannot add the CBB field without it
        print('beam blocking failed, skipping: ', data_in_path)
        continue

    # Cutting the file list into smaller chunks. (The multiprocessing.Pool instance
    # is freed from memory, at each iteration of the main for loop).
    i = 0
    #loop through chunks
    for one_slice in chunks(vol_filelist, ncpu):
        args_list = [(onefile, data_out_path, sonde_ffn) for onefile in one_slice]
        with Pool(ncpu) as pool:
            pool.starmap(worker, args_list)
        #update user
        # count the files actually processed; adding ncpu overshot 100% on a
        # short final chunk
        i += len(one_slice)
        print('processed: ' + str(round(i/n_files*100,2)))

print('finished')