In [None]:
#|hide
#|default_exp qcrad

# PyrNet automatic quality checks
This notebook develops functions for automatic quality screening, including the BSRN-recommended physically possible and extremely rare limits as well as network sanity checks.

In [7]:
#|export
import xarray as xr
import numpy as np
import trosat.sunpos as sp
import logging

import pyrnet.data


# logging setup
# NOTE: basicConfig() configures the *root* logger, so executing this cell
# (it is marked for export, so also importing the exported module) routes
# DEBUG-and-above records to 'pyrnet.log' relative to the current working
# directory. basicConfig() is a no-op if the root logger already has handlers.
# The `encoding` keyword of basicConfig() requires Python >= 3.9.
logging.basicConfig(
    filename='pyrnet.log',
    encoding='utf-8',
    level=logging.DEBUG,
    format='%(asctime)s %(name)s %(levelname)s:%(message)s'
)
# Logger named after the module this code runs in.
logger = logging.getLogger(__name__)

In [8]:
#|export
class CONSTANTS:
    """Numeric constants shared by the quality checks."""

    # Reference solar irradiance in W m-2; quality_control divides it by the
    # squared earth-sun distance to obtain the actual top-of-atmosphere value.
    S0 = 1367

    # NOTE(review): the magnitude matches the Stefan-Boltzmann constant
    # (W m-2 K-4) -- confirm; it is not referenced in the visible code.
    k = 5.67 * 1e-8

## BSRN recommended checks
BSRN recommends thresholds for the physically possible and extremely rare limits of global horizontal irradiance (GHI) data.

In [9]:
#|export
class QCCode:
    """ BSRN quality codes
    https://wiki.pangaea.de/wiki/BSRN_Toolbox#Quality_Check

    Every code is a distinct power of two, so several codes can be summed
    into a single flag value without ambiguity.
    """
    below_physical = 2**0
    above_physical = 2**1
    # Misspelled original name, kept as an alias so existing callers keep working.
    above_phyiscal = above_physical
    below_rare = 2**2
    above_rare = 2**3
    compare_to_low = 2**4
    compare_to_high = 2**5

Load test dataset:

In [10]:
#|dropcode
#|dropout
# Relative path: resolved against the kernel's current working directory.
fname = "../../example_data/to_l1b_output.nc"
# load_dataset reads the whole file into memory and closes it afterwards.
ds_l1b = xr.load_dataset(fname)
# Last expression of the cell: rendered as the cell output.
ds_l1b 

Function to initialize qc-flag variables:

In [None]:
#|export
#|dropcode
def init_qc_flag(ds, var):
    """Add a zero-initialized quality-flag variable for ``var`` to ``ds``.

    The new variable is called ``qc_flag_<var>``. It starts out as a copy of
    ``ds[var]`` (so it has the same shape and dtype), gets all of its values
    set to zero and receives flag attributes describing the six
    quality-code bits (``2**0`` ... ``2**5``).

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset holding ``var``. It is modified in place.
    var : str
        Name of the variable the flags belong to.

    Returns
    -------
    xarray.Dataset
        The same ``ds`` object, now containing ``qc_flag_<var>``.

    Notes
    -----
    NOTE(review): because the flag variable is created with ``copy()``, any
    attributes or encoding entries of ``ds[var]`` that are not overwritten
    below are carried over to the flag variable -- confirm this is intended.
    """
    qc_bits = [2**i for i in range(6)]
    flag_name = f"qc_flag_{var}"

    ds[flag_name] = ds[var].copy()
    # Assign fresh zeros instead of multiplying by zero: NaN * 0 is NaN, so
    # missing data would otherwise leave NaN in the freshly initialized flags.
    ds[flag_name].values = np.zeros_like(ds[flag_name].values)

    flag_meanings = " ".join([
        "below_physical_limit",
        "above_physical_limit",
        "below_rare_limit",
        "above_rare_limit",
        "comparison_to_low",
        "comparison_to_high",
    ])
    ds[flag_name].attrs.update({
        "standard_name": "quality_flag",
        "units": "-",
        "ancillary_variables": var,
        "valid_range": [0, np.sum(qc_bits)],
        "flag_masks": qc_bits,
        "flag_values": qc_bits,
        "flag_meanings": flag_meanings,
    })
    # Encoding used when the dataset is written to file: unsigned byte,
    # with 0 declared as fill value.
    ds[flag_name].encoding.update({
        "dtype": "u1",
        "_FillValue": 0
    })
    return ds

Dataset with initialized qc-flags

In [11]:
# NOTE: relies on kernel state from earlier cells -- the import cell
# (`import pyrnet.data`) and the cell defining `init_qc_flag` must have been
# executed in this kernel before running this cell.
config = pyrnet.data.get_config()

# Add one qc_flag_<var> variable for every name listed under "radflux_varname".
for var in config["radflux_varname"]:
    ds_l1b = init_qc_flag(ds_l1b, var) 
    
ds_l1b

NameError: name 'pyrnet' is not defined

In [None]:
def quality_control(ds, lat=None, lon=None):
    """Flag global horizontal irradiance against physical and rare limits.

    For every variable of ``ds`` whose ``standard_name`` attribute equals
    ``SNAMES.ghi``, the companion variable ``qc_flag_<var>`` is
    (re-)initialized with ``init_qc_flag`` and the matching ``QCCode`` value
    is added wherever the data violate one of the four limits below.

    The solar zenith angle is taken from the first variable with
    ``standard_name == "solar_zenith_angle"``. If there is none, it is
    computed with ``trosat.sunpos.sun_angles`` from the dataset's
    latitude/longitude variables or, if those are missing, from the ``lat``
    and ``lon`` arguments.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with a ``time`` coordinate. It is modified in place.
    lat, lon : optional
        Fallback position, only used when ``ds`` contains neither a solar
        zenith angle nor latitude/longitude variables.

    Returns
    -------
    xarray.Dataset
        The same ``ds`` object including the ``qc_flag_<var>`` variables.

    Raises
    ------
    AssertionError
        If the zenith angle has to be computed but no position is available.

    Notes
    -----
    NOTE(review): ``Unit`` and ``SNAMES`` are not imported in the visible
    import cell; they must be in scope before this function is called.
    NOTE(review): the limit expressions combine ``Sa`` (derived from
    ``ds.time``) with ``mu0`` and the data by plain numpy broadcasting --
    confirm the array shapes are compatible for multi-dimensional variables.
    """
    # retrieve solar zenith angle from data
    szen = None
    for var in ds.filter_by_attrs(standard_name="solar_zenith_angle"):
        szen = ds[var].values*Unit(ds[var].attrs["units"])
        break
    # calculate solar zenith angle if not in data
    if szen is None:
        for var in ds.filter_by_attrs(standard_name="latitude"):
            lat = ds[var].values
            break
        for var in ds.filter_by_attrs(standard_name="longitude"):
            lon = ds[var].values
            break
        assert lat is not None, "no latitude in dataset and no `lat` given"
        assert lon is not None, "no longitude in dataset and no `lon` given"
        szen,_ = sp.sun_angles(ds.time.values, lat=lat, lon=lon)*Unit("degrees")

    # cosine of the solar zenith angle, clipped to zero below the horizon
    mu0 = np.cos(szen.to("radian").value)
    mu0[mu0 < 0] = 0 #  exclude night
    # solar irradiance scaled by the squared earth-sun distance
    esd = sp.earth_sun_distance(ds.time.values)
    Sa = CONSTANTS.S0 / esd**2

    # GHI
    for var in ds.filter_by_attrs(standard_name=SNAMES.ghi):
        # init quality control variable (shared helper, creates qc_flag_<var>)
        ds = init_qc_flag(ds, var)
        flags = ds[f"qc_flag_{var}"].values
        ghi = ds[var].values

        # (violation mask, quality code) for each limit; all four flags go
        # to the same qc_flag_<var> variable created above.
        checks = (
            # physical minimum
            (ghi < -4, QCCode.below_physical),
            # physical maximum
            (ghi > ((Sa * 1.5 * mu0 ** 1.2) + 100), QCCode.above_phyiscal),
            # rare limit minimum
            (ghi < -2, QCCode.below_rare),
            # rare limit maximum
            (ghi > ((Sa * 1.2 * mu0 ** 1.2) + 50), QCCode.above_rare),
        )
        for mask, code in checks:
            # flags start at zero and each code is added at most once, so the
            # sum is equivalent to setting the corresponding bit
            flags[mask] += code

    return ds

In [None]:
#|hide
# Export module
# Requires *nbdev* to export the cells marked `#|export` to the module named
# by `#|default_exp` below *../../src/pyrnet*
import nbdev.export
import nbformat as nbf
name = "qcrad"

# Export python module
nbdev.export.nb_export( f"{name}.ipynb" ,f"../../src/pyrnet")

# Export to docs
ntbk = nbf.read(f"{name}.ipynb", nbf.NO_CONVERT)

# Directive found in a cell's source -> tag added to that cell's metadata
text_search_dict = {
    "#|hide": "remove-cell",  # Remove the whole cell
    "#|dropcode": "hide-input",  # Hide the input w/ a button to show
    "#|dropout": "hide-output"  # Hide the output w/ a button to show
}
for cell in ntbk.cells:
    cell_tags = cell.get('metadata', {}).get('tags', [])
    for key, val in text_search_dict.items():
        if key in cell['source'] and val not in cell_tags:
            cell_tags.append(val)
    if len(cell_tags) > 0:
        cell['metadata']['tags'] = cell_tags

# Write the tagged copy once, after all cells have been processed
# (previously the file was rewritten on every loop iteration).
nbf.write(ntbk, f"../../docs/source/nbs/{name}.ipynb")