# Introduction
This notebook shows the central part of the processing done for the study on German-Czech transboundary CML rainfall maps. The example data used here ('example_data.nc' is loaded below) comprises only 4 CMLs. The coordinates of these CMLs have been removed as this is sensitive information to the network provider. Nevertheless, the algorithms conducted for homogenization and rain rate retrieval can be followed this way.

# Import

In [11]:
import os
import sys
import numpy as np
import pandas as pd
import xarray as xr
import numba
from tqdm import tqdm

import pycomlink as pycml

import warnings
warnings.filterwarnings("ignore")

# Processing functions

In [12]:
# ====================================

def label_timeseries(cml):
     
    cml["isSane_plateaus_rsl"] = (
        ["time"],
        detect_plateaus(cml.rsl),
    )
    return cml


def detect_plateaus(series, time_span=3, max_thld=-85, std_thld=0.5, pad=5):
    
    low = series.rolling(time=time_span, center=True).max("time") < max_thld
    steady = series.rolling(time=time_span, center=True).std("time") < std_thld
    
    isSane = ~(low & steady)
    
    # padding
    isSane = ~(isSane.rolling(time=pad, center=True).min("time") == 0)
    
    return isSane

# ====================================

    
def get_blackout_start_and_end_v1(rsl, rsl_threshold):
    ''' Identify gaps due to heavy attenuation blackouts and return start and end
        
    Parameters
    ----------
    rsl: xr.DataArray
        Time series of received signal level
    rsl_threshold: numeric
        Value below which the start or end of a NaN gap have to be to count
        as blackout gaps
            
    Returns
    -------
    
    gap_start, gap_end
        
    '''
    
    # required to avoid RuntimeWarning when comapring to NaNs
    with np.errstate(invalid='ignore'):
        rsl_below_threshold = rsl < rsl_threshold
    rsl_nan = np.isnan(rsl)
    gap_start = np.roll(rsl_nan, -1) & rsl_below_threshold
    gap_end = np.roll(rsl_nan, 1) & rsl_below_threshold
    
    return gap_start, gap_end


@numba.jit(nopython=True)
def created_blackout_gap_mask_from_start_end_markers(
    rsl, gap_start, gap_end, max_gap_length=60
):
    mask = np.zeros(rsl.shape, dtype=numba.boolean)
    in_blackout_gap = False
    for i in range(len(rsl)):
        if gap_start[i] == True:
            in_blackout_gap = True
            cnt_gap_length = 0
        if (gap_start[i] == False) and ~np.isnan(rsl[i]):
            in_blackout_gap = False
        if gap_end[i] == True:
            in_blackout_gap = False
        if in_blackout_gap and np.isnan(rsl[i]) and (cnt_gap_length < max_gap_length):
            mask[i] = True
            cnt_gap_length += 1
            if cnt_gap_length > max_gap_length:
                mask[(i-max_gap_length):(i+1)] = False
                in_blackout_gap = False
    return mask


def blackout_filling(tsl_in, rsl_in, tsl_max, rsl_min):

    tsl_out = np.copy(tsl_in)
    rsl_out = np.copy(rsl_in)
    
    for i in tqdm(range(rsl_out.shape[0])):

        gap_start, gap_end = get_blackout_start_and_end_v1(
            rsl=rsl_out[i, :], rsl_threshold=-65
        )
        mask_forward = created_blackout_gap_mask_from_start_end_markers(
            rsl_out[i, :], gap_start, gap_end
        )
        mask_reverse = created_blackout_gap_mask_from_start_end_markers(
            rsl_out[i, :][::-1], gap_end[::-1], gap_start[::-1]
        )
        mask = mask_forward | mask_reverse[::-1]

        # fill tsl and rsl
        tsl_out[i, :][mask] = tsl_max[i]
        rsl_out[i, :][mask] = rsl_min[i]
            
    return tsl_out, rsl_out


# ====================================

def detection_limit_from_kR(L_km, freq_Hz, pol, A_quantization_dB=0.33):
    """
    k-R relation (specific attenuation): k = a * R**b
    integral attenuation: A = a * R**b * L
    sensitivity: A/R ~ a * L, assuming b=1
    """

    # ###
    # a, b = k_R_relation.a_b(
    #     f_GHz=freq_Hz / 1e9, pol=pol, approx_type="ITU_2005"
    # )
    a, b = pycml.processing.k_R_relation.a_b(
        f_GHz=freq_Hz / 1e9, pol=pol, approx_type="ITU"
    )

    sensitivity = a * L_km  # units of dB/mm/h
    detection_limit = A_quantization_dB / sensitivity

    return detection_limit


def add_detection_limit(cml_list):

    # add detection limit parameter to data set
    for cml in tqdm(cml_list):
        
        limit_sub = detection_limit_from_kR(
            L_km=cml.length.values,
            freq_Hz=cml.frequency.values,
            pol=cml.polarization.values,
        )

        # add to dataset
        cml["detection_limit"] = limit_sub

    return cml_list



# ====================================

def label_CMLs(cml):

    cml["isSane_detection_limit"] = cml.detection_limit < 2
    cml["isSane_std_long"] = temporal_sanity_check(cml.trsl, tspan=300, thld=2, perc=0.1)
    cml["isSane_std_short"] = temporal_sanity_check(cml.trsl, tspan=60, thld=0.8, perc=0.33)
    cml["isSane_available"] = availability_check(cml.trsl, perc_thld=0.5)

    return cml

def temporal_sanity_check(trsl, tspan=300, thld=2, perc=0.1):
    """
    sanity check according to Graf et al., 2020

    - tspan: time span in minutes
    - thld: threshold for critical standard deviation
    - perc: threshold for percentage of time in which thld is exceeded
    """

    # second sanity check
    roll_std = trsl.rolling(time=tspan, center=True).std()
    high_std = roll_std > thld
    low_std = roll_std <= thld
    isSane = (
        high_std.sum(dim="time") / (high_std.sum(dim="time") + low_std.sum(dim="time"))
        <= perc
    )

    return isSane

def availability_check(trsl, perc_thld=0.5):
    
    isSane = (trsl.isnull().sum("time") / len(trsl.time)) < perc_thld
    
    return isSane


# ====================================

def derive_rain(
    cml, 
    waa_model="Leijnse",
):
    
    # classification according to Max' paper
    wet_threshold = 1.12 * cml.trsl.rolling(
        time=60, center=True, min_periods=60
    ).std().quantile(0.8, dim="time")
   
    wet = cml.trsl_raw.rolling(
        time=60, center=True, min_periods=45
    ).std(dim="time") > wet_threshold    
    
    # set to wet where blackout filling was active
    if "flag_blackout_fill" in cml.data_vars or "flag_blackout_fill" in cml.coords:
        wet = wet.where(~cml.flag_blackout_fill, True)

    # baseline
    baseline = pycml.processing.baseline.baseline_constant(
        trsl=cml.trsl,
        wet=wet,
        n_average_last_dry=1,
    )

    # WAA
    if waa_model == "Pastorek":
        A_obs = cml.trsl - baseline
        A_obs = A_obs.where(~(A_obs<0), 0)
        waa = pycml.processing.wet_antenna.waa_pastorek_2021_from_A_obs(
            A_obs=A_obs,
            f_Hz=cml.frequency,
            L_km=cml.length,
            A_max=14,
            zeta=0.55,
            d=0.1,
        )

    elif waa_model == "Schleiss":
        waa = pycml.processing.wet_antenna.waa_schleiss_2013(
            rsl=cml.trsl,
            baseline=baseline,
            wet=wet,
            waa_max=2.5,
            delta_t=1,
            tau=15,
        )

    elif waa_model == "Leijnse":
        A_obs = cml.trsl - baseline
        A_obs = A_obs.where(~(A_obs<0), 0)

        waa = pycml.processing.wet_antenna.waa_leijnse_2008_from_A_obs(
            A_obs=A_obs,
            f_Hz=cml.frequency,
            L_km=cml.length,
            T_K=293.0,
            gamma=1.47e-5,
            delta=0.36,
            n_antenna=complex(1.73, 0.014),
            l_antenna=0.041,
        )
        
    A = cml.trsl - baseline - waa
    A = A.drop_vars("quantile")
    
    cml["R_cml"] = pycml.processing.k_R_relation.calc_R_from_A(
        A=A,
        L_km=float(cml.length),
        f_GHz=cml.frequency / 1e9,
        pol=cml.polarization,
    )

    return cml.load()


# Settings and Data
Define the processing lines and load the example data of four time series without the coordinates.

In [13]:
# settings
proc_lines = [
    "no_filter",
    "graf_2020",
    "full",
]
annex = "example"

# load example data
ds_comb = xr.open_dataset("example_data.nc").load()

# Data set before processing

In [14]:
ds_comb

# Processing
Apply the filter and time series interpolation steps, and derive rain rates.

In [17]:
# ===============================
# Add detection limit

# create list of single cmls
cml_list = [ds_comb.isel(cml_id=i) for i in range(len(ds_comb.cml_id))]
cml_list = add_detection_limit(cml_list)
ds_comb = xr.concat(cml_list, dim="cml_id")

# ===============================
# Loop over processing lines
ds_list = []
for proc_line in proc_lines:

    print()
    print("=====")
    print(proc_line)
    print()

    # ===============================
    # Copy combined data

    ds_cml = ds_comb.copy()

    # ===============================
    # Period Filter (plateau)

    if proc_line in ["full"]:

        print("Period filter ...")
        
        # create list of single cmls
        cml_list = [ds_cml.isel(cml_id=i) for i in range(len(ds_cml.cml_id))]

        # labelling
        cml_list_labelled = [label_timeseries(cml) for cml in tqdm(cml_list)]
        # for cml in cml_list: 
        #     cml_list_labelled.append(label_timeseries(cml))

        ds_cml = xr.concat(cml_list_labelled, dim="cml_id")

        # applying
        # set tsl and rsl to nan where timeseries was labelled
        ds_cml["tsl"] = ds_cml.tsl.where(
            ds_cml["isSane_plateaus_rsl"]
        )
        ds_cml["rsl"] = ds_cml.rsl.where(
            ds_cml["isSane_plateaus_rsl"]
        )

    else:

        # add an all-True flag if no filtering is active
        ds_cml["isSane_plateaus_rsl"] = (
            ("cml_id", "time"),
            np.ones((len(ds_cml.cml_id), len(ds_cml.time)), dtype=bool), 
        )

    # ===============================
    # Blackout Gap Filling

    if proc_line in ["full"]:

        print("Blackout filling ...")

        # find minima and maxima of rsl and tsl and save
        tsl_max = ds_cml.tsl.max("time")
        rsl_min = ds_cml.rsl.min("time")

        # blackout filling
        tsl_blackout, rsl_blackout = blackout_filling(
            ds_cml.tsl.values,
            ds_cml.rsl.values,
            tsl_max,
            rsl_min
        )
        
        # add a flag for data points that have been filled
        ds_cml["flag_blackout_fill"] = (
            ("cml_id", "time"),
            ds_cml.rsl.isnull() & ~np.isnan(rsl_blackout), 
        )

        # update dataset
        ds_cml["tsl"] = (("cml_id", "time"), tsl_blackout)
        ds_cml["rsl"] = (("cml_id", "time"), rsl_blackout)

    else: 

        # add an all-False flag if blackout filling is not active
        ds_cml["flag_blackout_fill"] = (
            ("cml_id", "time"),
            np.zeros((len(ds_cml.cml_id), len(ds_cml.time)), dtype=bool), 
        )
            
    # ===============================
    # calculate trsl
    ds_cml["trsl"] = ds_cml["tsl"] - ds_cml["rsl"]

    # trsl raw: added to the dataset to be used in 
    # derivation of rain. afterwards deleted.
    ds_cml["trsl_raw"] = ds_cml.trsl.copy()

    # ===============================
    # interpolate trsl
    if proc_line in ["graf_2020", "full"]:

        print("Interpolation of TRSL ...")

        # interploate short gaps in trsl time series
        ds_cml["trsl"] = ds_cml.trsl.interpolate_na(
            dim="time", method="linear", max_gap="5min"
        )

    # ===============================
    # General CML Labelling

    print("Label CMLs ...")

    # create list of single cmls
    cml_list = [ds_cml.isel(cml_id=i) for i in range(len(ds_cml.cml_id))]

    cml_list_labelled = []
    for cml in cml_list:
        
        cml["isSane_detection_limit"] = cml.detection_limit < 2
        cml["isSane_std_long"] = temporal_sanity_check(cml.trsl, tspan=300, thld=2, perc=0.1)
        cml["isSane_std_short"] = temporal_sanity_check(cml.trsl, tspan=60, thld=0.8, perc=0.33)
        cml["isSane_available"] = availability_check(cml.trsl, perc_thld=0.5)
        
        cml_list_labelled.append(cml)

    ds_cml = xr.concat(cml_list_labelled, dim="cml_id")
    
    del cml_list
    del cml_list_labelled

    # ===============================
    # CML flags dependent on proc_line

    if proc_line in ["graf_2020"]:

        ds_cml["isSane"] = (
            "cml_id",
            ds_cml.isSane_std_long \
            & ds_cml.isSane_std_short,
        )

    elif proc_line in ["full"]:

        ds_cml["isSane"] = (
            "cml_id",
            ds_cml.isSane_std_long \
            & ds_cml.isSane_std_short \
            & ds_cml.isSane_detection_limit \
            & ds_cml.isSane_available,
        )

    else:

        ds_cml["isSane"] = (
            "cml_id",
            np.ones(len(ds_cml.cml_id), dtype=bool),
        )

    # ===============================
    # DERIVE RAIN

    print("Deriving rain ...")

    # create list of single cmls
    cml_list = [ds_cml.isel(cml_id=i) for i in range(len(ds_cml.cml_id))]

    cml_list_rain = [derive_rain(cml) for cml in cml_list]
    # for cml in cml_list:
    #     cml = derive_rain(cml)
    #     cml_list_rain.append(cml)

    # single dataset
    ds_cml = xr.concat(cml_list_rain, dim="cml_id")

    # delete temporary lists
    del cml_list
    del cml_list_rain

    # ===============================
    # Final Adjustments to individual datasets

    # delete superfluous varaibles from ds
    ds_cml = ds_cml.drop_vars(["trsl_raw", "trsl"])

    # set rain to nan for CMLs that are not sane
    ds_cml["R_cml"] = ds_cml.R_cml.where(ds_cml.isSane)

    # set some of the variables to coords
    for var in ds_cml.data_vars:
        if not ("tsl" in var or "rsl" in var or "R" in var or "trsl" in var) or "isSane" in var:
            ds_cml = ds_cml.set_coords(var)

    # set all data variables to float32 for memory reduction
    for var in ds_cml.data_vars:
        ds_cml[var] = ds_cml[var].astype("float32")

    # add info on processing line
    ds_cml = ds_cml.assign_coords({"proc_line": proc_line})

    # append to list of datasets for different proc_lines
    ds_list.append(ds_cml)

# combine datasets
ds = xr.concat(ds_list, dim="proc_line", data_vars="different")

# ===================================
# save

print("Save ...")

# save minutely data
encoding = {}
for name in list(ds.data_vars) + list(ds.coords):
    encoding[name] = {"zlib": True}   
ds.to_netcdf(
    "../../data/path_rain_minutely_%s.nc"%annex,
    encoding=encoding
)

print("Done.")

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 345.82it/s]



=====
no_filter

Label CMLs ...
Deriving rain ...

=====
graf_2020

Interpolation of TRSL ...
Label CMLs ...
Deriving rain ...

=====
full

Period filter ...


100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:05<00:00,  1.34s/it]


Blackout filling ...


100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 734.62it/s]


Interpolation of TRSL ...
Label CMLs ...
Deriving rain ...
Save ...
Done.


# Processed data set

In [20]:
ds