In [2]:
import os
import shutil
import warnings
from datetime import datetime

import cftime
import dask
import dask.bag as db
import numpy as np
import pyart
from dask.diagnostics import ProgressBar
from matplotlib import pyplot as plt

import radar_codes
import filtering
import phase
import hydrometeors
import attenuation
import rainrate
import file_util

warnings.simplefilter('ignore')


In [3]:
# Run configuration: data locations, target radar/day, verbosity, parallelism.

# root of the input archive holding the daily ODIM polar-volume zips
vol_root = '/g/data/rq0/level_1/odim_pvol'
# output roots for the CF/Radial volumes and the gridded rain-rate files
cf_root = '/g/data/kl02/jss548/PST/polarimetric_rain/cfradial'
rf_root = '/g/data/kl02/jss548/PST/polarimetric_rain/rf3_instant_rainrate'

# radar id and day (YYYYMMDD) to process
rid, date_str = 2, '20200119'
# alternative case kept for reference: rid, date_str = 66, '20200309'

# when True, the retrieval prints extra diagnostics
VERBOSE = False
# number of volume files handed to dask per batch
NCPU = 15


In [4]:
def torrentfields(vol_ffn):
    """Run the full rain-rate retrieval for a single radar volume file.

    Steps: read the ODIM HDF5 volume, correct RHOHV and ZDR, attach a
    temperature profile, build a gate filter, filter PHIDP / derive KDP,
    classify hydrometeors, estimate attenuation and rain rate, write the
    volume as CF/Radial, then grid ``hybrid_rainrate`` and write the
    lowest valid level of each column to a rain-rate NetCDF file.

    Parameters
    ----------
    vol_ffn : str
        Full path of the ODIM HDF5 volume file to process.

    Returns
    -------
    None
        Results are written to disk under ``cf_root`` and ``rf_root``.

    Notes
    -----
    Reads the notebook-level globals ``VERBOSE``, ``rid``, ``date_str``,
    ``cf_root`` and ``rf_root``; output locations therefore follow the
    config cell, not the input path.
    """
    print('processing', vol_ffn)

    #read radar volume
    radar = pyart.aux_io.read_odim_h5(vol_ffn, file_field_names=True)
    #get time
    valid_time = cftime.num2pydate(radar.time['data'][0], radar.time['units'])

    #get radar band
    # NOTE(review): threshold of 8 is presumably in cm (C-band below, S-band above) -- confirm
    wavelength = radar_codes.get_wavelength(vol_ffn)
    band = 'C' if wavelength < 8 else 'S'
    if VERBOSE:
        print('band', band)

    ##################################################################################################
    #
    # Preprocessing
    #
    ##################################################################################################

    # Correct RHOHV
    rho_corr = radar_codes.correct_rhohv(radar, snr_name='SNRH')
    radar.add_field_like('RHOHV', 'RHOHV_CORR', rho_corr, replace_existing=True)

    # Correct ZDR
    corr_zdr = radar_codes.correct_zdr(radar, snr_name='SNRH')
    radar.add_field_like('ZDR', 'ZDR_CORR', corr_zdr, replace_existing=True)

    # Temperature
    height, temperature, isom = radar_codes.temperature_profile_access(radar)
    radar.add_field('temperature', temperature, replace_existing=True)
    radar.add_field('height', height, replace_existing=True)
    radar.add_field('height_over_isom', isom, replace_existing=True)

    # GateFilter
    gatefilter = filtering.do_gatefilter(radar,
                                         refl_name='DBZH',
                                         phidp_name="PHIDP",
                                         rhohv_name='RHOHV_CORR',
                                         zdr_name="ZDR_CORR",
                                         snr_name='SNRH')
    #create fake NCP: 1 where the gate filter keeps the gate, 0 elsewhere
    ncp = pyart.config.get_metadata('normalized_coherent_power')
    ncp['data'] = np.zeros_like(radar.fields['RHOHV']['data'])
    ncp['data'][gatefilter.gate_included] = 1
    radar.add_field('NCP', ncp, replace_existing=True)

    # phidp filtering
    phidp, kdp = phase.phidp_giangrande(radar, gatefilter, rhv_field='RHOHV_CORR', refl_field='DBZH')
    radar.add_field('PHIDP_VAL', phidp, replace_existing=True)
    radar.add_field('KDP_VAL', kdp, replace_existing=True)
    kdp_field_name = 'KDP_VAL'
    phidp_field_name = 'PHIDP_VAL'

    # Hydrometeors classification
    hydro_class = hydrometeors.hydrometeor_classification(radar,
                                                          gatefilter,
                                                          kdp_name=kdp_field_name,
                                                          zdr_name='ZDR_CORR',
                                                          rhohv_name='RHOHV_CORR',
                                                          refl_name='DBZH',
                                                          band=band)

    radar.add_field('radar_echo_classification', hydro_class, replace_existing=True)

    ##################################################################################################
    #
    # Retrievals
    #
    ##################################################################################################

    #estimate alpha
    alpha, alpha_method = attenuation.estimate_alpha_zhang2020(radar, band,
                                           refl_field='DBZH', zdr_field='ZDR_CORR', rhohv_field='RHOHV_CORR',
                                           verbose=VERBOSE)

    #estimate specific attenuation
    if VERBOSE:
        print('alpha', alpha)
    radar = attenuation.retrieve_zphi(radar, band, alpha=alpha, alpha_method=alpha_method,
                                     refl_field='DBZH', phidp_field=phidp_field_name, rhohv_field='RHOHV_CORR')

    #estimate rainfall
    # NOTE(review): alpha=92, beta=1.7 are presumably Z-R relation coefficients -- confirm in rainrate.conventional
    radar = rainrate.conventional(radar, alpha=92, beta=1.7, refl_field='corrected_reflectivity')
    radar = rainrate.polarimetric(radar, band, refl_field='corrected_reflectivity', kdp_field=kdp_field_name, rhohv_field='RHOHV_CORR')

    ##################################################################################################
    #
    # Write outputs CF Radial
    #
    ##################################################################################################

    # shared filename stem so the CF/Radial and rain-rate outputs match
    file_stem = f'{rid:02}_{valid_time.strftime("%Y%m%d_%H%M%S")}'

    #create paths
    cf_path = f'{cf_root}/{rid:02}/{date_str}'
    # exist_ok avoids a race when several workers create the same directory
    os.makedirs(cf_path, exist_ok=True)
    cf_ffn = f'{cf_path}/{file_stem}.vol.nc'
    #write to cf
    pyart.io.write_cfradial(cf_ffn, radar)

    ##################################################################################################
    #
    # Create and write grid
    #
    ##################################################################################################

    # grid RF
    GRID_SHAPE = (6, 512, 512)
    GRID_LIMITS = ((0, 2500), (-128000.0, 128000.0), (-128000.0, 128000.0))
    GRID_ROI = 2500
    # only grid gates where 'hybrid_rainrate' is not masked
    # (that field is presumably added by rainrate.polarimetric -- confirm)
    hybrid_gatefilter = pyart.correct.GateFilter(radar)
    hybrid_gatefilter.exclude_masked('hybrid_rainrate')
    radar_lat = radar.latitude['data'][0]
    radar_lon = radar.longitude['data'][0]
    # Albers equal-area projection centred on the radar, standard parallels at +/- 1 degree
    standard_lat_1 = radar_lat+1.0
    standard_lat_2 = radar_lat-1.0
    grid_proj = {'proj':'aea', 'lat_1':standard_lat_1, 'lat_2':standard_lat_2,
                 'lon_0':radar_lon, 'lat_0':radar_lat}
    rf_grid = pyart.map.grid_from_radars(
                    radar,
                    grid_shape = GRID_SHAPE,
                    grid_limits = GRID_LIMITS,
                    roi_func = 'constant',
                    grid_projection = grid_proj,
                    gatefilter = hybrid_gatefilter,
                    constant_roi = GRID_ROI,
                    weighting_function = 'Barnes2',
                    fields = ['hybrid_rainrate'])

    #extract lowest valid values
    rain_grid_2d = _lowest_valid_level(rf_grid.fields['hybrid_rainrate']['data'])

    #create paths
    rf_path = f'{rf_root}/{rid:02}/{date_str}'
    os.makedirs(rf_path, exist_ok=True)
    rf_ffn = f'{rf_path}/{file_stem}.prcp-rrate.nc'

    # valid_time is a naive datetime; calling .timestamp() on it would apply the
    # machine's local timezone. Compute epoch seconds explicitly instead.
    # NOTE(review): assumes the radar time units are UTC -- confirm
    valid_epoch = (valid_time - datetime(1970, 1, 1)).total_seconds()

    #write to nc
    file_util.write_rf_nc(rf_ffn, rid, valid_epoch, rain_grid_2d, radar_lon, radar_lat, (standard_lat_1, standard_lat_2))


def _lowest_valid_level(rain_grid):
    """Collapse a 3-D grid to 2-D by taking the first unmasked value along axis 0.

    Columns with no unmasked value are left at 0. The result is a plain
    float64 array of shape ``rain_grid.shape[1:]``.
    """
    valid = ~np.ma.getmaskarray(rain_grid)
    # argmax on a boolean array gives the index of the first True along the axis
    first_valid = valid.argmax(axis=0)
    has_valid = valid.any(axis=0)
    picked = np.take_along_axis(np.ma.getdata(rain_grid), first_valid[np.newaxis], axis=0)[0]
    rain_2d = np.zeros(rain_grid.shape[1:])
    rain_2d[has_valid] = picked[has_valid]
    return rain_2d
    

In [5]:
def manager(date_str, rid):
    """Unpack one radar-day of volumes and run ``torrentfields`` on each file.

    Parameters
    ----------
    date_str : str
        Day to process; the first four characters are used as the year
        directory and the whole string as part of the zip filename.
    rid : int
        Radar id, zero-padded to two digits when building the zip path.

    Returns
    -------
    None

    Notes
    -----
    ``torrentfields`` reads ``rid`` and ``date_str`` from the notebook
    globals for its output paths, so call this with the same values as the
    config cell.
    """
    #unpack and list daily zip
    vol_zip = f'{vol_root}/{rid:02}/{date_str[0:4]}/vol/{rid:02}_{date_str}.pvol.zip'
    vol_ffn_list = file_util.unpack_zip(vol_zip)

    n_files = len(vol_ffn_list)
    if n_files == 0:
        # nothing unpacked: no work to do and no temp directory to locate
        print('no volume files found in', vol_zip)
        return

    temp_vol_dir = os.path.dirname(vol_ffn_list[0])
    try:
        #run retrieval, NCPU files per dask batch
        n_done = 0
        for flist_chunk in file_util.chunks(vol_ffn_list, NCPU):
            flist_chunk = list(flist_chunk)
            db.from_sequence(flist_chunk).map(torrentfields).compute()
            # count the files actually processed so the last partial batch
            # does not push the percentage past 100
            n_done += len(flist_chunk)
            print('processed: ' + str(round(n_done/n_files*100,2)))
    finally:
        #clean up the unpacked volumes even if a batch failed
        # the substring test also matches mkdtemp-style names such as '.../tmpXXXX'
        if '/tmp' in temp_vol_dir:
            shutil.rmtree(temp_vol_dir, ignore_errors=True)

In [None]:
# process the radar/day selected in the config cell
manager(date_str=date_str, rid=rid)

processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_000028.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_000628.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_001228.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_003031.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_003628.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_005428.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_010630.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_011228.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_004231.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_004828.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_012428.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_010031.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmpjrnfb0kh/02_20200119_011828.pvol.h5
processing /jobfs/7844218.gadi-pbs/tmp