In [None]:
import warnings
warnings.filterwarnings('ignore')

import os
os.environ['PROJ_LIB'] = '/home/jhemedinger/anaconda3/envs/qvp_env/share/proj/'
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, metrics, wait
# wait for jobs to arrive, depending on the queue, this may take some time
import dask.array as da
import dask.bag as db
import numpy as np
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, progress
import pyart
import netCDF4
import xarray
import tempfile
import shutil
from netCDF4 import num2date
import json
#from time import strftime, sleep
import datetime
import glob
import subprocess
import matplotlib
import matplotlib.pyplot as plt
plt.switch_backend('agg')
%matplotlib inline

In [None]:
def run_qvp(radar_file_path, fields):
    """For dask we need the radar plotting routines all in one subroutine"""
    try:
        radar = pyart.io.read(radar_file_path)
    except OSError:
        return
    
    radar_start_date = netCDF4.num2date(radar.time['data'][0], 
                                        radar.time['units'])
    time = datetime.datetime.strftime(radar_start_date, '%Y-%m-%dT%H:%M:%S')
    date = datetime.datetime.strftime(radar_start_date, '%Y%m%d')
    
    
    qvp = pyart.retrieve.quasi_vertical_profile(radar, fields=fields)
    
    
    data = xarray.Dataset()
    data['time'] = time
    data['height'] = qvp['height']
    data['temperature'] = qvp['temperature']
    data['specific_attenuation_reflectivity'] = qvp['specific_attenuation_reflectivity']
    data['specific_attenuation_differential_reflectivity'] = qvp['specific_attenuation_differential_reflectivity']
    data['radar_echo_classification'] = qvp['radar_echo_classification']
    data['radar_estimated_rain_rate'] = qvp['radar_estimated_rain_rate']
    data['D0'] = qvp['D0']
    data['NW'] = qvp['NW']
    data['velocity'] = qvp['velocity']
    data['region_dealias_velocity'] = qvp['region_dealias_velocity']
    data['velocity_texture'] = qvp['velocity_texture']
    data['total_power'] = qvp['total_power']
    data['reflectivity'] = qvp['reflectivity']
    data['cross_correlation_ratio'] = qvp['cross_correlation_ratio']
    data['differential_reflectivity'] = qvp['differential_reflectivity']
    data['corrected_differential_reflectivity'] = qvp['corrected_differential_reflectivity']
    data['differential_phase'] = qvp['differential_phase']
    data['corrected_differential_phase'] = qvp['corrected_differential_phase']
    data['corrected_specific_differential_phase'] = qvp['corrected_specific_differential_phase']
    data['spectrum_width'] = qvp['spectrum_width']
    data['signal_to_noise_ratio'] = qvp['signal_to_noise_ratio']
    
    data.to_netcdf('/lcrc/group/earthscience/radar/xsapr_sgp/vads/qvp_out/cpol_qvp/qvp_' + str(time) + '.nc')
    data.close()
    del radar
    del qvp

    return

In [None]:
radar_path = '/lcrc/group/earthscience/radar/CPOL_level_1b/PPI/2006/20060124/'
fields = None
gatefilter = None
#radar_tilt = 20.0

In [None]:
if os.path.isdir(radar_path):
    radar_files = glob.glob(radar_path + '/**/*', recursive=True)

elif os.path.isfile(radar_path):
    with open(radar_path) as f:
        radar_files = f.readlines()
    radar_files = [x.strip() for x in radar_files]
else:
    raise IOError('The specified radar path does not exist!')

In [None]:
radar_files

In [None]:
print(len(radar_files))

In [None]:
cluster = SLURMCluster(cores=8, job_cpu=8, walltime='01:00:00', memory='128GB', 
                      local_dir='/home/jhemedinger/dask_worker_space/', project='ACPC')
                     #scheduler_file='/home/zsherman/scheduler.json'
cluster.scale(8)         # Ask for ten workers
client = Client(cluster)  # Connect this local process to remote workers

In [None]:
cluster

In [None]:
client

In [None]:
the_bag = db.from_sequence(radar_files)
the_function = lambda x: run_qvp(x, fields=fields)
futures = the_bag.map(the_function)

In [None]:
futures.compute()

In [None]:
cluster.stop_all_jobs()

In [None]:
files = glob.glob('/lcrc/group/earthscience/radar/xsapr_sgp/vads/qvp_out/cpol_qvp//*')
files.sort()
print(files)
print(len(files))

In [None]:
time = []
height = []
temperature = []
specific_attenuation_reflectivity = []
specific_attenuation_differential_reflectivity = []
radar_echo_classification = []
radar_estimated_rain_rate = []
D0 = []
NW = []
velocity = []
region_dealias_velocity = []
velocity_texture = []
total_power = []
reflectivity = []
cross_correlation_ratio = []
differential_reflectivity = []
corrected_differential_reflectivity = []
differential_phase = []
corrected_differential_phase = []
corrected_specific_differential_phase = []
spectrum_width = [] 
signal_to_noise_ratio = []

In [None]:
for file in files:
    ds = netCDF4.Dataset(file)
    t = ds['time'][:]
    hght = ds['height'][:]
    #sar = ['specific_attenuation_reflectivity'][:]
    #sadr = ['specific_attenuation_differential_reflectivity'][:]
    rec = ds['radar_echo_classification'][:]
    rerr = ds['radar_estimated_rain_rate'][:]
    d0 = ds['D0'][:]
    nw = ds['NW'][:]
    vel = ds['velocity'][:]
    #rdv = ds['region_dealias_velocity'][:]
    vt = ds['velocity_texture'][:]
    tp = ds['total_power'][:]
    ref = ds['reflectivity'][:]
    cc = ds['cross_correlation_ratio'][:]
    zdr = ds['differential_reflectivity'][:]
    czdr = ds['corrected_differential_reflectivity'][:]
    dp = ds['differential_phase'][:]
    cdp = ds['corrected_differential_phase'][:]
    csdp = ds['corrected_specific_differential_phase'][:]
    sw = ds['spectrum_width'][:]
    stnr = ds['signal_to_noise_ratio'][:]
    
    time.append(t)
    height.append(hght)
    #specific_attenuation_reflectivity(sar)
    #specific_attenuation_differential_reflectivity(sadr)
    radar_echo_classification.append(rec)
    radar_estimated_rain_rate.append(rerr)
    D0.append(d0)
    NW.append(nw)
    #region_dealias_velocity(rdv)
    velocity.append(vel)
    velocity_texture.append(vt)
    total_power.append(tp)
    reflectivity.append(ref)
    cross_correlation_ratio.append(cc)
    differential_reflectivity.append(zdr)
    corrected_differential_reflectivity.append(czdr)
    differential_phase.append(dp)
    corrected_differential_phase.append(cdp)
    corrected_specific_differential_phase.append(csdp)
    spectrum_width.append(sw)
    signal_to_noise_ratio.append(stnr)

In [None]:
new_t = np.array(time, dtype='datetime64[ns]')
print(new_t.shape)

In [None]:
new_time = np.array(time, dtype='datetime64[ns]')
new_height = np.array(hght)
new_rec = np.array(radar_echo_classification)
new_rerr = np.array(radar_estimated_rain_rate)
new_D0 = np.array(D0)
new_NW = np.array(NW)
new_vel = np.array(velocity)
new_vt = np.array(velocity_texture)
new_tp = np.array(total_power)
new_ref = np.array(reflectivity)
new_cc = np.array(cross_correlation_ratio)
new_zdr = np.array(differential_reflectivity)
new_czdr = np.array(corrected_differential_reflectivity)
new_dp = np.array(differential_phase)
new_cdp = np.array(corrected_differential_phase)
new_csdp = np.array(corrected_specific_differential_phase)
new_sw = np.array(spectrum_width)
new_stnr = np.array(signal_to_noise_ratio)

In [None]:
print(new_time.shape)
print(new_height.shape)
print(new_ref.shape)

In [None]:
fig = plt.figure(figsize=[15,5])
ax = plt.gca()
#ax.xaxis_date()
cmap = pyart.graph.cm_colorblind.HomeyerRainbow
#for i in range(10):
    #Xq, Yq = np.meshgrid(new_time[i], new_height/1000)
    
    
img = plt.pcolormesh(new_time, new_height/1000, new_ref.transpose(), 
                     cmap=cmap, vmin=-20, vmax=64)
plt.xlabel('Time (UTC)')
plt.ylabel('Height (km)')
plt.ylim([0,20])
plt.xticks(rotation=45)
plt.title('Reflectivity QVP')
cb = plt.colorbar(img, cmap=cmap)
cb.set_label('Mean Reflectivity Factor (dBZ)')
#plt.savefig('ref_with_gate_id_zdr>1gatefilter.png', dpi=300)
plt.show()