In [1]:
import os
import glob
import uuid
import datetime
import traceback

from itertools import product

import netCDF4
import pyreclass
import numpy as np
import xarray as xr
import pandas as pd

import dask.bag as db
from dask.diagnostics import ProgressBar

In [2]:
def mkdir(mydir):
    '''
    Create a single directory unless something already exists at that path.

    Parameter:
    ===========
    mydir: str
        Path of the directory to create (parent directories are not created).

    Returns:
    ========
    None
    '''
    if not os.path.exists(mydir):
        try:
            os.mkdir(mydir)
        except FileExistsError:
            # The path appeared between the existence check and os.mkdir
            # (e.g. created by a concurrent worker); nothing left to do.
            pass

In [3]:
def get_file_list(input_dir):
    '''
    Generate the list of input files of one directory, arranged in 144 slots.

    If the directory holds exactly 144 ``*.nc`` files, they are returned in
    sorted order. Otherwise each of the 144 slots (24 values of ``h`` times
    6 values of ``m``) is filled by globbing for ``*{h:02}{m}*.nc``, and
    slots without any matching regular file are left as None.

    Parameter:
    ===========
    input_dir: str
        Path to input data directory

    Returns:
    ========
    flist: list
        Input file list of length 144 in the slot-matching branch; entries
        are file paths (str) or None when no file matched the slot.
    '''
    flist = sorted(glob.glob(os.path.join(input_dir, '*.nc')))
    if len(flist) == 144:
        return flist

    flist = [None] * 144
    for cnt, (h, m) in enumerate(product(range(0, 24), range(0, 6))):
        # NOTE(review): the pattern matches the three digits anywhere in the
        # file name (presumably HH followed by the tens-of-minutes digit), so
        # one file can match several slots -- confirm against the naming scheme.
        # glob returns matches in arbitrary order: sort them so that the file
        # chosen for a slot is reproducible from run to run.
        infiles = sorted(glob.glob(os.path.join(input_dir, f'*{h:02}{m}*.nc')))
        # Keep the first match that is a regular file (skips e.g. directories
        # whose name happens to end in ".nc").
        flist[cnt] = next((f for f in infiles if os.path.isfile(f)), None)

    return flist

In [4]:
def update_metadata(gnrl_meta):
    '''
    Build the global metadata dictionary of the output product.

    A new dictionary is created; the input dictionary is only read and is
    not modified. Only its geospatial lat/lon bounds and units are carried
    over, every other entry is set here.

    Parameters:
    ===========
    gnrl_meta: dict
        General metadata dictionary of an input file. Must contain the keys
        "geospatial_lat_min", "geospatial_lat_max", "geospatial_lon_min",
        "geospatial_lon_max", "geospatial_lat_units", "geospatial_lon_units".

    Returns:
    ========
    meta: dict
        New general metadata dictionary.

    Raises:
    =======
    KeyError
        If one of the required geospatial keys is missing from gnrl_meta.
    '''
    # Take the timestamp once so that "history" and "date_created" describe
    # the same instant instead of differing by a few microseconds.
    now = datetime.datetime.now().isoformat()

    meta = {"title": "Convective/stratiform radar echo classification",
            "summary": "Daily timeseries of convective/stratiform radar echo classification using Raut et al. (2020) algorithm for CPOL radar (Darwin, Australia).",
            "source": "radar",
            "Conventions": "CF-1.6, ACDD-1.3",
            "history": f"created by Valentin Louf on gadi.nci.org.au at {now}",
            "license": "Freely Distributed",
            "product_version": 'v2020.02',
            "processing_level": 'c1',
            "institution": 'Bureau of Meteorology',
            "project": "CPOL",
            "instrument": "radar",
            "platform": "fixed",
            "id": str(uuid.uuid4()),  # fresh random identifier on every call
            "date_created": now,
            "references": "doi:10.1175/JTECH-D-18-0007.1",
            "standard_name_vocabulary": 'CF Standard Name Table v67',
            "geospatial_lat_min": gnrl_meta["geospatial_lat_min"],
            "geospatial_lat_max": gnrl_meta["geospatial_lat_max"],
            "geospatial_lon_min": gnrl_meta["geospatial_lon_min"],
            "geospatial_lon_max": gnrl_meta["geospatial_lon_max"],
            "geospatial_lat_units": gnrl_meta["geospatial_lat_units"],
            "geospatial_lon_units": gnrl_meta["geospatial_lon_units"],
            "geospatial_vertical_min": "2500",
            "geospatial_vertical_max": "2500",
            "geospatial_vertical_positive": "up",
            "geospatial_vertical_units": 'm',
            "geospatial_bounds": "POLYGON((129.70320575394314 -13.552906009907133,129.70320575394314 -10.9417780423924,132.3856873419999 -10.9417780423924,132.3856873419999 -13.552906009907133,129.70320575394314 -13.552906009907133))",
            "time_coverage_duration": "P1D",
            "time_coverage_resolution": "PT10M",
            "creator_email": "valentin.louf@bom.gov.au",
            "creator_name": "Valentin Louf",
            "origin_latitude": "-12.249",
            "origin_longitude": "131.044",
            "origin_altitude": "50",
            "country": "Australia",
            'creator_url': 'github.com/vlouf',
            "state": "NT",
            "site_name": "Gunn Pt",
            "naming_authority": "au.org.nci"}

    return meta

In [5]:
def get_label(infile):
    '''
    Read one gridded file and classify its reflectivity field at z = 2500.

    Parameter:
    ===========
    infile: str
        Path to the input netCDF file. It must provide the variables
        reflectivity_gridded_dBZ, point_latitude and point_longitude and the
        coordinates x, y and z.

    Returns:
    ========
    x, y: xarray.DataArray
        x and y coordinates of the file.
    lon, lat: xarray.DataArray
        point_longitude and point_latitude selected at z = 2500.
    label: array
        Output of pyreclass.getWTClass for the reflectivity at z = 2500.
    meta: dict
        Copy of the global attributes of the file.
    '''
    # The context manager closes the file once its content has been read;
    # this function is called once per input file, so a handle left open on
    # every call accumulates in long-running workers.
    with xr.open_dataset(infile) as dset:
        meta = dict(dset.attrs)
        refl = np.squeeze(dset.reflectivity_gridded_dBZ.sel({'z': 2500}).values)
        # xarray reads variables lazily: load everything that is returned
        # before the underlying file is closed.
        x = dset.x.load()
        y = dset.y.load()
        lat = dset.point_latitude.sel({'z': 2500}).load()
        lon = dset.point_longitude.sel({'z': 2500}).load()

    # NOTE(review): the second argument (2.5) is presumably the grid
    # resolution expected by pyreclass -- confirm against its documentation.
    label = pyreclass.getWTClass(refl, 2.5)

    return x, y, lon, lat, label, meta

In [6]:
def make_dailys(date):
    '''
    Build the daily classification dataset for one date.

    The input files of the day are classified one by one with get_label and
    stacked along a 144-step time axis. Slots without a file, or whose file
    could not be processed, are left at 0 and flagged with isfile = 0.

    Parameter:
    ===========
    date: datetime-like
        Day to process; must provide .year and .strftime and be accepted by
        pandas.date_range.

    Returns:
    ========
    dset: xarray.Dataset or None
        Daily dataset holding ptype_classification (time, y, x), isfile
        (time), latitude and longitude. None if the input directory of that
        day does not exist or if none of its files could be read.
    '''
    INDIR = '/g/data/hj10/admin/cpol_level_1b/v2019/gridded/grid_150km_2500m/'
    indir = os.path.join(INDIR, str(date.year), date.strftime('%Y%m%d'))
    if not os.path.exists(indir):
        return None

    # 145 evenly spaced stamps over one day, last one dropped: 144 time steps.
    trange = pd.date_range(date, date + datetime.timedelta(days=1), periods=145)[:-1]
    flist = get_file_list(indir)
    IS_FILE = np.ones(144, dtype=np.int32)
    # NOTE(review): the 117 x 117 grid size is hard-coded; a label of another
    # shape raises in the assignment below and the slot is flagged as missing.
    MOMENT_TOTAL = np.zeros((144, 117, 117), dtype=np.int32)
    grid = None  # (x, y, lon, lat, meta) of the last file processed successfully

    for cnt, infile in enumerate(flist):
        if infile is None:
            # MOMENT_TOTAL is already zero-filled for this slot.
            IS_FILE[cnt] = 0
            continue
        try:
            x, y, lon, lat, label, meta = get_label(infile)
            MOMENT_TOTAL[cnt, :, :] = label
        except Exception:
            print(f"Error with {infile}")
            traceback.print_exc()
            IS_FILE[cnt] = 0
            continue
        grid = (x, y, lon, lat, meta)

    if grid is None:
        # Without a single readable file there are no coordinates or metadata
        # to build the dataset from (previously an UnboundLocalError).
        print(f"No readable input file in {indir}")
        return None

    x, y, lon, lat, meta = grid
    dset = xr.Dataset({'x': x,
                       'y': y,
                       'time': trange,
                       'latitude': lat,
                       'longitude': lon,
                       'isfile': (('time'), IS_FILE),
                       'ptype_classification': (('time', 'y', 'x'), MOMENT_TOTAL)})
    dset.attrs = update_metadata(meta)
    dset.ptype_classification.attrs['units'] = '1'
    dset.ptype_classification.attrs['long_name'] = 'precipitation_type_classification'
    dset.ptype_classification.attrs['description'] = '0. N/A, 1. stratiform, 2. intense convective, 3. moderate+transitional convective'
    dset.ptype_classification.attrs['reference'] = "doi:10.1109/TGRS.2020.2965649"
    dset.latitude.attrs['standard_name'] = 'latitude'
    dset.longitude.attrs['standard_name'] = 'longitude'
    dset.isfile.attrs['units'] = '1'
    dset.isfile.attrs['long_name'] = 'qc_measurement_exists'
    dset.time.attrs['standard_name'] = 'time'
    dset.time.attrs['long_name'] = 'time'
    dset.attrs['time_coverage_start'] = trange[0].isoformat()
    dset.attrs['time_coverage_end'] = trange[-1].isoformat()

    # Remove the z coordinate when present; errors='ignore' replaces the
    # former blanket try/except around the deprecated Dataset.drop.
    dset = dset.drop_vars('z', errors='ignore')

    return dset

In [7]:
def buffer(date):
    '''
    Produce and write the daily classification file for one date.

    Nothing is done when the output file already exists. Errors raised while
    building or writing the dataset are printed and swallowed so that one
    failing day does not stop the processing of the other days.

    Parameter:
    ===========
    date: datetime-like
        Day to process, passed on to make_dailys.

    Returns:
    ========
    None
    '''
    OUTPATH = '/g/data/hj10/admin/cpol_level_2'
    outfilename = os.path.join(OUTPATH, f"twp1440cpol.ptypeclass.c1.{date.strftime('%Y%m%d')}.nc")
    if os.path.exists(outfilename):
        return None

    try:
        dset = make_dailys(date)
    except Exception:
        traceback.print_exc()
        return None

    if dset is None:
        return None

    # Write under a temporary name and rename only once the write succeeded:
    # an interrupted write would otherwise leave a truncated file that the
    # existence check above skips on every later run.
    tmpname = f"{outfilename}.{os.getpid()}.tmp"
    try:
        dset.to_netcdf(tmpname, encoding={'ptype_classification': {'zlib': True}})
        os.replace(tmpname, outfilename)
    except Exception:
        print(f"Error writing {outfilename}")
        traceback.print_exc()
        try:
            os.remove(tmpname)
        except OSError:
            # Nothing was written, or the partial file is already gone.
            pass
    finally:
        dset.close()

    return None

In [8]:
# Daily dates to process, from 1998-12-06 through 2017-05-03 (both included).
datelst = pd.date_range(start='19981206', end='20170503')

In [None]:
# Build the task graph: one `buffer` call per date of `datelst`.
bag = db.from_sequence(datelst)
bag = bag.map(buffer)

# Run it while reporting progress; `buffer` returns None for every date,
# so the computed result is not kept.
with ProgressBar():
    bag.compute()

[########################                ] | 61% Completed |  1hr 41min 48.3s

In [10]:
# NOTE(review): scratch expression, not used by the processing above --
# presumably a quick check that the kernel responds; candidate for deletion.
2+2

4