In [67]:
import math
import time
import os
from datetime import datetime

import h5py
import numpy as np
import pandas as pd
import wradlib
import xarray as xr
import parquet
from numba import jit
from threading import Thread
import warnings

  result_data = func(*input_data)
  result_data = func(*input_data)


In [68]:
warnings.filterwarnings("ignore")

Converting NLHRW_pvol_20230111T0510_6356.h5
Converting NLHRW_pvol_20230109T1655_6356.h5
Converting NLHRW_pvol_20230110T0525_6356.h5


In [64]:
class BirdCloud:

    def __init__(self):
        """
        Prepares BirdCloud object

        The object contains radar, product and projection metadata and a potentially range limited point cloud.
        """
        self.source = None
        self.radar = dict()
        self.product = dict()
        self.scans = dict()
        self.pointcloud = None
        self.range_limit = None
        self.projection = None
        self._elevations = []

    def from_odim_file(self, filepath, range_limit=None, dr_thresh=-12):
        """
        Builds point cloud from ODIM formatted polar volume

        :param filepath: path to the ODIM formatted HDF5 file
        :param range_limit: None or iterable containing both minimum and maximum range of the point cloud from the radar
            site.
        :param dr_thresh: Threshold of depolarization ratio to label biology and meteorology. Values above this
            threshold will be labeled as biology.
        """
        f = h5py.File(filepath, 'r')
        self.source = filepath
        self.range_limit = range_limit

        self.parse_odim_metadata(f)
        self.extract_odim_scans(f)
        self.calculate_additional_metrics()
        self.label_biology_using_depolarization_ratio(thresh=dr_thresh)
        self.flatten_scans_to_pointcloud()
        self.drop_na_rows()
        self.set_column_order()
        self.remove_columns()
        #self.to_parquet(r"C:\Users\Jeroen\Documents\Studie\Master\Thesis\pointclouds\")

    def parse_odim_metadata(self, file):
        """
        Parses ODIM metadata from HDF5 file about radar itself and the provided radar products

        Volume scan start and end times are derived from start time of the 1st scan and end time of the 16th scan, as
        scans are numbered chronologically.
        :param file: ODIM formatted HDF5 file object
        """
        source = dict(pair.split(':') for pair in file['what'].attrs.get('source').decode('UTF-8').split(','))
        self.radar['name'] = source['PLC'].replace(" ", "") if source.get('PLC') is not None else source.get('WMO')
        self.radar['latitude'] = file['where'].attrs.get('lat')[0]
        self.radar['longitude'] = file['where'].attrs.get('lon')[0]
        self.radar['altitude'] = file['where'].attrs.get('height')[0]
        self.radar['polarization'] = self.radar_metadata[self.radar['name']]['polarization']

        time_start = datetime.strptime(file['dataset1']['what'].attrs.get('starttime').decode('UTF-8'), '%H%M%S').time()
        date_start = datetime.strptime(file['dataset1']['what'].attrs.get('startdate').decode('UTF-8'), '%Y%m%d').date()
        self.product['datetime_start'] = datetime.combine(date_start, time_start)
        time_end = datetime.strptime(file['dataset16']['what'].attrs.get('endtime').decode('UTF-8'), '%H%M%S').time()
        date_end = datetime.strptime(file['dataset16']['what'].attrs.get('enddate').decode('UTF-8'), '%Y%m%d').date()
        self.product['datetime_end'] = datetime.combine(date_end, time_end)

    def extract_odim_scans(self, file):
        """
        Iterates over all scans and corresponding datasets in the ODIM formatted HDF5 files and stores all scans in
        self.scans dictionary as Xarray Datasets, accessible by elevation keys.

        :param file: ODIM formatted HDF5 file object
        """
        for group in file:
            if not file[group].name.startswith('dataset', 1):
                continue

            if file[group].name in self.excluded_scans:
                continue

            elevation = round(file[group]['where'].attrs.get('elangle')[0], 1)
            n_range_bins = file[group]['where'].attrs.get('nbins')[0]
            n_azim_bins = file[group]['where'].attrs.get('nrays')[0]
            bin_range = file[group]['where'].attrs.get('rscale')[0] / 1000
            site_coords = [self.radar['longitude'], self.radar['latitude'], self.radar['altitude'] / 1000]

            bin_range_min, bin_range_max = self.calculate_bin_range_limits(self.range_limit, bin_range, n_range_bins)

            x, y, z, ranges, azimuths = self.calculate_xyz(site_coords, elevation, n_azim_bins,
                                                           bin_range, bin_range_min, bin_range_max)

            datasets = {}

            for dataset in file[group]:
                try:
                    ds, quantity = self.parse_odim_dataset(file[group][dataset], bin_range_min, bin_range_max,
                                                           ranges, azimuths)

                    datasets[quantity] = ds

                except TypeError:
                    continue

            dataset = xr.Dataset(data_vars=datasets,
                                 coords={'azimuth': azimuths,
                                         'range': ranges,
                                         'x': (['azimuth', 'range'], x),
                                         'y': (['azimuth', 'range'], y),
                                         'z': (['azimuth', 'range'], z)},
                                 attrs={'elevation': elevation,
                                        'n_range_bins': n_range_bins,
                                        'n_azim_bins': n_azim_bins,
                                        'bin_range': bin_range})

            self.scans[str(elevation)] = dataset
            self._elevations.append(elevation)
    
    def parse_odim_dataset(self, dataset, bin_range_min, bin_range_max, ranges, azimuths):
        """
        Parses a single dataset within a scan by converting it using the calibration formula and storing it as an
        Xarray DataArray.

        All missing data is marked as np.nan.

        :param dataset: ODIM HDF5 file object with path to a radar dataset
        :param bin_range_min: the index of the first bin that falls within the instance range_limit
        :param bin_range_max: the index of the last bin that falls within the instance range_limit
        :param ranges: grid of range values corresponding to each of the cells in the dataset array
        :param azimuths: grid of azimuth values corresponding to each of the cells in the dataset array
        :return: converted ODIM dataset as Xarray DataArray and corresponding product (quantity) name
        """
        dataset_name = dataset.name.replace(dataset.parent.name + '/', '')

        if not dataset_name.startswith('data'):
            return None

        quantity = dataset['what'].attrs.get('quantity').decode('UTF-8')

        if quantity in self.excluded_datasets[self.radar['polarization']]:
            return None

        gain = dataset['what'].attrs.get('gain')[0]
        offset = dataset['what'].attrs.get('offset')[0]
        nodata = dataset['what'].attrs.get('nodata')[0]
        undetect = dataset['what'].attrs.get('undetect')[0]

        raw_data = dataset['data'][:, bin_range_min:bin_range_max]
        missing = np.logical_or(raw_data == nodata, raw_data == undetect)

        corrected_data = raw_data * gain + offset
        corrected_data[missing] = np.nan

        ds = xr.DataArray(corrected_data, coords=[azimuths, ranges], dims=['azimuth', 'range'])

        return ds, quantity

    def calculate_bin_range_limits(self, range_limit, bin_range, n_range_bins):
        """
        Calculates range of bins to select for all to fall within provided range_limit.

        If the lower limit is None, it will be converted to 0. If the upper limit is None, it will be converted to the
        maximum value of the range, i.e. n_range_bins.

        E.g. this function will return the 6th bin as bins_min if the minimum range limit is set to 5 (km) and the
            bin_range is 0.900: 5/0.900 = 5.5555 -> 6

        :param range_limit: iterable containing successively a minimum and maximum range
        :param bin_range: range covered by a single bin in the same units as range_limit
        :param n_range_bins: the number of range bins within a scan
        :return: indexes for the first (bins_min) and last (bins_max) bins that fall within the given range_limit
        """
        if range_limit is None:
            range_limit = [None, None]

        minimum = range_limit[0] if range_limit[0] is not None else 0
        maximum = range_limit[1] if range_limit[1] is not None else n_range_bins

        bins_min = math.ceil(minimum / bin_range)
        if bins_min > n_range_bins:
            raise ValueError('Minimum range set too high: no datapoints remaining.')

        bins_max = math.floor(maximum / bin_range)
        if bins_max > n_range_bins:
            bins_max = n_range_bins

        return bins_min, bins_max

    def calculate_xyz(self, sitecoords, elevation_angle, n_azim_bins, bin_range, bin_range_min, bin_range_max):
        """
        Calculates X, Y and Z coordinates for centers of all radar bins using wradlib and sets self.projection to the
        corresponding georeferencing information.

        :param sitecoords: iterable containing coordinates and altitude of radar site, in the order of: longitude,
            latitude, altitude
        :param elevation_angle: elevation angle of the radar scan
        :param n_azim_bins: number of azimuthal bins of the radar scan (usually 360)
        :param bin_range: range covered by every bin (usually in kilometers). Should be in the same units as the radar
            altitude
        :param bin_range_min: index of the first range bin to calculate X, Y and Z coordinates for
        :param bin_range_max: index of the last range bin to calculate X, Y and Z coordinates for
        :return: numpy arrays for the X, Y and Z coordinates and ranges and azimuths
        """
        if sitecoords is None:
            sitecoords = (0, 0)

        n_range_bins = bin_range_max - bin_range_min
        range_min = bin_range_min * bin_range
        range_max = bin_range_max * bin_range
        ranges = np.linspace(range_min, range_max, n_range_bins)
        azimuths = np.arange(0, n_azim_bins)

        polargrid = np.meshgrid(ranges, azimuths)

        xyz, self.projection = wradlib.georef.polar.spherical_to_xyz(polargrid[0], polargrid[1], elevation_angle,
                                                                     sitecoords, re=6378000, squeeze=True)

        return xyz[:, :, 0], xyz[:, :, 1], xyz[:, :, 2], ranges, azimuths

    def calculate_additional_metrics(self):
        """
        Triggers calculation of other metrics, such as ZDR calculation, textures etc.
        """
        self.calculate_differential_reflectivity()
        self.calculate_depolarization_ratio()

    def calculate_differential_reflectivity(self):
        """
        Calculates differential reflectivity or ZDR, defined as DBZH - DBZV (following Stepanian et al., 2016).
        """
        for elevation in self.scans:
            self.scans[elevation]['ZDR'] = self.scans[elevation]['DBZH'] - self.scans[elevation]['DBZV']

    def calculate_depolarization_ratio(self):
        """
        Calculated depolarization ratio following Kilambi et al. (2018).

        DR = 10 * log10((ZDR + 1 - 2 * ZDR^0.5 * RHOHV) / (ZDR + 1 + 2 * ZDR^0.5 * RHOHV))
        """
        for elevation in self.scans:
            self.scans[elevation]['DR'] = 10 * np.log10((self.scans[elevation]['ZDR'] + 1 - 2 * np.sqrt(self.scans[elevation]['ZDR']) * self.scans[elevation]['RHOHV']) /
                                                        (self.scans[elevation]['ZDR'] + 1 + 2 * np.sqrt(self.scans[elevation]['ZDR']) * self.scans[elevation]['RHOHV']))

    def label_biology_using_depolarization_ratio(self, thresh=-12):
        """
        Following Kilambi et al. (2018) and labelling as biology anything above a depolarization ratio of -12 seems to
        work fairly well, so we'll use that, but threshold can be overridden.
        """
        for elevation in self.scans:
            self.scans[elevation]['biology'] = (self.scans[elevation]['DR'] > thresh) * 1

    def flatten_scans_to_pointcloud(self):
        """
        The function that 'builds' the pointcloud by flattening the self.scans dictionary into a single Pandas dataframe
        self.pointcloud. This is a necessary step to e.g. save the pointcloud in a CSV file for viewing in CloudCompare.
        """
        for elevation in self.scans:
            dataframe = self.scans[elevation].to_dataframe()
            dataframe['elevation'] = elevation
            dataframe.reset_index(inplace=True)

            if isinstance(self.pointcloud, pd.DataFrame):
                self.pointcloud = pd.concat([self.pointcloud, dataframe])
            else:
                self.pointcloud = dataframe

    def drop_na_rows(self, subset=None):
        """
        Drops rows from the file where columns in subset containg NA/NaN values. By default this is done for rows that
        contain no values for DBZH and VRADH.

        :param subset: Iterable of column names that cannot have NA/NaN values in rows.
        """
        if subset is None:
            subset = ['DBZH', 'VRADH']

        self.pointcloud.dropna(subset=subset, inplace=True)

    def set_column_order(self):
        """
        Orders columns in order defined in self.column_order for both single-pol and dual-pol polarizations.
        """
        order = self.column_order[self.radar['polarization']]
        columns_unordered = list(self.pointcloud.columns)
        columns_ordered = [variable for variable in order if variable in columns_unordered]
        self.pointcloud = self.pointcloud[columns_ordered]
        self.pointcloud.set_index(['elevation', 'azimuth', 'range'], inplace=True)
        
    def remove_columns(self):
        newpc = self.pointcloud.copy()
        newpc = newpc[['x', 'y', 'z', 'DBZH', 'VRADH', 'biology']]
        self.pointcloud = newpc
        self.pointcloud.reset_index(drop=True, inplace=True)

    def to_csv(self, file_path, compression=None, float_format=None):
        """
        Exports the point cloud to a CSV file.
        :param file_path: path to CSV file. If the file does not exist yet, it will be created.
        """
        self.pointcloud.to_csv(file_path, na_rep="NaN", quotechar='"', index=False, compression=compression,
                               float_format=float_format)
        
    def to_parquet(self, file_path):
        """
        Exports the point cloud to a parquet file.
        :param file_path: path to parquet file. If the file does not exist yet, it will be created.
        """
        
        self.pointcloud.to_parquet(file_path)

    @property
    def elevations(self):
        return sorted(self._elevations)

    @elevations.setter
    def elevations(self, value):
        self._elevations.append(value)

    radar_metadata = {
        'DeBilt': {'altitude': 44, 'polarization': 'SinglePol'},
        'DenHelder': {'altitude': 51, 'polarization': 'DualPol'},
        'Herwijnen': {'altitude': 27.7, 'polarization': 'DualPol'}
    }

    excluded_scans = {'/scan1', '/scan7', '/scan16', '/dataset1', '/dataset7', '/dataset16'}

    available_datasets = {
        'SinglePol': {
            'uZ': {'description': 'Uncorrected reflectivity', 'ODIM': 'TH'},
            'V': {'description': 'Radial velocity', 'ODIM': 'VRADH'},
            'Z': {'description': 'Reflectivity (corrected)', 'ODIM': 'DBZH'},
            'W': {'description': 'Spectral width of radial velocity', 'ODIM': 'WRADH'},
            'TX_power': {'description': 'Total reflectivity factor', 'ODIM': None}
        },
        'DualPol': {
            'CCOR': {'description': 'Clutter correction (horizontally polarized)', 'ODIM': 'CCORH'},
            'CCORv': {'description': 'Clutter correction (vertically polarized)', 'ODIM': 'CCORV'},
            'CPA': {'description': 'Clutter phase alignment (horizontally polarized)', 'ODIM': 'CPAH'},
            'CPAv': {'description': 'Clutter phase alignment (vertically polarized)', 'ODIM': 'CPAV'},
            'KDP': {'description': 'Specific differential phase', 'ODIM': 'KDP'},
            'PhiDP': {'description': 'Differential phase', 'ODIM': 'PHIDP'},
            'RhoHV': {'description': 'Correlation between Z(h) and Zv', 'ODIM': 'RHOHV'},
            'SQI': {'description': 'Signal quality index (horizontally polarized)', 'ODIM': 'SQIH'},
            'SQIv': {'description': 'Signal quality index (vertically polarized)', 'ODIM': 'SQIV'},
            'TX_power': {'description': 'Total reflectivity factor', 'ODIM': None},
            'uPhiDP': {'description': 'Unsmoothed differential phase', 'ODIM': 'PHIDPU'},
            'uZ': {'description': 'Uncorrected reflectivity (horizontally polarized)', 'ODIM': 'TH'},
            'uZv': {'description': 'Uncorrected reflectivity (vertically polarized)', 'ODIM': 'TV'},
            'V': {'description': 'Radial velocity (horizontally polarized)', 'ODIM': 'VRADH'},
            'Vv': {'description': 'Radial velocity (vertically polarized)', 'ODIM': 'VRADV'},
            'W': {'description': 'Spectral width of radial velocity (horizontally polarized)', 'ODIM': 'WRADH'},
            'Wv': {'description': 'Spectral width of radial velocity (vertically polarized)', 'ODIM': 'WRADV'},
            'Z': {'description': 'Reflectivity (corrected, horizontally polarized)', 'ODIM': 'DBZH'},
            'Zv': {'description': 'Reflectivity (corrected, vertically polarized)', 'ODIM': 'DBZV'}
        }
    }

    excluded_datasets = {
        'SinglePol': {'CCOR', 'CCORv', 'CPA', 'CPAv', 'SQI', 'SQIv', 'TX_power',  # KNMI HDF5
                      'CCORH', 'CCORV', 'CPAH', 'CPAV', 'SQIH', 'SQIV'},  # ODIM HDF5
        'DualPol': {'CCOR', 'CCORv', 'CPA', 'CPAv', 'SQI', 'SQIv', 'TX_power',  # KNMI HDF5
                    'CCORH', 'CCORV', 'CPAH', 'CPAV', 'SQIH', 'SQIV'}  # ODIM HDF5
    }

    column_order = {
        'SinglePol': ['elevation', 'azimuth', 'range', 'x', 'y', 'z', 'DBZH', 'TH', 'VRADH', 'WRADH', 'TX_power'],
        'DualPol': ['elevation', 'azimuth', 'range', 'x', 'y', 'z', 'DBZH', 'DBZV', 'TH', 'TV', 'VRADH', 'VRADV',
                    'WRADH', 'WRADV', 'PHIDP', 'PHIDPU', 'RHOHV', 'KDP', 'ZDR', 'DR', 'CCORH', 'CCORV', 'CPAH', 'CPAV',
                    'SQIH', 'SQIV', 'TX_power', 'biology']
    }

In [65]:
def convert_folder(folder_name):
    filenames = os.listdir(f"../data/{folder_name}")
    for file in filenames:
        print(f"Converting {file}")
        pc = BirdCloud()
        pc.from_odim_file(f"../data/{folder_name}/{file}")
        new_name = file.split(".")[0]
        new_name = new_name + ".parquet"
        pc.to_parquet(f"../pointclouds/{new_name}")

In [66]:
ps = []
for i in ['09', '10', '11']:
    p = Thread(target=convert_folder, args=(i,))
    ps.append(p)
    p.start()
    p.join()

Converting NLHRW_pvol_20230109T1130_6356.h5
Converting NLHRW_pvol_20230111T0000_6356.h5
Converting NLHRW_pvol_20230110T0000_6356.h5


  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)


Converting NLHRW_pvol_20230110T0005_6356.h5
Converting NLHRW_pvol_20230109T1135_6356.h5
Converting NLHRW_pvol_20230111T0005_6356.h5


  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data 

Converting NLHRW_pvol_20230110T0010_6356.h5
Converting NLHRW_pvol_20230109T1140_6356.h5
Converting NLHRW_pvol_20230111T0010_6356.h5


  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data 

Converting NLHRW_pvol_20230110T0015_6356.h5
Converting NLHRW_pvol_20230109T1145_6356.h5
Converting NLHRW_pvol_20230111T0015_6356.h5


  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data = func(*input_data)
  result_data 

Converting NLHRW_pvol_20230110T0020_6356.h5
Converting NLHRW_pvol_20230109T1150_6356.h5
