In [None]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client, metrics, wait
# wait for jobs to arrive, depending on the queue, this may take some time
import dask.array as da
import dask.bag as db
import numpy as np
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, progress
import os
os.environ['PROJ_LIB'] = '/home/zsherman/anaconda3/envs/cmac_env/share/proj/'
import pyart
import netCDF4
import tempfile
import shutil
from netCDF4 import num2date
import json
#from time import strftime, sleep
import os
import datetime
import glob
import subprocess
import matplotlib
import matplotlib.pyplot as plt
# NOTE(review): the `%matplotlib inline` magic on the next-but-one line selects
# the inline backend for this kernel, so this 'agg' switch is immediately
# overridden here; run_vad switches to 'agg' itself wherever it runs.
plt.switch_backend('agg')
%matplotlib inline
# Repeats the PROJ_LIB assignment already made before `import pyart` above.
os.environ['PROJ_LIB'] = '/home/zsherman/anaconda3/envs/cmac_env/share/proj/'

In [None]:
%%bash
# Fetch the raw XSAPR sector-scan files for July and August 2018 (sites I5, I6, I4).
# Each `!cmd` line in a notebook runs in its own short-lived shell, so a separate
# `!module load data_wrapper` cannot affect later `!adc_xfer` commands. Load the
# module and run every transfer in the same shell instead.
module load data_wrapper
dest=/lustre/or-hydra/cades-arm/proj-shared/dask_test/unformatted/
for site in I5 I6 I4; do
    for month in 201807 201808; do
        adc_xfer -a /data/datastream/sgp/sgpxsaprsec${site}.00/sgpxsaprsec${site}.00.${month}* "$dest"
    done
done

In [None]:
def run_vad(
    radar_file_path, vel_field, z_want, save_name, image_directory, bad_directory, overwrite):
    """Dealias one radar volume, retrieve a VAD wind profile and save it as a PNG.

    Reading, retrieval and plotting all live in this one function so that a
    single dask task can process a file end to end.

    Parameters
    ----------
    radar_file_path : str
        Radar file passed to ``pyart.io.read``.
    vel_field : str
        Name of the radial-velocity field to dealias.
    z_want : array-like
        Heights (m) at which the VAD profile is retrieved.
    save_name : str
        Datastream name embedded in the output file name.
    image_directory : str
        Root output directory. The image is written to
        ``<image_directory>/<YYYYmmdd.HHMMSS>/vad.<save_name>.<YYYYmmdd.HHMMSS>.png``.
    bad_directory : str or None
        Where files that raise ``TypeError`` on read are moved. ``None`` means
        ``~/type_error_radars``.
    overwrite : bool
        If false, volumes whose image already exists are skipped.

    Returns
    -------
    None
        Results are written to disk; messages are printed for skipped files.
    """
    try:
        radar = pyart.io.read(radar_file_path)
    except TypeError:
        # Quarantine unreadable files so they are not retried on the next run.
        if bad_directory is None:
            path = os.path.join(os.path.expanduser('~'), 'type_error_radars')
        else:
            path = bad_directory
        print(radar_file_path + ' has encountered TypeError!')
        _ensure_group_writable_dir(path)
        shutil.move(radar_file_path, path)
        return

    radar_start_date = netCDF4.num2date(radar.time['data'][0],
                                        radar.time['units'])
    # Format from the fields rather than with datetime.datetime.strftime, which
    # rejects the cftime objects newer netCDF4 versions may return.
    date_string = _vad_timestamp(radar_start_date)

    img_directory = os.path.join(image_directory, date_string)
    file_name = os.path.join(
        img_directory, 'vad.' + save_name + '.' + date_string + '.png')
    _ensure_group_writable_dir(img_directory)

    if not overwrite and os.path.exists(file_name):
        print(file_name + ' already exists.')
        return

    corr_vel = pyart.correct.dealias_region_based(
        radar, vel_field=vel_field, keep_original=False, centered=True)
    radar.add_field('corrected_velocity', corr_vel, replace_existing=True)
    vad = pyart.retrieve.velocity_azimuth_display(radar, 'corrected_velocity', z_want)
    # Drop our reference to the volume before plotting.
    del radar

    # Imported here so the non-interactive backend is selected in whatever
    # process (e.g. a dask worker) runs this function.
    import matplotlib.pyplot as plt
    plt.switch_backend('agg')
    fig, ax = plt.subplots(figsize=[10, 8])
    try:
        ax.plot(vad.u_wind, vad.height, 'b-', label='U Wind')
        ax.plot(vad.v_wind, vad.height, 'r-', label='V Wind')
        ax.set_title('Velocity Azimuth Display ' + date_string)
        ax.set_ylabel('Height (m)')
        ax.set_xlabel('Wind (m/s)')
        ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
        fig.savefig(file_name)
    finally:
        # Release the figure even if saving fails, so long-lived worker
        # processes do not accumulate open figures.
        plt.close(fig)


def _vad_timestamp(start):
    """Format a scan start time as ``YYYYmmdd.HHMMSS``.

    Only the ``year``..``second`` attributes are read, so both
    ``datetime.datetime`` and cftime datetime objects are accepted.
    """
    return '%04d%02d%02d.%02d%02d%02d' % (
        start.year, start.month, start.day,
        start.hour, start.minute, start.second)


def _ensure_group_writable_dir(path):
    """Create ``path`` (and parents) if missing; make a newly created dir group read/write.

    An already-existing directory is left untouched. That includes one created
    concurrently by another worker between any earlier check and this call.
    """
    import stat
    try:
        os.makedirs(path)
    except FileExistsError:
        return
    # Same effect as ``chmod -R g+rw path`` on a freshly created, empty
    # directory, without a shell (so spaces/metacharacters in path are safe).
    mode = stat.S_IMODE(os.stat(path).st_mode)
    os.chmod(path, mode | stat.S_IRGRP | stat.S_IWGRP)

In [None]:
# ---- Run configuration -------------------------------------------------
# Input: a directory searched recursively, or a text file listing radar paths.
radar_path = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprsecI4.00/201808/'
# Datastream label embedded in every output image name.
save_name = 'sgpxsaprsecI4.00'
# None -> run_vad moves unreadable files to ~/type_error_radars.
bad_directory = None
# NOTE(review): despite the .png suffix this is used as the output *directory* root.
image_directory = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprvadI4.png'
# Re-render existing images; `verbose` is not read by any cell shown here.
overwrite, verbose = True, False
# Radial-velocity field to dealias, and the VAD heights (0-10 km every 100 m).
vel_field = 'velocity'
z_want = np.linspace(start=0, stop=10000, num=101)

In [None]:
# Build the list of radar volumes to process. `radar_path` may be a directory
# (searched recursively) or a text file with one radar file path per line.
if os.path.isdir(radar_path):
    # '**/*' with recursive=True also matches the sub-directories themselves;
    # keep only regular files so run_vad is never handed a directory. Sorting
    # gives a deterministic processing order.
    radar_files = sorted(
        path
        for path in glob.glob(os.path.join(radar_path, '**', '*'), recursive=True)
        if os.path.isfile(path))
elif os.path.isfile(radar_path):
    with open(radar_path) as listing:
        # Strip surrounding whitespace and skip blank lines, which would
        # otherwise become empty paths.
        radar_files = [line.strip() for line in listing if line.strip()]
else:
    raise IOError('The specified radar path does not exist!')

In [None]:
# Alternative configuration for the high-memory queue (kept for reference):
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
#                    walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])

# Each PBS job provides 36 cores split over 36 worker processes (one thread
# per worker) for at most five hours.
cluster = PBSCluster(
    cores=36,
    processes=36,
    walltime='05:00:00',
    scheduler_file='/home/zsherman/scheduler.json',
)
# Request ten workers; depending on the queue the PBS jobs may take a while to start.
cluster.scale(10)
# Attach this notebook process to the cluster so dask work is sent to its workers.
client = Client(cluster)

In [None]:
# Bare expression: the notebook renders the cluster object's repr as this cell's output.
cluster

In [None]:
# Bare expression: the notebook renders the client object's repr as this cell's output.
client

In [None]:
def com():
    """Set PROJ_LIB for the current process and import Basemap there.

    Intended to be executed on every dask worker through ``client.run``. The
    import is done only for its side effect, so a broken Basemap/PROJ setup on
    a worker surfaces here rather than inside a task.

    Returns the value of PROJ_LIB after the import.
    """
    proj_dir = '/home/zsherman/anaconda3/envs/cmac_env/share/proj/'
    os.environ['PROJ_LIB'] = proj_dir
    from mpl_toolkits.basemap import Basemap  # noqa: F401 -- imported for its side effect only
    return os.environ['PROJ_LIB']


# Runs com on each worker; the per-worker results are shown as the cell output.
client.run(com)

In [None]:
# One element per radar file; each element becomes one run_vad call.
the_bag = db.from_sequence(radar_files)


def the_function(x):
    """Run ``run_vad`` on one radar file using the notebook's configuration.

    The configuration values are read from notebook globals, not bound when
    this function is defined.
    """
    return run_vad(
        x,
        vel_field=vel_field,
        z_want=z_want,
        save_name=save_name,
        image_directory=image_directory,
        bad_directory=bad_directory,
        overwrite=overwrite,
    )


# Lazy: nothing runs until .compute(). Despite its name this is a dask Bag, not futures.
futures = the_bag.map(the_function)

In [None]:
# Run every run_vad task on the cluster. compute() blocks until all files are
# processed. The shutdown is in `finally` so the scheduler and workers are
# released even if a task raises, instead of being left running (e.g. until
# the PBS walltime expires).
try:
    futures.compute()
finally:
    client.shutdown()