# Generate data for clustering models

In [1]:
import numpy as np
import pyart
import os
#from sklearn.mixture import GaussianMixture
import pickle
#from netCDF4 import num2date, date2num
#import math
from multiprocessing import Pool
from multiprocessing import cpu_count
import pandas as pd


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119



In [6]:
def extract_vol(myradar,field):
    """Return the values of one radar field as a flat 1-D array.

    When the module-level flag ``filt`` is true, gates excluded by
    ``give_gatefilter(myradar)`` are dropped; otherwise every gate of the
    field is returned.

    Parameters
    ----------
    myradar : radar object
        Must expose ``fields[field]['data']``.
    field : str
        Key of the field to extract from ``myradar.fields``.

    Returns
    -------
    numpy.ndarray
        1-D array of the underlying data values. If the field data is a
        masked array its own mask is discarded, i.e. the values stored under
        the mask are returned as-is.
    """
    field_data = myradar.fields[field]['data']

    if filt:
        # Boolean indexing with the "kept gates" mask already yields a 1-D
        # copy, so no further flattening is needed.
        kept_gates = ~give_gatefilter(myradar).gate_excluded
        valid_returns = field_data[kept_gates]
    else:
        valid_returns = field_data.flatten()

    # np.ma.getdata works for both masked arrays and plain ndarrays, whereas
    # ``.data`` on a plain ndarray is a raw memory buffer rather than an array.
    return np.ma.getdata(valid_returns)

def give_gatefilter(myradar):
    """Build the gate filter described by the module-level filter settings.

    Reads the globals ``VRADH_inside``, ``VRADH_outside``, ``snr_cutoff`` and
    ``depseck_size``; a setting that is ``None`` disables that filter step.

    Side effect: if ``myradar`` has no 'SNR' field, one is computed from the
    'DBZH' field and added to ``myradar``.

    Parameters
    ----------
    myradar : radar object
        Radar volume to filter; it is modified only by the possible addition
        of the 'SNR' field.

    Returns
    -------
    Gate filter object
        The filter after all enabled exclusion steps have been applied.
    """
    # check_field_exists signals a missing field with KeyError; catching only
    # that keeps unrelated failures visible instead of masking them.
    try:
        myradar.check_field_exists('SNR')
    except KeyError:
        snr_field = pyart.retrieve.calculate_snr_from_reflectivity(
            myradar,
            refl_field='DBZH',
            snr_field=None,
            toa=25000.0)
        myradar.add_field('SNR', snr_field)

    # Set gatefilters
    gatefilter = pyart.correct.GateFilter(myradar)
    if VRADH_inside is not None:
        gatefilter.exclude_inside('VRADH',VRADH_inside[0],VRADH_inside[1])
    if VRADH_outside is not None:
        gatefilter.exclude_outside('VRADH',VRADH_outside[0],VRADH_outside[1])
    if snr_cutoff is not None:
        gatefilter.exclude_below('SNR',snr_cutoff)
    if depseck_size is not None:
        # Despeckling runs on top of the exclusions made so far and returns
        # the filter that is handed back to the caller.
        gatefilter = pyart.correct.despeckle.despeckle_field(myradar,
                                                     'VRADH',
                                                     gatefilter=gatefilter,
                                                     size = depseck_size)
    return gatefilter

def extract_field_dict(vol_no): 
    """Read one radar file and return its selected fields as a DataFrame.

    The file is ``all_files[vol_no]`` inside the directory ``dirr``; both are
    module-level names, as are ``fields_to_extract``, ``filt``,
    ``retrieve_height`` and ``retrieve_time``.

    Parameters
    ----------
    vol_no : int
        Index into ``all_files`` of the file to process.

    Returns
    -------
    pandas.DataFrame or None
        One row per extracted gate and one column per entry of
        ``fields_to_extract``. A 'height' column is added when
        ``retrieve_height`` is set and the scan type is 'rhi'; a 'time' column
        is added when ``retrieve_time`` is set. ``None`` is returned when the
        file cannot be opened.
    """
    file_name = all_files[vol_no]
    try:
        myradar = pyart.aux_io.read_odim_h5(os.path.join(dirr, file_name),
                                            file_field_names=True)
    except Exception as err:
        # Best effort: report the reason and skip the file.
        print('Skipped ' + file_name + ', error opening file (' + repr(err) + ')')
        return None

    flattened_data = {}
    for field in fields_to_extract:
        flattened_data[field] = np.asarray(extract_vol(myradar, field))

    if retrieve_height and myradar.scan_type == 'rhi':
        # gate_z has one value per (ray, gate). It must be reduced exactly
        # like the field columns, otherwise the column lengths disagree and
        # the DataFrame cannot be built.
        gate_heights = np.asarray(myradar.gate_z['data'])
        if filt:
            # The filter is rebuilt here (extract_vol builds its own per
            # field) so that the same gates are kept for the height column.
            kept_gates = ~give_gatefilter(myradar).gate_excluded
            flattened_data['height'] = gate_heights[kept_gates]
        else:
            flattened_data['height'] = gate_heights.flatten()

    if retrieve_time:
        # Every row gets the first time value of the volume, stored under its
        # own 'time' key so that it does not overwrite the height column.
        n_rows = len(next(iter(flattened_data.values()), ()))
        flattened_data['time'] = np.full(n_rows, myradar.time['data'][0])

    return pd.DataFrame(flattened_data)

def handle_output(result):
    """Append one extraction result to the HDF5 store of the current event.

    The store path is ``outloc + eventname + '.h5'`` and the table key is
    ``eventname``; both are module-level names.

    Parameters
    ----------
    result : pandas.DataFrame or None
        Frame to append. ``None`` (a skipped file) is ignored.

    Returns
    -------
    None
    """
    if result is None:
        return

    out_path = outloc + eventname + '.h5'

    # Create the output directory up front so that a missing directory does
    # not surface as an exception from inside this function.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    store = pd.HDFStore(out_path)
    try:
        store.append(eventname, result, format='t',  data_columns=True)
    finally:
        # Always release the file handle, even when the append fails.
        store.close()


In [7]:
# Run configuration. These module-level names are read directly by the
# functions defined in this notebook.

# When true, the height test cell processes only a two-entry file list.
test = True

# Names of the radar fields written as columns of the output table.
fields_to_extract = ['DBZH','ZDR','RHOHV','WRADH','KDP']

# When true, gates excluded by give_gatefilter() are dropped from every field.
filt = True

# Exclude gates whose SNR is below this value (used with exclude_below);
# None disables the SNR filter.
snr_cutoff = 2
# Exclude gates whose VRADH lies inside [low, high]; None disables this step.
VRADH_inside = None #[-0.5,0.5]
# Exclude gates whose VRADH lies outside [low, high]; None disables this step.
VRADH_outside = None #[-10,10]
# Despeckle on VRADH using this size; None disables despeckling.
# NOTE: the name is misspelled ("depseck") but give_gatefilter() reads it
# under exactly this name, so it must not be renamed here alone.
depseck_size = None #10

# Prefix of the output HDF5 path; handle_output() concatenates it with the
# event name, so the trailing slash is required.
outloc = './training_data/'

# NOTE(review): not referenced by any code visible in this notebook - confirm
# whether it is still needed.
RHI = True

# Add a 'height' column (only applied to scans whose scan_type is 'rhi').
retrieve_height = True
# Add a per-volume time column.
retrieve_time = False

In [4]:
if __name__ == '__main__':

    num_processes = cpu_count()

    # (eventname, dirr) pairs, processed one after another in this order.
    events = [
        ('Sedgerly_5th', './Cluster_analysis/raw_data/Sedgerly/h5_radar_5th/'),
        ('Sedgerly_6th', './Cluster_analysis/raw_data/Sedgerly/h5_radar_6th/'),
        ('Mt_Bolton', './Cluster_analysis/raw_data/Mt Bolton/radar_hdf/'),
        ('Dereel', './Cluster_analysis/raw_data/Dereel/Other RHIS/'),
        ('AspeyWest', './Cluster_analysis/raw_data/AspeyBurn/Odim/'),
    ]

    # eventname, dirr and all_files are deliberately module-level names:
    # extract_field_dict() and handle_output() read them as globals, so they
    # must be assigned before the pool for that event is created.
    # NOTE(review): presumably this relies on worker processes inheriting the
    # parent's globals (fork start method) - confirm on other platforms.
    for eventname, dirr in events:
        all_files = sorted(os.listdir(dirr))

        print('Begin processing ' + str(len(all_files)) + ' files.')
        pool = Pool(num_processes)
        for i in range(len(all_files)):
            # Without an error_callback, an exception raised in a worker is
            # dropped silently and the file simply goes missing from the store.
            pool.apply_async(extract_field_dict, (i, ),
                             callback=handle_output,
                             error_callback=lambda err: print('Worker failed: ' + repr(err)))
        # Finish this event completely before the globals are re-pointed at
        # the next one.
        pool.close()
        pool.join()
    

Begin processing 156 files.
Skipped 99_20161205_002528.h5, error opening file
Skipped 99_20161205_002527.h5, error opening file
Skipped 99_20161205_002530.h5, error opening file
Skipped 99_20161205_002533.h5, error opening file
Skipped 99_20161205_002529.h5, error opening file
Skipped 99_20161205_002532.h5, error opening file
Skipped 99_20161205_002526.h5, error opening file
Skipped 99_20161205_002531.h5, error opening file
Skipped 99_20161205_002535.h5, error opening file
Skipped 99_20161205_002534.h5, error opening file
Skipped 99_20161205_002539.h5, error opening file
Skipped 99_20161205_002536.h5, error opening file
Skipped 99_20161205_002537.h5, error opening file
Skipped 99_20161205_002540.h5, error opening file
Skipped 99_20161205_002541.h5, error opening file
Skipped 99_20161205_002542.h5, error opening file
Skipped 99_20161205_002543.h5, error opening file
Skipped 99_20161205_002544.h5, error opening file
Skipped 99_20161205_002546.h5, error opening file
Skipped 99_20161205_00

In [10]:
if __name__ == '__main__':

    num_processes = cpu_count()

    # Module-level names read by extract_field_dict() and handle_output().
    eventname = 'Mt_Bolton_height'
    dirr = './raw_data/Mt Bolton/radar_hdf/'

    all_files = sorted(os.listdir(dirr))

    if test:
        # Test mode: process one fixed file, listed twice.
        # NOTE(review): presumably the duplicate is there to exercise a second
        # append to the same store - confirm. Raises IndexError when the
        # directory holds fewer than test_file_index + 1 files.
        test_file_index = 187
        all_files = [all_files[test_file_index], all_files[test_file_index]]

    print('Begin processing ' + str(len(all_files)) + ' files.')
    pool = Pool(num_processes)
    for i in range(len(all_files)):
        # Report worker exceptions; without an error_callback they are dropped
        # silently and the cell prints nothing beyond the line above.
        pool.apply_async(extract_field_dict, (i, ),
                         callback=handle_output,
                         error_callback=lambda err: print('Worker failed: ' + repr(err)))
    pool.close()
    pool.join()
    

Begin processing 2 files.


In [None]:
# Show the file list left in the namespace by the most recently run
# processing cell (the whole list is displayed).
all_files