# Demo: Instrument-level processing incl. mooring-level QC

This notebook walks through Stage 3 (calibration corrections). For moored instruments on a tall mooring, neighboring instruments can provide useful information for detecting problems.

In [None]:
from pathlib import Path
import numpy as np
import xarray as xr
import numpy as np
from oceanarray import readers, mooring_rodb, plotters, tools, process_rodb
import pandas as pd

## Batch load the `*.use` files (after Stage 2)

In [None]:
# Batch convert the Stage-2 *.use files to OceanSITES (OS) format and save them.
# convertOS and writers are used below but are not imported anywhere else in this
# notebook, so import them here to keep the cell runnable on a fresh kernel.
from oceanarray import convertOS, writers

data_dir = Path("..", "data")
# Sort the matches: glob order is filesystem-dependent, and a fixed order keeps
# the file <-> dataset pairing and the printed log reproducible.
files = sorted(data_dir.glob("wb2_9_201114*use"))

ds_list = readers.load_dataset(files)
# Datasets are paired with their source file by position, so the two lists must
# line up one-to-one.
assert len(ds_list) == len(files), (
    f"load_dataset returned {len(ds_list)} datasets for {len(files)} files"
)

# YAML files describing variable names, vocabulary, sensor and project attributes.
config_dir = Path("..") / "oceanarray" / "config"
var_map_yaml = config_dir / "OS1_var_names.yaml"
vocab_yaml = config_dir / "OS1_vocab_attrs.yaml"
sensor_yaml = config_dir / "OS1_sensor_attrs.yaml"
project_yaml = config_dir / "project_RAPID.yaml"

ds_list_OS = []
for file, ds_raw in zip(files, ds_list):
    # The *.use file itself is handed to the converter as the metadata text file.
    metadata_txt = data_dir / file.name
    ds_OS = convertOS.convert_rodb_to_oceansites(
        ds_raw,
        metadata_txt,
        var_map_yaml,
        vocab_yaml,
        sensor_yaml=sensor_yaml,
        project_yaml=project_yaml,
    )
    filepath = writers.save_OS_instrument(ds_OS, data_dir)
    print(f"Converted {file.name} to OceanSITES format and saved to {filepath}")
    ds_list_OS.append(ds_OS)

print(f"Number of converted datasets: {len(ds_list_OS)}")
assert len(ds_list_OS) > 0, "No datasets were converted successfully!"

## Step 3: Calibrations & QC

This time, we are loading the `*.use` files. We don't have the calibration information, so we make some additional manual choices about instrument quality.


In [None]:
import importlib
# Reload mooring_rodb so edits to the module on disk are picked up without
# restarting the kernel.
importlib.reload(mooring_rodb)
# Flag bad data to convert from P to D
data_dir = Path("..", "data")
files = list(data_dir.glob("OS_wb2_9_201114_*_P.nc"))

# NOTE(review): ds_list_OS1 is loaded from the saved *_P.nc files but is not used
# in the visible cells; the stack below is built from ds_list_OS, the in-memory
# list created by the conversion cell. Confirm which of the two is intended.
ds_list_OS1 = readers.load_dataset(files)

# Combine the per-instrument datasets into one mooring-level dataset, then pass
# it through tools.calc_psal (presumably adds the PSAL variable — confirm).
ds_stack = mooring_rodb.combine_mooring_OS(ds_list_OS)
ds_stack = tools.calc_psal(ds_stack)

# Last expression of the cell: display the stacked dataset.
ds_stack

In [None]:
# Plot the PSAL variable of the stacked dataset with the package's
# time-series-by-depth plotter.
plotters.plot_timeseries_by_depth(ds_stack, var="PSAL")

In [None]:
# Run the package QC on the stacked dataset.
# NOTE(review): the returned ds_qc is not used in the visible cells; the plot
# below reads CNDC_QC from ds_stack, which only works if that variable already
# exists there or run_qc adds it to its input in place — confirm, otherwise
# plot ds_qc instead.
ds_qc = tools.run_qc(ds_stack)
plotters.plot_timeseries_by_depth(ds_stack, var="CNDC_QC")



In [None]:
### Try some automatic QC using ioos_qc

In [None]:

# QARTOD test configuration: variable name -> package ("qartod") -> test name
# -> test arguments. Spans are [min, max]; values are assumed to be in the
# units of the corresponding variables in the dataset — TODO confirm.
config = {
    "TEMP": {
        "qartod": {
            "gross_range_test": {"suspect_span": [0, 30], "fail_span": [-2.5, 40]},
            "spike_test": {"suspect_threshold": 2.0, "fail_threshold": 6.0},
        },
    },
    "CNDC": {
        "qartod": {
            "gross_range_test": {"suspect_span": [35, 65], "fail_span": [30, 70]},
        },
    },
    "PSAL": {
        "qartod": {
            "gross_range_test": {"suspect_span": [5, 38], "fail_span": [2, 41]},
            "spike_test": {"suspect_threshold": 0.3, "fail_threshold": 0.9},
            # ioos_qc expects bbox as (minx, miny, maxx, maxy), i.e.
            # [lon_min, lat_min, lon_max, lat_max]. The box is 80W-70W, 20N-30N.
            "location_test": {"bbox": [-80, 20, -70, 30]},
        },
    },
}


In [None]:
def build_climatology_config(ds, var="TEMP", tspan=("2000-01-01", "2100-01-01"), std_multiplier=2, ndigits=2):
    """
    Build a QARTOD climatology_test config from per-depth statistics of ``ds``.

    For every value in ``ds["DEPTH"]`` the mean and standard deviation of
    ``ds[var]`` at that depth are computed (NaNs skipped) and turned into one
    range entry whose ``vspan`` is ``mean +/- std_multiplier * std``.

    Depths whose mean or standard deviation is not finite (for example a depth
    with no valid data) are skipped, because no meaningful value range can be
    derived for them.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with dimensions (TIME, DEPTH).
    var : str
        Variable name to construct climatology test for.
    tspan : tuple of str
        Time range for validity of climatology (start, end).
    std_multiplier : float
        Number of standard deviations to use for the vspan.
    ndigits : int or None
        Number of decimals the vspan limits are rounded to (default 2, the
        previous hard-coded value). Use a larger value for variables with a
        small spread so that the two limits do not round to the same number.
        ``None`` disables rounding.

    Returns
    -------
    dict
        ``{var: {"qartod": {"climatology_test": {"ranges": [...]}}}}`` where
        each range is a dict with keys ``tspan``, ``zspan`` and ``vspan``.
    """
    def _rounded(value):
        # round(value, None) would return an int, so treat None explicitly.
        return value if ndigits is None else round(value, ndigits)

    config_ranges = []

    for depth in ds["DEPTH"].values:
        da = ds[var].sel(DEPTH=depth)
        mean = da.mean(skipna=True).item()
        std = da.std(skipna=True).item()

        # No valid samples at this depth -> NaN statistics -> no usable range.
        if not (np.isfinite(mean) and np.isfinite(std)):
            continue

        half_width = std_multiplier * std
        config_ranges.append({
            "tspan": list(tspan),
            # Zero-width depth interval: the range applies to this depth only.
            "zspan": [float(depth), float(depth)],
            "vspan": [_rounded(mean - half_width), _rounded(mean + half_width)],
        })

    return {var: {"qartod": {"climatology_test": {"ranges": config_ranges}}}}

# Build one climatology config per variable and keep all of them in a single
# dict keyed by variable name. (Assigning each result to the same name in turn
# would keep only the last one, CNDC.)
climatology_cfg = {}
for clim_var in ("TEMP", "PSAL", "CNDC"):
    climatology_cfg.update(build_climatology_config(ds_stack, var=clim_var))



In [None]:
import numpy as np
import xarray as xr
from ioos_qc.qartod import climatology_test
from ioos_qc.results import CollectedResult

def run_climatology_qc_for_depth(ds1d, var="TEMP", test_config=None):
    """
    Apply the QARTOD ``climatology_test`` to one depth level of a dataset.

    Parameters
    ----------
    ds1d : xarray.Dataset
        Dataset for a single depth level. It must provide ``TIME``, ``var``
        and a ``DEPTH`` value that can be converted to one float.
    var : str
        Name of the variable to check (e.g., "TEMP", "PSAL").
    test_config : object
        Climatology configuration, forwarded unchanged as the ``config``
        argument of ``ioos_qc.qartod.climatology_test``.
        NOTE(review): the accepted schema is defined by ioos_qc — confirm
        against its documentation.

    Returns
    -------
    CollectedResult
        Result record for stream ``var``, package "qartod", test
        "climatology_test", carrying the flags, the time stamps and the data.

    Raises
    ------
    ValueError
        If ``test_config`` is None.
    """
    if test_config is None:
        raise ValueError("test_config must be provided explicitly")

    level_depth = float(ds1d["DEPTH"])
    timestamps = ds1d["TIME"].values
    observations = ds1d[var].values

    # Broadcast the scalar depth to one float per time stamp (same shape as TIME).
    depth_per_sample = np.full_like(timestamps, level_depth, dtype=float)

    result_fields = {
        "stream_id": var,
        "package": "qartod",
        "test": "climatology_test",
        "function": climatology_test,
        "results": climatology_test(
            config=test_config,
            inp=observations,
            tinp=timestamps,
            zinp=depth_per_sample,
        ),
        "tinp": timestamps,
        "data": observations,
    }
    return CollectedResult(**result_fields)


In [None]:

from ioos_qc.qartod import aggregate
import copy
from ioos_qc.results import CollectedResult, collect_results
from ioos_qc.streams import XarrayStream
from ioos_qc.qartod import climatology_test, ClimatologyConfig

# Hand-written example climatology ranges: one entry per depth, each with a
# validity time span (tspan), a depth span (zspan) and an accepted value span
# (vspan).
config_list = [
    {"tspan": ["2000-01-01", "2100-01-01"], "zspan": [50.0, 50.0], "vspan": [20.0, 25.0]},
    {"tspan": ["2000-01-01", "2100-01-01"], "zspan": [100.0, 100.0], "vspan": [15.0, 20.0]},
]

# Feed every entry into a ClimatologyConfig via its add() method.
# NOTE(review): this module-level clim_config is not referenced by the later
# visible cells; run_qartod_all_tests assigns its own local clim_config.
clim_config = ClimatologyConfig()
for entry in config_list:
    clim_config.add(**entry)

from ioos_qc.qartod import aggregate, climatology_test, ClimatologyConfig
from ioos_qc.results import CollectedResult, collect_results
from ioos_qc.streams import XarrayStream
from ioos_qc.config import Config
import numpy as np
import xarray as xr

def run_qartod_all_tests(ds1d, config, var="TEMP"):
    """
    Run all QARTOD tests (including manual climatology) and append QC_ROLLUP.

    The configured tests are run through ``XarrayStream``. A
    ``climatology_test`` entry configured for ``var`` is taken out of the
    config handed to the stream runner and executed separately with a
    ``ClimatologyConfig`` built from its ``"ranges"`` list. All results are
    then aggregated into a single rollup flag.

    Parameters
    ----------
    ds1d : xarray.Dataset
        1D dataset for a single depth (TIME dimension only). It is modified
        in place: a ``QC_ROLLUP`` variable is added.
    config : dict
        QARTOD configuration dictionary. It is deep-copied and not modified.
    var : str
        Variable to run QC on (e.g., "TEMP", "PSAL", "CNDC").

    Returns
    -------
    ds1d : xarray.Dataset
        The input dataset with the QC_ROLLUP flag added.
    results : list
        List of CollectedResult objects; the last one is the rollup
        (``test == "qc_rollup"``).
    config_clean : dict
        Copy of ``config`` as passed to the stream runner, i.e. without the
        ``climatology_test`` entry of ``var``.

    Notes
    -----
    NOTE(review): the rollup aggregates every collected result, i.e. all
    streams configured in ``config``, not only ``var`` — confirm that this is
    intended.
    """
    config_clean = copy.deepcopy(config)

    # Take the climatology config out BEFORE running the stream, and keep it:
    # looking it up in config_clean after deleting it would always give None,
    # so the manual climatology test would never run.
    qartod_cfg = config_clean.get(var, {}).get("qartod", {})
    clim_cfg = None
    if "climatology_test" in qartod_cfg:
        print(f"Removing climatology_test from config for variable {var} to run XarrayStream")
        clim_cfg = qartod_cfg.pop("climatology_test")

    c = Config(config_clean)
    qc = XarrayStream(ds1d, lon="LONGITUDE", lat="LATITUDE", time="TIME")
    runner = list(qc.run(c))
    results = collect_results(runner, how="list")

    # Manually run climatology_test if configured
    if clim_cfg:
        clim_config = ClimatologyConfig()
        for entry in clim_cfg["ranges"]:
            clim_config.add(**entry)

        values = ds1d[var].values
        times = ds1d["TIME"].values
        # One depth value per sample, all equal to this level's depth.
        zinp = np.full_like(values, float(ds1d["DEPTH"]), dtype=float)
        flags = climatology_test(
            config=clim_config,
            inp=values,
            tinp=times,
            zinp=zinp,
        )
        results.append(
            CollectedResult(
                stream_id=var,
                package="qartod",
                test="climatology_test",
                function=climatology_test,
                results=flags,
                tinp=times,
                data=values,
            )
        )

    # Aggregate QC
    rollup = CollectedResult(
        stream_id="",
        package="qartod",
        test="qc_rollup",
        function=aggregate,
        results=aggregate(results),
        tinp=ds1d["TIME"].values,
        data=ds1d[var].values,
    )
    results.append(rollup)

    # Attach to dataset
    ds1d["QC_ROLLUP"] = xr.DataArray(
        data=rollup.results,
        coords={"TIME": ds1d["TIME"]},
        dims=["TIME"],
        attrs={
            "long_name": "Aggregate QARTOD flag",
            "flag_meanings": "good_data not_evaluated suspect_data bad_data",
            "flag_values": "1 2 3 4",
            "standard_name": "aggregate_quality_flag",
            "comment": f"QARTOD QC rollup for variable {var}, including climatology",
        },
    )

    return ds1d, results, config_clean


# Last expression of the cell: display the QARTOD configuration dict.
config

In [None]:
# Position along the DEPTH dimension of the level to quality-control.
depth_index = 2
# Select that single level; squeeze() drops any remaining length-1 dimensions.
ds1d = ds_stack.isel(DEPTH=depth_index).squeeze()
# Run the QC for TEMP. The third return value is the copy of the config that
# was handed to the stream runner.
ds1d, results, config_clean = run_qartod_all_tests(ds1d, config=config, var="TEMP")
# Pick the aggregate result out of the list (raises StopIteration if absent).
rollup = next(r for r in results if r.test == "qc_rollup")


In [None]:
import matplotlib.pyplot as plt
import importlib
# Reload plotters so edits to the module on disk take effect without
# restarting the kernel.
importlib.reload(plotters)

# NOTE(review): the summary is drawn for PSAL, while the QC run that produced
# ds1d["QC_ROLLUP"] was called with var="TEMP" — confirm this is intended.
plotters.plot_qartod_summary(ds1d, var="PSAL")   # use for salinity



In [None]:
import pandas as pd
import numpy as np

def qc_failure_diagnosis(results, rollup):
    """
    Diagnose which QARTOD tests caused rollup failures.

    Parameters
    ----------
    results : list of CollectedResult
        Individual QARTOD test results (excluding or including the rollup).
        Entries with ``test == "qc_rollup"`` are ignored. Each remaining entry
        must have one flag per time step of ``rollup``.
    rollup : CollectedResult
        The aggregate rollup result (QC_ROLLUP) from QARTOD; its ``tinp`` and
        ``results`` attributes are used.

    Returns
    -------
    pd.DataFrame
        Rows for the time steps whose rollup flag is 4, with columns ``TIME``,
        ``QC_ROLLUP`` and ``Failed_Tests`` (comma-separated names of the tests
        flagged 4 at that time step; empty if none of the given tests is).
        The index holds the positions of those time steps in the full series.
    """
    fail_flag = 4  # flag value treated as "fail" throughout this function

    time = pd.to_datetime(rollup.tinp)
    rollup_flags = np.asarray(rollup.results)

    # Filter to individual tests only (exclude the rollup)
    individual_results = [r for r in results if r.test != "qc_rollup"]
    test_names = [f"{r.stream_id}:{r.test}" if r.stream_id else r.test for r in individual_results]

    # Default: no reason recorded. Only failing time steps are filled in below.
    fail_reasons = [""] * rollup_flags.shape[0]

    # np.vstack raises on an empty list, so only build the matrix when there is
    # at least one individual test to inspect.
    if individual_results:
        # Rows = tests, columns = time steps.
        failed_by_test = np.vstack([np.asarray(r.results) for r in individual_results]) == fail_flag
        for i in np.flatnonzero(rollup_flags == fail_flag):
            fail_reasons[i] = ", ".join(
                name for name, failed in zip(test_names, failed_by_test[:, i]) if failed
            )

    df = pd.DataFrame({
        "TIME": time,
        "QC_ROLLUP": rollup_flags,
        "Failed_Tests": fail_reasons
    })

    return df[df["QC_ROLLUP"] == fail_flag]

df_fail = qc_failure_diagnosis(results, rollup)
# Last expression of the cell: rich display of the first failing time steps
# (instead of print(), which only gives the plain-text repr).
df_fail.head(10)


## More QC tests

In [None]:
def _neighbor_depth_indices(depth_values):
    """
    For each depth level, find the nearest shallower-or-equal and nearest deeper level.

    Neighbours are chosen by depth value, so the result does not depend on the
    levels being sorted by depth. For levels sorted in ascending order this is
    the previous and the next index.

    Returns a list with one tuple per level: ``(idx_leq, idx_gt)`` when a
    deeper level exists (``idx_leq`` is None when no other level is
    shallower or equal), otherwise ``(idx_leq,)``.
    """
    depth_values = np.asarray(depth_values)
    positions = np.arange(depth_values.size)
    pairs = []

    for i, d in enumerate(depth_values):
        # Other levels at the same depth or shallower / levels strictly deeper.
        leq = positions[(depth_values <= d) & (positions != i)]
        gt = positions[depth_values > d]

        # Deepest of the shallower-or-equal levels; on ties take the highest index.
        idx_leq = int(leq[depth_values[leq] == depth_values[leq].max()].max()) if leq.size else None
        # Shallowest of the deeper levels; on ties take the lowest index.
        idx_gt = int(gt[depth_values[gt] == depth_values[gt].min()].min()) if gt.size else None

        # Only return one index if there is no deeper level
        pairs.append((idx_leq, idx_gt) if idx_gt is not None else (idx_leq,))

    return pairs


depths = ds_stack.DEPTH.values
depth_indices = _neighbor_depth_indices(depths)

print(depth_indices)

In [None]:
import matplotlib.pyplot as plt

# One figure per depth level: normalized CNDC of the level itself (black), its
# next shallower neighbour (red) and its next deeper neighbour (blue).
for i, (idx_leq, *rest) in enumerate(depth_indices):
    # Entries are (idx_leq, idx_gt) or (idx_leq,), so the deeper index may be absent.
    idx_gt = rest[0] if rest else None

    # (level index, line colour, role shown in the legend), in drawing order.
    traces = [
        (i, 'k', 'main'),
        (idx_leq, 'r', 'shallower'),
        (idx_gt, 'b', 'deeper'),
    ]

    fig, ax = plt.subplots(figsize=(10, 4))
    for level, color, role in traces:
        if level is None:
            continue
        # Select by dimension name instead of by position so the result does
        # not depend on the order of the TIME and DEPTH dimensions.
        series = ds_stack.CNDC.isel(DEPTH=level).values
        ax.plot(
            ds_stack.TIME,
            process_rodb.normalize_by_middle_percent(series, percent=95),
            color=color,
            label=f'DEPTH={depths[level]}m ({role})',
        )

    ax.set_title(f'CNDC at DEPTH={depths[i]}m and neighbors (normalized)')
    ax.set_xlabel('Time')
    ax.set_ylabel('Normalized CNDC')
    ax.legend()
    fig.tight_layout()
    plt.show()