# Attenuated Signal Test Example

This notebook demonstrates how an attenuated signal test can be used to detect biofouling on an instrument.

The source data is derived from a historical USGS CTD station located at Lower Sand Island, OR in the Columbia River estuary.
The selected time period shows the tidal influence on salinity over a spring-neap cycle.
Near the end of the selected period, there is a decrease in the range of salinity corresponding with biofouling.
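
To make the idea concrete, here is a minimal sketch on synthetic data (not the Sand Island record): a sinusoidal "tidal" signal whose amplitude collapses after day 4, which makes the rolling range drop. The 12-hour period, the amplitudes, the 24-hour window, and all variable names are assumptions chosen only for illustration.

```python
import numpy as np
import pandas as pd

# Synthetic one-minute "salinity" series: 6 days of a 12-hour oscillation
# (all values are illustrative assumptions, not real observations)
time = pd.date_range("2001-07-01", periods=6 * 24 * 60, freq="min")
hours = np.arange(time.size) / 60.0
amplitude = np.where(hours < 96, 10.0, 1.0)  # signal attenuates after day 4
signal = pd.Series(15.0 + amplitude * np.sin(2 * np.pi * hours / 12.0), index=time)

# Rolling range (max - min) over a 24-hour window, expressed in seconds
rolling_range = signal.rolling("86400s").max() - signal.rolling("86400s").min()

healthy_range = rolling_range.iloc[3 * 24 * 60]  # end of day 3, full amplitude
fouled_range = rolling_range.iloc[-1]            # end of day 6, attenuated
print(f"range before attenuation: {healthy_range:.1f}")
print(f"range after attenuation:  {fouled_range:.1f}")
```

A test that compares the rolling range against a threshold would flag the second value but not the first.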

The data was downloaded from the [Center for Coastal Margin Observation & Prediction (CMOP) Data Explorer](https://amb6400b.stccmop.org/ws/product/offeringplot_ctime.py?handlegaps=true&series=time,sandi.790.A.CTD.salt.PD0&width=8.54&height=2.92&starttime=2001-7-1%200:00&endtime=2001-09-5%2023:59).

### Setup

In [None]:
from __future__ import annotations

import numpy as np
import pandas as pd

from bokeh.layouts import gridplot
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure, output_notebook, show

from ioos_qc.config import Config
from ioos_qc.stores import PandasStore
from ioos_qc.streams import PandasStream

output_notebook()

In [None]:
def plot_results(data, var_name, test_name, title):
    """Plot timeseries of original data colored by quality flag

    Args:
    ----
        data: pd.DataFrame of original data and results including a time variable
        var_name: string name of the variable to plot
        test_name: name of the test to determine which flags to use
        title: string to add to plot title
    """

    # create a ColumnDataSource by passing the dataframe of original data
    source = ColumnDataSource(data=data)
    qc_test = data[var_name + '_qartod_' + test_name]

    # Create a separate timeseries of each flag value
    source.data["qc_pass"] = np.ma.masked_where(qc_test != 1, data[var_name])
    source.data["qc_suspect"] = np.ma.masked_where(qc_test != 3, data[var_name])
    source.data["qc_fail"] = np.ma.masked_where(qc_test != 4, data[var_name])
    source.data["qc_notrun"] = np.ma.masked_where(qc_test != 2, data[var_name])  

    # start the figure
    p1 = figure(x_axis_type="datetime", title=test_name + " : " + title)
    p1.grid.grid_line_alpha = 0.3
    p1.xaxis.axis_label = "Time"
    p1.yaxis.axis_label = "Observation Value"

    # plot the data, and the data colored by flag
    p1.line(x='time', y=var_name, source=source, legend_label="obs", color="#A6CEE3")
    p1.scatter(
        x='time', y='qc_pass', source=source, 
        size=4, color="green", alpha=0.5,
        legend_label="qc pass"
    )
    p1.scatter(
        x='time', y='qc_suspect', source=source, 
        size=4, color="orange", alpha=0.7,
        legend_label="qc suspect",
    )
    p1.scatter(
        x='time', y='qc_fail', source=source,
        size=6, color="red", alpha=1.0,
        legend_label="qc fail"
    )
    p1.scatter(
        x='time', y='qc_notrun', source=source,
        size=6, color="gray", alpha=1.0,
        legend_label="qc not run",
    )


    # show the plot
    show(gridplot([[p1]], width=800, height=400))
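
The per-flag series in `plot_results` rely on `np.ma.masked_where`, which hides every value where the condition is true. A small sketch with made-up values and flags (following the same flag numbering as `plot_results`) shows what one of those masked series contains:

```python
import numpy as np

# Made-up observations and flags (1=pass, 2=not run, 3=suspect, 4=fail)
values = np.array([10.0, 11.0, 12.0, 13.0, 14.0])
flags = np.array([2, 1, 1, 3, 4])

# Keep only the values whose flag equals 3 (suspect); mask everything else
suspect_only = np.ma.masked_where(flags != 3, values)

print(suspect_only)               # masked entries print as "--"
print(suspect_only.compressed())  # only the unmasked values
```

Because masked entries are not drawn, each scatter layer shows only the points carrying its flag.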

## Run example

#### 1. Load data and perform exploratory analysis

The example uses historical salinity [data from Lower Sand Island, Columbia River Estuary](http://amb6400b.stccmop.org/ws/product/offeringplot_ctime.py?handlegaps=true&series=time,sandi.790.A.CTD.salt.PD0&width=8.54&height=2.92&starttime=2001-7-1%200:00&endtime=2001-09-5%2023:59). The station is at the same location as [sandi for CREOFS](https://tidesandcurrents.noaa.gov/ofs/ofs_station.html?stname=Lower%20Sand%20Island%20Light&ofs=cre&stnid=sandi&subdomain=ba).

In [None]:
import pandas as pd

url = "https://github.com/ioos/ioos_qc/raw/master/docs/source/examples"
fname = f"{url}/attenuated_salinity_example.csv"

data = pd.read_csv(fname, header=1, parse_dates=True)
# data need some prep to be compatible with IOOS-QC
data['time'] = pd.to_datetime(data["'time PST'"])
data = data.drop(columns=["'time PST'"])
var_name = 'sandi_salinity'
data = data.rename(columns={data.columns[0]: var_name})
print(data.head())
data.plot(x='time', y=var_name)
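
The preparation steps can be tried offline on an in-memory stand-in for the file. The CSV text below is made up: the leading description line, the `'salinity'` column name, and the two rows are assumptions that only mimic the layout implied by `header=1` and the quoted `'time PST'` column.

```python
import io

import pandas as pd

# Made-up stand-in for the CSV: one descriptive line, then the header row
csv_text = """# example export
'time PST','salinity'
2001-07-01 00:00:00,12.5
2001-07-01 00:01:00,12.7
"""

demo = pd.read_csv(io.StringIO(csv_text), header=1)

# Same preparation as above: parse the time column, drop the original,
# and rename the remaining data column
demo["time"] = pd.to_datetime(demo["'time PST'"])
demo = demo.drop(columns=["'time PST'"])
demo = demo.rename(columns={demo.columns[0]: "sandi_salinity"})
print(demo.dtypes)
```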

#### 2. Plot range and standard deviation of salinity over a lunar-day moving window

In [None]:
# one lunar day (~24.8 h, about two M2 tidal cycles), in seconds
time_delta = 3600 * 24.8
print(f"window: {time_delta}")

# start QC after half a tidal day
min_period_secs = time_delta / 2

# one obs per minute, so divide the seconds by 60 to get a number of observations
min_period_obs = time_delta / 2 / 60

# pandas uses "min_periods" for the minimum number of observations in the window;
# ioos_qc uses "min_period" for the minimum number of seconds in the window.
# Depending on your use case, you can define the period in observations or in seconds.
print(f"min_period (secs): {min_period_secs}")
print(f"min_period (num obs): {min_period_obs}")
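
The same arithmetic can be cross-checked with the standard library's `timedelta`, which makes the unit conversion explicit. This is only a sketch; the one-minute sampling interval is the assumption stated in the cell above.

```python
from datetime import timedelta

window = timedelta(hours=24.8)            # same length as time_delta above
sampling_interval = timedelta(minutes=1)  # assumed: one observation per minute

window_secs = window.total_seconds()
obs_per_window = window / sampling_interval  # timedelta / timedelta gives a float
min_period_secs = window_secs / 2
min_period_obs = obs_per_window / 2

print(f"window: {window_secs} s = {obs_per_window} obs")
print(f"min_period: {min_period_secs} s = {min_period_obs} obs")
```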

Range check.

In [None]:
range_data = data.set_index('time').rolling(f"{time_delta}s", min_periods=int(min_period_obs)).apply(
    np.ptp,
    raw=True,
)

range_data.plot();

Check the number of leading NaNs.

- With `min_periods=N`, pandas returns NaN until the window holds N observations, so the first N-1 values are NaN.

In [None]:
range_data.isna().sum()
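
The N-1 behaviour is easy to confirm on a small synthetic series. In this sketch the series, the one-hour window, and `n = 30` are arbitrary illustrative choices.

```python
import numpy as np
import pandas as pd

# Synthetic one-minute series (illustrative values only)
index = pd.date_range("2001-07-01", periods=100, freq="min")
series = pd.Series(np.arange(100, dtype=float), index=index)

n = 30  # hypothetical min_periods
rolled = series.rolling("3600s", min_periods=n).apply(np.ptp, raw=True)

# The first value is produced once the window holds n observations
nan_count = int(rolled.isna().sum())
print(nan_count)
```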

Standard deviation check.

In [None]:
stdev_data = data.set_index('time').rolling(f"{time_delta}s", min_periods=int(min_period_obs)).apply(
    np.std,
    raw=True,
)

stdev_data.plot();
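
One detail worth keeping in mind when choosing thresholds: `np.std`, as applied above, computes the population standard deviation (`ddof=0`), whereas the built-in pandas `.std()` defaults to the sample standard deviation (`ddof=1`). A short sketch with made-up numbers shows the difference:

```python
import numpy as np
import pandas as pd

values = pd.Series([1.0, 2.0, 3.0, 4.0])  # made-up values

population_std = np.std(values.to_numpy())  # ddof=0, as with .apply(np.std) above
sample_std = values.std()                   # pandas default is ddof=1

print(population_std, sample_std)
```

For windows holding hundreds of observations the two differ only slightly, but thresholds tuned with one should not be assumed to transfer exactly to the other.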

#### 3. Run QC and plot results

##### Test range

The first 743 points (`min_period`) are marked as "NOT EVALUATED" because there is not yet enough data to decide whether they pass or fail.

The range of the signal falls so quickly that no points are marked as "SUSPECT"; the flags change directly from "PASS" to "FAIL".

In [None]:
title = "Salinity [psu] : Lower Sand Island, OR"

# Convert the data to a Stream (Pandas dataframe to a PandasStream)
pandas_stream = PandasStream(data)

qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 17.25,
                    "fail_threshold": 15,
                    "test_period": int(time_delta),
                    "min_period": min_period_secs,
                    "check_type": "range",
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)
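
The two thresholds can be read as cut-offs on the rolling statistic: below `fail_threshold` is FAIL, below `suspect_threshold` is SUSPECT, otherwise PASS. The sketch below applies that reading to a few made-up range values. It is a simplified illustration that assumes strict less-than comparisons, not the `ioos_qc` implementation, and `flag_from_statistic` is a hypothetical helper.

```python
import numpy as np

PASS, NOT_EVALUATED, SUSPECT, FAIL = 1, 2, 3, 4  # flag values as used in plot_results


def flag_from_statistic(stat, suspect_threshold, fail_threshold):
    """Hypothetical helper: map a rolling statistic to QARTOD-style flags."""
    stat = np.asarray(stat, dtype=float)
    flags = np.full(stat.shape, PASS, dtype=int)
    flags[stat < suspect_threshold] = SUSPECT
    flags[stat < fail_threshold] = FAIL    # overrides SUSPECT where both apply
    flags[np.isnan(stat)] = NOT_EVALUATED  # window not yet filled
    return flags


rolling_range = [np.nan, 22.0, 17.0, 14.0]  # made-up values
flags = flag_from_statistic(rolling_range, suspect_threshold=17.25, fail_threshold=15)
print(flags)
```

With a narrow gap between the two thresholds, a fast-falling statistic can skip the SUSPECT band entirely between consecutive evaluations.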

##### Test standard deviation

The "standard deviation" test picks up likely suspect data whereas the "range" test did not.
This exemplifies the utility of using both tests in tandem.

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 5,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": min_period_secs,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)
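
Alongside the plot, a table of flag counts can make the comparison between check types easier. This is a minimal sketch on a made-up flag column; with the real results, the column would follow the `<variable>_qartod_<test>` naming used in `plot_results`.

```python
import pandas as pd

flag_names = {1: "PASS", 2: "NOT EVALUATED", 3: "SUSPECT", 4: "FAIL"}

# Made-up flags standing in for the QC result column
flags = pd.Series([2, 2, 1, 1, 1, 3, 4, 4])

# Translate numeric flags to labels and count each label
summary = flags.map(flag_names).value_counts()
print(summary)
```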

## Sensitivity Tests


### 1. Test results sensitivity to `min_period`

The following plots demonstrate the sensitivity of the test results in the beginning of a time series to the selection of `min_period`. 


### `min_period`: 0

No observations are marked suspect at the beginning of the time series, but the first 744 observations (the size of the rolling window) are labeled "FAILING".

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 5,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": 0,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)

### `min_period`: 10min

Only the first 10 observations (10 minutes at one observation per minute) are marked as "NOT EVALUATED", but the remainder of the first 744 samples are labeled as "FAILING".

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 5,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": 10 * 60,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)

### `min_period`: 1h

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 5,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": 60 * 60,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)

### `min_period`: test_period/2

The first 744 samples are marked "NOT EVALUATED", but none are marked as "FAILING".

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 5,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": int(time_delta / 2),
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)
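
The four configurations above differ only in `min_period`, so they could also be generated in a loop. The sketch below builds the dictionaries only; `make_config` is a hypothetical helper, and the variable name `sandi_salinity` and the default thresholds are copied from the cells above.

```python
def make_config(var_name, min_period, test_period,
                suspect_threshold=5, fail_threshold=4.5, check_type="std"):
    """Hypothetical helper returning a config dict shaped like the ones above."""
    return {
        "streams": {
            var_name: {
                "qartod": {
                    "attenuated_signal_test": {
                        "suspect_threshold": suspect_threshold,
                        "fail_threshold": fail_threshold,
                        "check_type": check_type,
                        "test_period": test_period,
                        "min_period": min_period,
                    },
                },
            },
        },
    }


test_period = int(3600 * 24.8)
min_periods = {
    "0": 0,
    "10min": 10 * 60,
    "1h": 60 * 60,
    "test_period/2": test_period // 2,
}
configs = {
    label: make_config("sandi_salinity", min_period, test_period)
    for label, min_period in min_periods.items()
}

for label, cfg in configs.items():
    test_cfg = cfg["streams"]["sandi_salinity"]["qartod"]["attenuated_signal_test"]
    print(label, test_cfg["min_period"])
```

Each dictionary could then be passed to `Config` and run through the stream as in the cells above.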

### 2. Test results sensitivity to `suspect_threshold`


### `suspect_threshold`: 7

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 7,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": min_period_secs,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)

### `suspect_threshold`: 6

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 6,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": min_period_secs,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)

### `suspect_threshold`: 5

In [None]:
qc_config = {
    "streams": {
        var_name: {
            "qartod": {
                "attenuated_signal_test": {
                    "suspect_threshold": 5,
                    "fail_threshold": 4.5,
                    "check_type": "std",
                    "test_period": int(time_delta),
                    "min_period": min_period_secs,
                },
            },
        },
    },
}
qc = Config(qc_config)

# Pass the run method the config to use
result_generator = pandas_stream.run(qc)

# Put the results in a new Pandas dataframe and combine with the original data
store = PandasStore(result_generator)
results_store = store.save(write_data=False, write_axes=False)
qc_results = pd.concat([data, results_store], axis=1)

plot_results(qc_results, var_name, "attenuated_signal_test", title)
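
The effect of lowering `suspect_threshold` toward `fail_threshold` can also be summarized numerically: the SUSPECT band narrows, so fewer points fall inside it. The sketch below counts points in the band for a made-up, steadily declining rolling standard deviation. It assumes the band is "at or above `fail_threshold` and below `suspect_threshold`", and the values are not taken from the Sand Island data.

```python
import numpy as np

# Made-up rolling standard deviations, declining as a signal attenuates
rolling_std = np.array([8.0, 7.5, 6.8, 6.2, 5.6, 5.1, 4.8, 4.4, 4.0])
fail_threshold = 4.5

suspect_counts = {}
for suspect_threshold in (7, 6, 5):
    # SUSPECT band: at or above the fail threshold, below the suspect threshold
    in_band = (rolling_std >= fail_threshold) & (rolling_std < suspect_threshold)
    suspect_counts[suspect_threshold] = int(in_band.sum())

print(suspect_counts)
```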