In [3]:
import os
from datetime import datetime, timedelta
from glob import glob
from multiprocessing import Pool

import numpy as np
import xarray as xr
from scipy.ndimage import map_coordinates

from matplotlib import pyplot as plt
from tqdm import tqdm

import pyproj

In [4]:
#TODO: Need to be more careful when processing RF3 and DP data - need to reject RF3 pixel where there is a no DP data

In [34]:
NCPU = 15
radar_id = 31 #2,64,66,71,31,32,95
date_start_str = '20200401'
date_end_str = '20210430'
dp_path = '/g/data/kl02/jss548/PST/dprain/dprain-retrieval/rfgrid'
rf3_path = '/g/data/rq0/admin/dprain-verify-orignal/rf3_rrate'
of_path = '/g/data/rq0/admin/dprain-verify-orignal/optflow'
gauge_path = '/g/data/rq0/admin/dprain-verify-orignal/gaugeobs'

out_path = '/g/data/kl02/jss548/PST/dprain/dprain-verify-summary'

range_limit = 100 #km
min_precip = 2.5 #mm in 15min
grid_res = 500 #m

radar_id_str = f'{radar_id:02}'

rf3_gap_fill = False

In [35]:
def chunks(l, n):
    """
    Yield successive n-sized chunks from l.
    From http://stackoverflow.com/a/312464
    """
    for i in range(0, len(l), n):
        yield l[i:i + n]

def daterange(date1, date2):
    """
    Generate date list between dates
    """
    date_list = []
    for n in range(int ((date2 - date1).days)+1):
        date_list.append(date1 + timedelta(n))
    return date_list

def advection(rrate1, rrate2,
              oflow1_pix, oflow2_pix,
              T_start=0, T_end=6,
              T=6, t=1):
    """
    WHAT:
    Returns the accumulated rainfall amount by applying a time weighted interpolation from radar volume 1 to radar volume 2
    using optical flow fields (u,v and both timesteps) and an interpolation timestep of t. Can return r_acc only for a portion of the timespan
    between radar volumes 1 and 2.
    
    HELP
    note: first radar volume is the oldest. second radar volume is the newest.

    INPUTS
    rrate1/rrate2: rain rate (mm/hr) for first radar volume and second radar volume
    oflow1_pix/oflow2_pix: optical flow (pix/min) in [u and v] direction for first radar volume and second radar volume (list of length 2, elements are 2D np.array)
    T_start: starting timestep (minutes from radar volume 1) for interpolation (0 will include the first radar timestep)
    T_end: ending timestep (minutes from radar volume 2) for interpolation (T_end=T will include the second radar timestep)
    T: time difference between radar volumes (minutes)
    t: timestep for interpolation (minutes)
    
    OUTPUTS:
    r_acc: accumulated rainfall totals (mm)
    """

    # Perform temporal interpolation
    r_acc = np.zeros((rrate1.shape))
    x, y = np.meshgrid(
        np.arange(rrate1.shape[1], dtype=float), np.arange(rrate1.shape[0], dtype=float)
    )
    for i in range(T_start, T_end + t, t):
        #shift timestep 1 forwards (this is the older timestep)
        ts_forward = -i
        y1_shift = y + (ts_forward * oflow1_pix[1])
        x1_shift = x + (ts_forward * oflow1_pix[0])
        pos1 = (y1_shift, x1_shift)
        R1 = map_coordinates(rrate1, pos1, order=1)
        weight1 = (T - i)/T

        #shift timestep 2 backwards (this is the newer timestep)
        ts_backwards = (T-i)
        y2_shift = y + (ts_backwards * oflow2_pix[1])
        x2_shift = x + (ts_backwards * oflow2_pix[0])
        pos2 = (y2_shift, x2_shift)
        R2 = map_coordinates(rrate2, pos2, order=1)
        weight2 = i/T

        #weighted combination
        rrate_interp = R1 * weight1 + R2 * weight2

        #convert to mm/hr in mm
        r_acc += rrate_interp/60*t
        
    return r_acc


def calc_distance(radar_lat, radar_lon, site_lat_list, site_lon_list):
    #Haversine formula
    #radius of earth
    R = 6373.0
    #convert to radians
    radar_lat = np.radians(radar_lat)
    radar_lon = np.radians(radar_lon)
    site_lat_list = np.radians(site_lat_list)
    site_lon_list = np.radians(site_lon_list)
    #formulation
    dlon = site_lon_list - radar_lon
    dlat = site_lat_list - radar_lat
    a = np.sin(dlat / 2)**2 + np.cos(radar_lat) * np.cos(site_lat_list) * np.sin(dlon / 2)**2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return R * c

def datetime_from_list(ffn_list):
    
    dt_list = []
    for item in ffn_list:
        basename = os.path.basename(item)
        parts = basename.split('_')
        date_str = parts[1]
        time_str = parts[2][0:6]
        dt_list.append(datetime.strptime(date_str + '_' + time_str, '%Y%m%d_%H%M%S'))
    return dt_list

def read_file(dt, file_type):
    if file_type=='rf3':
        nc_ffn = f'{rf3_path}/{radar_id_str}/{dt.year}/{dt.month:02}/{dt.day:02}/{radar_id}_{dt.strftime("%Y%m%d_%H%M%S")}.prcp-rrate.nc'
    elif file_type=='dp':
        try:
            #ignore sectonds on dp file
            nc_ffn = glob(f'{dp_path}/{radar_id_str}/{dt.year}{dt.month:02}{dt.day:02}/{radar_id}_{dt.strftime("%Y%m%d_%H%M")}*.prcp-rrate.nc')[0]
        except:
            #check for filename that might have been rounded up (using prev minute)
            try:
                print('no dp files found 1st pass for:', dt, f'{dp_path}/{radar_id_str}/{dt.year}{dt.month:02}{dt.day:02}/{radar_id}_{dt.strftime("%Y%m%d_%H%M")}*.prcp-rrate.nc')
                dt_offset = dt - timedelta(minutes=1)
                nc_ffn = glob(f'{dp_path}/{radar_id_str}/{dt_offset.year}{dt_offset.month:02}{dt_offset.day:02}/{radar_id}_{dt_offset.strftime("%Y%m%d_%H%M")}*.prcp-rrate.nc')[0]
            except:
                print('no dp files found 2nd pass for:', dt, f'{dp_path}/{radar_id_str}/{dt_offset.year}{dt_offset.month:02}{dt_offset.day:02}/{radar_id}_{dt_offset.strftime("%Y%m%d_%H%M")}*.prcp-rrate.nc')
                return None
    elif file_type=='of':
        nc_ffn = f'{of_path}/{radar_id_str}/{dt.year}/{dt.month:02}/{dt.day:02}/{radar_id}_{dt.strftime("%Y%m%d_%H%M%S")}.optflow.nc'
    try:
        with xr.open_dataset(nc_ffn) as ds:
            if file_type=='rf3' or file_type=='dp': 
                rrate = ds['rain_rate'].data
                #rrate[rrate==0] = np.nan
                return rrate
            elif file_type=='of':
                return [ds['flow_u'].data*60/grid_res, ds['flow_v'].data*60/grid_res]
    except:
        print('missing file:', nc_ffn)
        return None 

def run_advection(vol_dt_list, vol_idx, file_type, gauge_start_dt64, gauge_end_dt64):    

    #open inital dataset
    dt1 = vol_dt_list[vol_idx[0]]
    rrate1 = read_file(dt1, file_type)
    oflow1_pix = read_file(dt1, 'of')
    #abort on errors
    if rrate1 is None or oflow1_pix is None:
        return None, True
    
    acrain = np.zeros_like(rrate1)
    #advect rainrate files
    for idx in vol_idx[1:]:
        dt2 = vol_dt_list[idx]
        
        #open next dataset
        rrate2 = read_file(dt2, file_type)
        oflow2_pix = read_file(dt2, 'of')         
        #abort on errors
        if rrate2 is None or oflow2_pix is None:
            return None, True  
        
        #time
        dt64_1 = np.array(dt1, dtype='datetime64[m]')
        dt64_2 = np.array(dt2, dtype='datetime64[m]')
        td_m = (dt64_2-dt64_1)/np.timedelta64(1, 'm')
        
        #check for volumes outside the gauge times
        start_diff_min = (dt64_1-gauge_start_dt64)/np.timedelta64(1, 'm')
        end_diff_min = (gauge_end_dt64-dt64_2)/np.timedelta64(1, 'm')
        T_start = 0
        T_end = int(td_m)
        skip = False
        if start_diff_min <= -td_m:
            skip = True
        elif start_diff_min < 0:
            T_start = int(start_diff_min)
        if end_diff_min <= -td_m:
            skip=True
        elif end_diff_min < 0:
            T_end = int(end_diff_min)
            
        #run accumulations
        if not skip:
            acrain += advection(rrate1, rrate2,
                             oflow1_pix, oflow2_pix,
                             T_start=T_start, T_end=T_end,
                             T=td_m, t=1)
        
        #move next to prev
        dt1 = dt2
        rrate1 = rrate2
        oflow1_pix = oflow2_pix
        
    return acrain, False

In [38]:
def worker(date_dt):
    
    verbose = True
    verbose2= False
    #init
    date_str = date_dt.strftime('%Y%m%d')
    #build paths
    gauge_folder = f'{gauge_path}/{date_dt.year}/{date_dt.month:02}/{date_dt.day:02}'
    of_folder = f'{of_path}/{radar_id_str}/{date_dt.year}/{date_dt.month:02}/{date_dt.day:02}'
    rr_orig_folder = f'{rf3_path}/{radar_id_str}/{date_dt.year}/{date_dt.month:02}/{date_dt.day:02}'
    rr_dp_folder = f'{dp_path}/{radar_id_str}/{date_dt.year}{date_dt.month:02}{date_dt.day:02}'
    #list files
    gauge_fflist = sorted(glob(gauge_folder + '/*.nc'))
    of_fflist = sorted(glob(of_folder + '/*.nc'))
    rr_orig_fflist = sorted(glob(rr_orig_folder + '/*.nc'))
    rr_dp_fflist = sorted(glob(rr_dp_folder + '/*.nc'))
    
    #check for any files
    if len(rr_orig_fflist) < 2:
        print('no files found for', date_dt)
        return None

    #read radar latlon
    with xr.open_dataset(rr_orig_fflist[0]) as ds:
        radar_lat = ds.proj.latitude_of_projection_origin
        radar_lon = ds.proj.longitude_of_central_meridian
        x_grid, y_grid = np.meshgrid(ds.x.data, ds.y.data)

    #create latlon grid
    proj = pyproj.Proj(proj='aea', lat_1=ds.proj.standard_parallel[0], lat_2=ds.proj.standard_parallel[1],
                       lat_0=ds.proj.latitude_of_projection_origin, lon_0=ds.proj.longitude_of_central_meridian,
                       x_0=0, y_0=0)
    lon_grid, lat_grid = proj(x_grid*1000, y_grid*1000, inverse=True)

    #init
    gauge_lat_out = []
    gauge_lon_out = []
    gauge_rain_out = []
    gauge_dt64_out = []
    rf3_acrain_out = []
    dp_acrain_out = []

    rf_replace = 0
    n_gaugeobs = len(gauge_fflist)
    for i in range(n_gaugeobs):

        ##############################
        # Gauge filtering
        ##############################
        #load gauge
        try:
            gauge_ds = xr.open_dataset(gauge_fflist[i])
        except:
            if verbose:
                print('failed to open:', gauge_fflist[i])
            continue
        #find stations nearby
        gauge_id = gauge_ds['station_id'].data
        gauge_lat = gauge_ds['latitude'].data
        gauge_lon = gauge_ds['longitude'].data
        gauge_dist = calc_distance(radar_lat, radar_lon, gauge_lat, gauge_lon)
        gauge_dist_mask = gauge_dist <= range_limit
        print(len(np.unique(gauge_id[gauge_dist_mask])))
        break
        #apply gauge lower thresholds ########## to change later
        gauge_rain = gauge_ds['precipitation'].data
        gauge_rain_mask = gauge_rain >= min_precip
        #final index
        gauge_idx = np.where(np.logical_and(gauge_dist_mask, gauge_rain_mask))[0]
        #skip timestep if there's no gauges to process
        if len(gauge_idx) == 0:
            if verbose2:
                print('no gauges with rain')
            continue

        ##############################
        # find volumes to process for gauge timestep
        ##############################
        #find volume dates in range (and file before for rain advection)
        #read time of gauge dataset
        start_dt64 = np.array(gauge_ds['start_time'].data, dtype='datetime64[m]')
        valid_dt64 = np.array(gauge_ds['valid_time'].data, dtype='datetime64[m]')
        start_dt_parts = np.datetime_as_string(start_dt64, unit='D').split('-')
        valid_dt_parts = np.datetime_as_string(valid_dt64, unit='D').split('-')
        #list volumes
        start_folder = f'{rf3_path}/{radar_id_str}/{start_dt_parts[0]}/{start_dt_parts[1]}/{start_dt_parts[2]}'
        end_folder = f'{rf3_path}/{radar_id_str}/{valid_dt_parts[0]}/{valid_dt_parts[1]}/{valid_dt_parts[2]}'
        fflist = sorted(set(glob(start_folder + '/*.nc') + glob(end_folder + '/*.nc')))
        #convert filenames to datetimes
        vol_dt_list = datetime_from_list(fflist)
        vol_dt64_array = np.array(vol_dt_list, dtype='datetime64')
        #filter
        vol_idx = np.where(np.logical_and(vol_dt64_array>=start_dt64, vol_dt64_array<=valid_dt64))[0]
        if len(vol_idx) < 2:
            if verbose:
                print('no volumes in gauge step')
            continue
            
        #append trailing index for advection
        vol_idx = np.append([vol_idx[0]-1], vol_idx)
        vol_idx = np.append(vol_idx, [vol_idx[-1]+1])

        ##############################
        # generate radar accumulations
        ##############################
        rf3_acrain, error_flag = run_advection(vol_dt_list, vol_idx, 'rf3', start_dt64, valid_dt64)
        dp_acrain, error_flag  = run_advection(vol_dt_list, vol_idx, 'dp', start_dt64, valid_dt64)
        
        if error_flag==True or error_flag==True:
            #skip matching due to missing data
            if verbose:
                print('error flags')
            continue
            
        
        #match gauge site to radar site
        for idx in gauge_idx:
            #exact gauge information
            g_id = gauge_id[idx]
            g_lat = gauge_lat[idx]
            g_lon = gauge_lon[idx]
            g_rain = gauge_rain[idx]
            #cost function for nearest radar grid point
            cost = np.sqrt((lon_grid - g_lon)**2 \
                    + (lat_grid - g_lat)**2) #A cost function for searching
            grid_idx = np.where(cost == cost.min())

            rf3_value = rf3_acrain[grid_idx][0]
            dp_value = dp_acrain[grid_idx][0]
            
            #use RF3 for gaps
            if np.isnan(dp_value) and rf3_gap_fill:
                dp_value = rf3_value
                rf_replace += 1
                
            #append if not nan
            if not np.isnan(rf3_value) and not np.isnan(dp_value) and rf3_value>0 and dp_value>0:
                gauge_lat_out.append(g_lat)
                gauge_lon_out.append(g_lon)
                gauge_rain_out.append(g_rain)
                rf3_acrain_out.append(rf3_value)
                dp_acrain_out.append(dp_value)
                gauge_dt64_out.append(start_dt64)
            else:
                if verbose:
                    print('no valid data return from rainrate grid')
            
    #save to file
    if len(gauge_rain_out) > 0:
        out_folder = f'{out_path}/{radar_id_str}/'
        if not os.path.exists(out_folder):
            os.makedirs(out_folder)
        out_ffn = f'{out_folder}/{radar_id_str}_{date_str}.npz'
        np.savez(out_ffn, gauge_rain=gauge_rain_out, rf3_acrain=rf3_acrain_out, 
                 dp_acrain=dp_acrain_out, gauge_dt64=gauge_dt64_out, 
                 gauge_lat=gauge_lat_out, gauge_lon=gauge_lon_out,
                 rf_replace=rf_replace)
    print('processed', date_dt)

In [39]:
#manager
dt_start = datetime.strptime(date_start_str, '%Y%m%d')
dt_end = datetime.strptime(date_end_str, '%Y%m%d')
dt_list = daterange(dt_start, dt_end)

# run single processing
for dt in dt_list:
    worker(dt)

# #run multiproc
# i            = 0
# n_dates      = len(dt_list) 

# for dtlist_chunk in chunks(dt_list, NCPU): #CUSTOM RANGE USED
#     with Pool(NCPU) as pool:
#         _ = pool.map(worker, dtlist_chunk)
#     i += NCPU
#     print('processed: ' + str(round(i/n_dates*100,2)))

4
processed 2020-04-01 00:00:00
4
processed 2020-04-02 00:00:00
3
processed 2020-04-03 00:00:00
4
processed 2020-04-04 00:00:00
4
processed 2020-04-05 00:00:00
3
processed 2020-04-06 00:00:00
4
processed 2020-04-07 00:00:00
3
processed 2020-04-08 00:00:00
3
processed 2020-04-09 00:00:00
3
processed 2020-04-10 00:00:00
3
processed 2020-04-11 00:00:00
4
processed 2020-04-12 00:00:00
3
processed 2020-04-13 00:00:00
4
processed 2020-04-14 00:00:00
3
processed 2020-04-15 00:00:00
4
processed 2020-04-16 00:00:00
3
processed 2020-04-17 00:00:00
4
processed 2020-04-18 00:00:00
3
processed 2020-04-19 00:00:00
3
processed 2020-04-20 00:00:00
4
processed 2020-04-21 00:00:00
3
processed 2020-04-22 00:00:00
5
processed 2020-04-23 00:00:00
3
processed 2020-04-24 00:00:00
3
processed 2020-04-25 00:00:00
3
processed 2020-04-26 00:00:00
4
processed 2020-04-27 00:00:00
3
processed 2020-04-28 00:00:00
4
processed 2020-04-29 00:00:00
3
processed 2020-04-30 00:00:00
3
processed 2020-05-01 00:00:00
3
proces