In [1]:
from typing import Optional, Tuple, Union, Literal

import numpy as np
import pandas as pd
import plotly.graph_objects as go

from filters.filter_algorithms import AlferesAlgorithm
from filters.filters import AlferesFilter
from filters.kernels import EwmaKernel1, EwmaKernel3
from filters.models import EwmaUncertaintyModel, SignalModel
from filters.plots import UnivariatePlotter
from filters.config import Parameters
from filters.smoothers import new_kernel_smoother
from filters.utilities import combine_smooth_and_univariate, align_results_in_time, apply_observations_to_outliers

In [2]:
def get_kernel(order: Literal[1, 3], forgetting_factor: float = 0.25):
    if order == 1:
        return EwmaKernel1(forgetting_factor=forgetting_factor)
    elif order == 3:
        return EwmaKernel3(forgetting_factor=forgetting_factor)


def get_signal_model(order: Literal[1, 3], forgetting_factor: float = 0.25) -> SignalModel:
    return SignalModel(kernel=get_kernel(order, forgetting_factor))


def get_uncertainty_model(parameters: Parameters, order, forgetting_factor: float = 0.25) -> SignalModel:
    return EwmaUncertaintyModel(
        kernel=get_kernel(order, forgetting_factor),
        **parameters
    )



def mirror(df: Union[pd.DataFrame, pd.Series], first_date: Optional[str] = None, last_date: Optional[str] = None) -> pd.DataFrame:
    is_series = isinstance(df, pd.Series)
    series_name = ""
    if is_series:
        series_name = df.name
        df = pd.DataFrame(df)
    df = df.copy()

    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("DataFrame should be index by time")
    if df.index.freqstr != "D":
        raise ValueError("Index frequency should be 'D'")

    if first_date:
        first_date = pd.to_datetime(first_date, format="%Y-%m-%d")
        df = df.loc[df.index > first_date]
    if last_date:
        last_date = pd.to_datetime(last_date, format="%Y-%m-%d")
        df = df.loc[df.index < last_date]

    index_name = df.index.name
    df = df.reset_index()
    reversed_df = df[::-1].copy()
    reversed_df[index_name] = reversed_df[index_name] - 2 * pd.to_timedelta(reversed_df.index, unit="D")
    reversed_df = reversed_df.iloc[:-1]
    result = pd.concat([reversed_df, df], axis=0)
    result.set_index(index_name, inplace=True)
    result = result.asfreq("D")
    return result[series_name] if is_series else result


def get_df_from_raw_data(df: pd.DataFrame, first_date=None, last_date=None) -> pd.DataFrame:
    df["Calculated_timestamp"] = pd.to_datetime(df["Calculated_timestamp"])
    data = df.set_index("Calculated_timestamp")
    if first_date:
        first_date = pd.to_datetime(first_date, format="%Y-%m-%d")
        data = data.loc[data.index > first_date]
    if last_date:
        last_date = pd.to_datetime(last_date, format="%Y-%m-%d")
        data = data.loc[data.index < last_date]
    value_columns = [col for col in data.columns if ("value" in col)]
    quality_columns = [col for col in data.columns if ("qualityFlag" in col)]
    data = data.dropna(subset=value_columns, how="all", axis=0)
    data[quality_columns] = data[quality_columns].astype(bool)
    return data.loc[:, value_columns + quality_columns]


def prepare_time_series(ts: pd.Series, data: pd.DataFrame, flow_name: str, use_log=False, use_flow=False, is_flag=False) -> pd.Series:
    series_name = "SRAS"
    if use_flow:
        ts = ts * data[flow_name] / 1e3  # Tgc/d
        series_name += " x Débit"
    ts = ts.dropna()

    if use_log:
        ts = np.log(ts + 0.001)
        series_name = f"log({series_name})"

    series_name = "Manually flagged" if is_flag else series_name
    ts.name = series_name
    return ts.asfreq("D")


def retrieve_flagged_data(data: pd.DataFrame, series_name: str, quality_name: str) -> pd.Series:
    ts = data[series_name]   # gc/ml
    flags = data[quality_name].fillna(False)
    flagged = ts.loc[flags]
    flagged.name = "Flagged"
    return flagged


def remove_flagged(time_series: pd.Series, flags: pd.Series) -> pd.Series:
    df = pd.concat([time_series, flags], axis=1)
    flag_name = flags.name
    df['keep'] = df.apply(lambda x: np.isnan(x[flag_name]), axis=1)
    return time_series.loc[df['keep']]


def write_units(use_log: bool, use_flow: bool) -> str:
    units = "1e9 cg/j" if use_flow else "cg/j"
    if use_log:
        units = f"log ({units})"
    return units


In [3]:
def main(data_path, year, calib_start, calib_end, params, start=None, end=None, use_mirror=False, use_log=False, use_flow=False, remove_flags=False):
    sars_name = 'WWMeasure_covn1_gcml_single-to-mean_value'
    sars_quality_name = "WWMeasure_covn1_gcml_single-to-mean_qualityFlag"
    flow_name = "SiteMeasure_wwflow_m3d_single-to-mean_value"

    if year == 2021:
        sars_name = sars_name.replace('n1', 'n2')
        sars_quality_name = sars_quality_name.replace('n1', 'n2')

    raw_data = pd.read_csv(data_path)

    data = get_df_from_raw_data(raw_data, start, end)

    flagged = retrieve_flagged_data(data, sars_name, sars_quality_name)
    original_time_series = data[sars_name]
    flagged = prepare_time_series(flagged, data, flow_name, use_log, use_flow, is_flag=True)
    original_time_series = prepare_time_series(original_time_series, data, flow_name, use_log, use_flow, is_flag=False)
    if remove_flags:
        time_series = remove_flagged(original_time_series, flagged)
    else:
        time_series = original_time_series.copy()
    time_series = time_series.asfreq("D").interpolate()
    units = write_units(use_log, use_flow)

    if use_mirror:
        # lengthen the series by mirroring
        mirrored_df = mirror(time_series, first_date=start, last_date=end)
        mirrored_df = mirror(mirrored_df)

        # fill gaps
        time_series = mirrored_df.interpolate()

    # seven-day rolling average
    seven_days = time_series.rolling(window=7, center=True).mean()

    # filter
    filter_obj = AlferesFilter(
        algorithm=AlferesAlgorithm(),
        signal_model=get_signal_model(order=3),
        uncertainty_model=get_uncertainty_model(
            params["uncertainty"],
            order=1
        ),
        control_parameters=params["control"],
    )

    calibration_data = time_series.loc[calib_start:calib_end]
    filter_obj.calibrate_models(calibration_data)
    filter_obj.add_dataframe(time_series)
    filter_obj.update_filter()
    filter_results = align_results_in_time(filter_obj.to_dataframe())
    filter_results = apply_observations_to_outliers(filter_results)
    # smoother
    smoother = new_kernel_smoother(**params["smoother"])
    to_smooth = filter_results[["date", "accepted_values"]].set_index("date")
    smoother.add_dataframe(to_smooth)
    smoother.update_filter()
    smoother_results = smoother.to_dataframe()
    results = combine_smooth_and_univariate(smoother_results, filter_results)
    
    
    # plotter
    plotter = UnivariatePlotter(
        signal_name="SARS",
        df=results,
        template="plotly_white",
        language="english"
    )
    fig = plotter.plot()



    raw_trace = go.Scatter(x=original_time_series.index, y=original_time_series, name="Signal brut", marker=dict(color="#bbbbbb"))
    fig.add_trace(raw_trace)

    flagged_trace = go.Scatter(x=flagged.index, y=flagged, name="Annoté manuellement", line=dict(color="#D62728"), mode='markers', marker=dict(symbol="star", size=15), showlegend=True)
    fig.add_trace(flagged_trace)
    
    seven_day_trace = go.Scatter(x=seven_days.index, y=seven_days, name="Liss. moy. 7j", marker=dict(color="#FF6F00"))
    fig.add_trace(seven_day_trace)

    fig.update_layout(dict(
        # template="presentation",
        title=f"Surveillance des eaux usées - Ville de Québec, Station Est - {time_series.name}"),
        yaxis=dict(title=f"Valeur ({units})"),
        xaxis=dict(title="Jour d'échantillonnage")),
    fig.update_layout(dict(hovermode='x unified', width=1000, height=800))
    fig.update_traces(hovertemplate='%{y:.2f}')
    return fig

parameters = {
    "control": {
        "n_outlier_threshold": 1,
        "n_steps_back": 2,
        "n_warmup_steps": 2,
    },
    "uncertainty": {
        "initial_uncertainty": 500,
        "minimum_uncertainty": 500,
        "uncertainty_gain": 0.75
    },
    "smoother": {
        "size": 2
    }
}
data_path = "/Users/jeandavidt/Library/CloudStorage/OneDrive-UniversitéLaval/Université/Doctorat/COVID/Latest Data/wide tables 2022/qc_01.csv"
year = 2022
calib_start, calib_end = ("22 March 2022", "27 March 2022")
params = parameters
start = None 
end = None
use_mirror = False
use_log = False
use_flow = True
remove_flags = False
fig = main(data_path, year, calib_start, calib_end, parameters, start=start, end=end, use_mirror=use_mirror, use_log=use_log, use_flow=use_flow, remove_flags=remove_flags)

KeyError: 'input_values'

In [None]:
parameters = {
    "control": {
        "n_outlier_threshold": 1,
        "n_steps_back": 2,
        "n_warmup_steps": 2,
    },
    "uncertainty": {
        "initial_uncertainty": 500,
        "minimum_uncertainty": 500,
        "uncertainty_gain": 0.75
    },
    "smoother": {
        "size": 2
    }
}
data_path = "/Users/jeandavidt/Library/CloudStorage/OneDrive-UniversitéLaval/Université/Doctorat/COVID/Latest Data/wide tables 2022/qc_01.csv"
year = 2022
calib_start, calib_end = ("22 March 2022", "27 March 2022")
params = parameters
start = None 
end = None
use_mirror = False
use_log = False
use_flow = True
remove_flags = False
fig = main(data_path, year, calib_start, calib_end, parameters, start=start, end=end, use_mirror=use_mirror, use_log=use_log, use_flow=use_flow, remove_flags=remove_flags)

In [None]:
fig.show()