In [1]:
import numpy as np
import glob
# from distributed import Client, LocalCluster
import xarray as xr
import cartopy.crs as ccrs
import concurrent.futures
import matplotlib.pyplot as plt
import parallel_gridding as pg


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119





In [2]:
# cluster = LocalCluster()
# client = Client(cluster)
# client

In [3]:
# nexrad_dir = "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/"
# data_files = pg.get_nexrad_data_files("20220330220000", "20220331040000")

In [4]:
# # Create a list of tasks to process the data files
# tasks = [client.submit(pg.process_file, nexrad_dir, f) for f in data_files[:2]]

# # Gather the results of the tasks
# results = client.gather(tasks)

In [5]:
# !ls /depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/KGWX/

In [None]:
%%timeit
nexrad_dir = "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/"
data_files = pg.get_nexrad_data_files("20220330220000", "20220331040000")

# Create a thread pool executor with 100 threads
with concurrent.futures.ThreadPoolExecutor(max_workers=500) as executor:
    # Submit the tasks to the executor
    results = [executor.submit(pg.process_file, nexrad_dir, f) for f in data_files]
    # Wait for all tasks to complete
    concurrent.futures.wait(results)


Available files:  54
Reading file: KGWX20220330_220424_V06.nc
Reading file: KGWX20220330_221124_V06.nc
Reading file: KGWX20220330_221824_V06.nc
Reading file: KGWX20220330_222523_V06.nc


In [148]:
# Submit the batch script sbatch_parallel_gridding.sbatch to the scheduler
!sbatch sbatch_parallel_gridding.sbatch

Submitted batch job 25603080


In [150]:
# Job ID copied by hand from the "Submitted batch job" message printed by sbatch
job = 25603080

In [149]:
# List the queued and running jobs charged to the dawson29 account
!squeue -A dawson29

JOBID        USER      ACCOUNT      NAME             NODES   CPUS  TIME_LIMIT ST TIME
25514127     syed44    dawson29     OnDemand/Noteboo     1     16  5-00:00:00  R 3-07:41:23
25598287     jiang703  dawson29     100mSFreer19         7    400  3-00:00:00  R 5:28:02
25603080     syed44    dawson29     sbatch_parallel_     4    128  1-00:00:00  R 0:09


In [153]:
# Show the last lines of the log file parallel_gridding.out
!tail parallel_gridding.out



In [48]:
# List the gridded output directory, newest entries first
!ls -lt "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/KGWX/"

total 54
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:38 KGWX20220330_232552_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:38 KGWX20220330_233952_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:38 KGWX20220330_233252_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_234639_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_225209_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_231240_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_235324_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_223845_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_222523_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220331_021011_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_230555_V06.nc
drwxrws---+ 2 syed44 dawson29-data 4096 Jun  2 12:37 KGWX20220330_221824_V06.nc
drwxrws---+ 2 syed

In [54]:
import pyart
import os
import glob

In [62]:
# Directory holding the gridded KGWX output files for IOP2
data_path = "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/KGWX"

In [63]:
# Collect every *.nc entry under data_path in sorted (lexicographic) order
files = sorted(glob.glob(os.path.join(data_path, "*.nc")))
# Display how many entries were found
len(files)

54

In [8]:
import warnings
warnings.filterwarnings("ignore")
import pyart
import numpy as np
from matplotlib import pyplot as plt
import glob
from datetime import datetime
import time
import os
import shutil
import multiprocessing as mp


def _drop_fields(radar):
    fields_to_drop = ["PURPLE_HAZE"]
    for field in fields_to_drop:
        if field in radar.fields:
            del radar.fields[field]

    return radar


#function to dealiase the Doppler velocity
def dealiase(radar, vel_name='velocity', gatefilter=None, method="unwrap"):
    '''
    Dealias Doppler velocities using Py-ART and store them back on the radar.

    Parameters
    ----------
    radar : Py-ART radar object
        Radar whose velocity field is dealiased. It is modified in place.
    vel_name : str
        Name of the velocity field. The dealiased field replaces the
        original under the same name.
    gatefilter : pyart.correct.GateFilter or None
        Gates to use during dealiasing. When None, a filter is built from the
        "REF" field: gates with REF above 0 are included, then masked REF,
        invalid REF and transition gates are excluded.
    method : str
        Method to use for the dealiasing. Can be 'unwrap' or 'region'.

    Returns
    -------
    The same radar object, with ``vel_name`` replaced by the dealiased field.

    Raises
    ------
    ValueError
        If ``method`` is neither 'unwrap' nor 'region'.
    '''
    # Validate up front: an unknown method used to fall through both branches
    # and fail later with an UnboundLocalError on corr_vel.
    if method not in ("unwrap", "region"):
        raise ValueError(
            f"Unknown dealiasing method {method!r}; expected 'unwrap' or 'region'")

    # Create a GateFilter if one was not provided
    if gatefilter is None:
        gatefilter = pyart.correct.GateFilter(radar)
        gatefilter.include_above("REF", 0)
        gatefilter.exclude_masked('REF')
        gatefilter.exclude_invalid('REF')
        gatefilter.exclude_transition()

    # Dealias Doppler velocities using the selected method
    if method == "unwrap":
        corr_vel = pyart.correct.dealias_unwrap_phase(radar, vel_field=vel_name,
                                                      keep_original=False, gatefilter=gatefilter)
    else:  # method == "region"
        corr_vel = pyart.correct.dealias_region_based(radar, vel_field=vel_name,
                                                      keep_original=False, gatefilter=gatefilter)

    # Overwrite the original velocity field with the dealiased one
    radar.add_field(vel_name, corr_vel, replace_existing=True)
    return radar


def filter_data(radar, refl_field, vel_field, dealias_method="region"):
    '''Prepare a radar volume for gridding.

    Drops the unwanted fields, forces the scan type to PPI and dealiases the
    velocity field named ``vel_field`` with ``dealias_method``.
    ``refl_field`` is accepted but not used by this function.
    Returns the radar object produced by the dealiasing step.
    '''
    # Strip fields that should not be carried forward
    cleaned = _drop_fields(radar)
    # Mark the volume as a PPI scan
    cleaned.scan_type = b'ppi'
    # Dealias with a default gate filter (gatefilter=None lets dealiase build it)
    return dealiase(cleaned, vel_name=vel_field, gatefilter=None, method=dealias_method)


def get_nexrad_data_files(start_time: str, end_time: str,
                          nexrad_dir: str = "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/NEXRAD/",
                          iops=("IOP2",), radar_names=("KGWX",)) -> list:
    '''Return the radar files whose timestamp lies in [start_time, end_time).

    start_time: str ('YYYYMMDDHHMMSS'), inclusive lower bound
    end_time: str ('YYYYMMDDHHMMSS'), exclusive upper bound
    nexrad_dir: root directory searched as <nexrad_dir>/<iop>/<radar_name>/*V06.nc
    iops: iterable of IOP sub-directory names to search
    radar_names: iterable of radar identifiers; each is both the sub-directory
        name and the prefix that precedes the timestamp in the file name

    The timestamp is read from the file name, e.g.
    "KGWX20220330_220424_V06.nc" -> 2022-03-30 22:04:24.
    Files are returned sorted within each (iop, radar) directory, and the
    number of matches is printed.
    Raises IndexError/ValueError if a matching file name does not follow the
    <radar_name><YYYYMMDD_HHMMSS>_V... pattern.
    '''
    start_datetime = datetime.strptime(start_time, '%Y%m%d%H%M%S')
    end_datetime = datetime.strptime(end_time, '%Y%m%d%H%M%S')

    data_files_filtered = []
    for iop in iops:
        for radar_name in radar_names:
            pattern = os.path.join(nexrad_dir, iop, radar_name, "*V06.nc")
            for file in sorted(glob.glob(pattern)):
                # Parse with the radar currently being searched rather than a
                # hard-coded "KGWX", so other radars work too.
                file_datetime_str = os.path.basename(file).split(radar_name)[1].split("_V")[0]
                file_datetime = datetime.strptime(file_datetime_str, '%Y%m%d_%H%M%S')
                if start_datetime <= file_datetime < end_datetime:
                    data_files_filtered.append(file)

    print("Available files: ", len(data_files_filtered))
    return data_files_filtered


# Define a function that takes a file name and writes the GRID file
def process_file(outdir, rfile):
    '''Grid one radar file with Py-ART and write the result under outdir.

    outdir: root output directory; the grid is written to <outdir>/IOP2/KGWX/
    rfile: path of the radar file to read; the output keeps its base name

    The radar is read, passed through filter_data (field clean-up and velocity
    dealiasing) and mapped with Barnes2 weighting onto a (61, 801, 801) grid
    for the REF, VEL, ZDR and RHO fields. Returns None.
    '''
    print(f'Reading file: {os.path.basename(rfile)}')
    radar = pyart.io.read(rfile)
    radar = filter_data(radar, "DBZ", "VEL")

    # Horizontal half-width of the grid. The limits below are
    # ((0, 15e3), (-max_rng, max_rng), (-max_rng, max_rng)); presumably metres
    # in (z, y, x) order per Py-ART's grid_from_radars. TODO confirm.
    max_rng = 200.0*1e3 # np.ceil(radar.range['data'].max())
    grid = pyart.map.grid_from_radars(radar,(61,801,801),
                       ((0.,15e3),(-max_rng, max_rng),(-max_rng, max_rng)),
                                       weighting_function='Barnes2',
                                      fields=['REF', 'VEL', 'ZDR', "RHO"])

    # Drop the reference to the radar volume once the grid has been built
    print(f'Deleting Radar Object: {os.path.basename(rfile)}')
    del radar

    grid_file_path = os.path.join(outdir,"IOP2/KGWX")
    # Every file shares this directory, so it must never be wiped here:
    # removing it per call deleted the grids written for other files and raced
    # with parallel workers. Create it if missing and leave its contents alone.
    os.makedirs(grid_file_path, exist_ok=True)
    print(f"Saving in:{grid_file_path} as {os.path.basename(rfile)}\n")
    # write grid
    pyart.io.write_grid(filename=os.path.join(grid_file_path, os.path.basename(rfile)), grid=grid)


def process_file_wrapper(args):
    '''Adapter for map-style APIs that pass a single argument.

    args: sequence of positional arguments, unpacked into process_file.
    Returns whatever process_file returns.
    '''
    outcome = process_file(*args)
    return outcome


# NOTE(review): the single-file run in the next cell writes under
# ".../2022/GRID/", whereas this path has no "GRID/"; confirm which is intended.
outdir = "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/"
data_files = get_nexrad_data_files("20220330220000", "20220331040000")

# Fan the files out over a process pool. mp.Pool() with no argument starts one
# worker per CPU reported by os.cpu_count(). Only `multiprocessing as mp` is
# imported, so the bare name `Pool` would raise a NameError.
# pool.map blocks until every file has been processed, and leaving the `with`
# block shuts the pool down, so no explicit close()/join() is needed.
with mp.Pool() as pool:
    results = pool.map(process_file_wrapper, [(outdir, f) for f in data_files])

In [9]:
# Single-file run: grid only the first file of the time window, writing
# under the GRID/ output root.
outdir = "/depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/"
data_files = get_nexrad_data_files("20220330220000", "20220331040000")
process_file(outdir = outdir, rfile=data_files[0])

Available files:  54
Reading file: KGWX20220330_220424_V06.nc
Deleting Radar Object: KGWX20220330_220424_V06.nc
Saving in:/depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/KGWX as KGWX20220330_220424_V06.nc

/depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/KGWX/KGWX20220330_220424_V06.nc


In [178]:
# !sbatch sbatch_parallel_gridding.sbatch

In [49]:
import subprocess

# Submit the batch script and capture what sbatch prints on stdout
sbatch_command = "sbatch sbatch_parallel_gridding.sbatch"
completed = subprocess.run(sbatch_command, shell=True, check=True,
                           stdout=subprocess.PIPE)
output = completed.stdout

# The job ID is the last whitespace-separated token of the sbatch message
tokens = output.decode('utf-8').split()
job_id = tokens[-1]

print("Submitted job with ID:", job_id)

Submitted job with ID: 25701334


In [19]:
# List the queued and running jobs charged to the dawson29 account
!squeue -A dawson29

JOBID        USER      ACCOUNT      NAME             NODES   CPUS  TIME_LIMIT ST TIME
25630030     syed44    dawson29     OnDemand/Noteboo     1     16  5-00:00:00  R 3-04:30:11
25707890     jiang703  dawson29     OnDemand/Noteboo     1     12     6:00:00  R 1:17:51
25706197     kaxon     dawson29     OnDemand/Desktop     1      4  2-12:00:00  R 2:26:02
25700093     jiang703  dawson29     R50r15kmCdFreeC1    15    400  3-00:00:00  R 15:57:22


In [62]:
# Show the queue entry for the submitted job; IPython expands $job_id from
# the Python variable job_id
!squeue -j $job_id

JOBID        USER      ACCOUNT      NAME             NODES   CPUS  TIME_LIMIT ST TIME
25701334     syed44    dawson29     sbatch_parallel_     2    256  1-00:00:00 PD 0:00


In [63]:
# Show the last lines of the log file parallel_gridding.out
!tail parallel_gridding.out

Saving in: /depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/UAH as RAW_NA_000_100_20220331010202.nc

Reading file: RAW_NA_000_100_20220330195757.nc
Deleting Radar Object: RAW_NA_000_100_20220330195757.nc
Saving in: /depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/UAH as RAW_NA_000_100_20220330195757.nc

Reading file: RAW_NA_000_100_20220331011803.nc
Deleting Radar Object: RAW_NA_000_100_20220331011803.nc
Saving in: /depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/UAH as RAW_NA_000_100_20220331011803.nc



In [342]:
# !scancel $job_id

In [361]:
# List the gridded output directory, newest first, with human-readable sizes
!ls -lth /depot/dawson29/data/Projects/PERiLS/obsdata/2022/GRID/IOP2/KGWX/

total 29G
-rw-rw---- 1 syed44 dawson29-data 301M Jun  5 12:07 KGWX20220330_232552_V06.nc
-rw-rw---- 1 syed44 dawson29-data 301M Jun  5 12:07 KGWX20220330_233952_V06.nc
-rw-rw---- 1 syed44 dawson29-data 300M Jun  5 12:07 KGWX20220330_234639_V06.nc
-rw-rw---- 1 syed44 dawson29-data 301M Jun  5 12:07 KGWX20220330_233252_V06.nc
-rw-rw---- 1 syed44 dawson29-data 290M Jun  5 12:07 KGWX20220330_225909_V06.nc
-rw-rw---- 1 syed44 dawson29-data 279M Jun  5 12:07 KGWX20220330_223845_V06.nc
-rw-rw---- 1 syed44 dawson29-data 298M Jun  5 12:07 KGWX20220330_231240_V06.nc
-rw-rw---- 1 syed44 dawson29-data 285M Jun  5 12:07 KGWX20220330_225209_V06.nc
-rw-rw---- 1 syed44 dawson29-data 296M Jun  5 12:07 KGWX20220330_230555_V06.nc
-rw-rw---- 1 syed44 dawson29-data 296M Jun  5 12:07 KGWX20220330_231927_V06.nc
-rw-rw---- 1 syed44 dawson29-data 245M Jun  5 12:07 KGWX20220331_021712_V06.nc
-rw-rw---- 1 syed44 dawson29-data 296M Jun  5 12:07 KGWX20220330_235324_V06.nc
-rw-rw---- 1 syed44 dawson29-

In [363]:
# Print the scheduler's full record for the job; IPython expands $job_id from
# the Python variable job_id, so it reflects whichever submission cell ran last
!scontrol show job $job_id

JobId=25632270 JobName=sbatch_parallel_gridding.sbatch
   UserId=syed44(871648) GroupId=student(132) MCS_label=N/A
   Priority=126296 Nice=0 Account=dawson29 QOS=normal
   JobState=COMPLETED Reason=None Dependency=(null)
   Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
   RunTime=00:12:13 TimeLimit=1-00:00:00 TimeMin=N/A
   SubmitTime=2023-06-05T11:55:32 EligibleTime=2023-06-05T11:55:32
   AccrueTime=2023-06-05T11:55:32
   StartTime=2023-06-05T11:55:32 EndTime=2023-06-05T12:07:45 Deadline=N/A
   SuspendTime=None SecsPreSuspend=0 LastSchedEval=2023-06-05T11:55:32 Scheduler=Main
   Partition=bell-a AllocNode:Sid=bell-a140:70445
   ReqNodeList=(null) ExcNodeList=(null)
   NodeList=bell-a[300-301]
   BatchHost=bell-a300
   NumNodes=2 NumCPUs=256 NumTasks=2 CPUs/Task=128 ReqB:S:C:T=0:0:*:*
   TRES=cpu=256,mem=498G,node=2,billing=256
   Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
   MinCPUsNode=128 MinMemoryCPU=1992M MinTmpDiskNode=0
   Features=(null) DelayBo

In [9]:
def linspace_range(start, stop, step):
    """Return how many points cover start..stop inclusive at spacing ``step``.

    Computed as (stop - start) / step + 1 using true division, so the result
    is a float and is not rounded.
    """
    span = stop - start
    return span / step + 1

In [48]:
# Half-width of the candidate domain (presumably metres; confirm)
max_rng = 88.25*1e3
# Number of points needed to span [-max_rng, max_rng] at a spacing of 500
print(linspace_range(-max_rng, max_rng, 500))

# Half-width covered by 353 intervals of 500; shown as the cell's output
353*500/2

354.0


88250.0