In [6]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

from pathlib import Path

# Directory holding the processed, combined meter exports. A Path constant
# (rather than a variable named `dir`) avoids shadowing the `dir()` builtin.
DATA_DIR = Path("./data/processed")

og = pd.read_csv(DATA_DIR / "combined_flow_data.csv")



In [7]:
# Preview the first rows of the raw combined export (before any cleaning).
og.head()

Unnamed: 0,DateTime,Rain_in,Flow_MGD,Flow_MGD_1,Depth_in,Velocity_ft_s,Meter,Source_File
0,2023-01-01 12:00:00,0.0,1.496,1.417221,8.71,2.26,CBO,DURHAM_CBO_20230101-20260101.csv
1,2023-01-01 12:05:00,0.0,,,,,CBO,DURHAM_CBO_20230101-20260101.csv
2,2023-01-01 12:10:00,0.0,,,,,CBO,DURHAM_CBO_20230101-20260101.csv
3,2023-01-01 12:15:00,0.0,1.505,1.426247,8.75,2.25,CBO,DURHAM_CBO_20230101-20260101.csv
4,2023-01-01 12:20:00,0.0,,,,,CBO,DURHAM_CBO_20230101-20260101.csv


In [None]:
# src/rdii/data_cleaner.py
"""Module for cleaning sewer flow timeseries data."""

import sys
import pandas as pd
import numpy as np
import os
from pathlib import Path


def clean_sewer_timeseries(
    df,
    flow_col='Flow_MGD',
    freq='15min',
    interp_limit=4):
    """
    Clean sewer flow timeseries meter-by-meter.

    Rows are grouped by ``Meter`` and each group is passed to
    ``_clean_single_meter``. The per-meter results are concatenated with a
    fresh RangeIndex.

    Parameters
    ----------
    df : pd.DataFrame
        Raw records. Must contain ``DateTime``, ``Meter``, ``Source_File`` and
        ``flow_col`` (the per-meter step selects exactly these columns).
    flow_col : str
        Name of the flow column to clean (default: 'Flow_MGD').
    freq : str
        Target regular timestep passed to the resampling step (default: '15min').
    interp_limit : int
        Forwarded to the gap-interpolation step (default: 4).

    Returns
    -------
    pd.DataFrame
        Cleaned records with a ``QC_flag`` column. When ``df`` is empty, or no
        row has a ``Meter`` value, a zero-row frame with the input's columns
        plus ``QC_flag`` is returned.
    """

    def _empty_result():
        # Zero-row copy of the input schema plus an (empty) QC_flag column.
        result = df.iloc[0:0].copy()
        result['QC_flag'] = pd.Series(dtype='object')
        return result

    # Handle empty dataframe
    if len(df) == 0:
        return _empty_result()

    # groupby() drops NaN keys, so rows without a meter would otherwise
    # disappear from the output without any notice.
    n_no_meter = int(df['Meter'].isna().sum())
    if n_no_meter > 0:
        print(f"⚠️ Skipping {n_no_meter} rows with no Meter value")

    cleaned_all = []
    for meter, group in df.groupby('Meter'):
        print(f"Cleaning meter: {meter} with {len(group)} records")
        cleaned_all.append(_clean_single_meter(group, flow_col, freq, interp_limit))

    # pd.concat([]) raises ValueError, so handle "no meter groups at all".
    if not cleaned_all:
        return _empty_result()

    return pd.concat(cleaned_all, ignore_index=True)


def _clean_single_meter(df, flow_col, freq, interp_limit):
    """
    Run the full QC pipeline on the records of one meter.

    Only ``DateTime``, ``flow_col``, ``Meter`` and ``Source_File`` are kept.
    The steps, in order, are:

    1. regularise the timestep,
    2. blank negative flows,
    3. blank flatlines,
    4. blank low-outlier days (30-day window, 3 x MAD),
    5. interpolate short gaps,
    6. attach ``QC_flag``.

    The order matters: gaps created by the removal steps are what the
    interpolation step may fill.
    """
    wanted = ['DateTime', flow_col, 'Meter', 'Source_File']
    ordered = df.sort_values('DateTime').copy()
    work = enforce_regular_timestep(ordered[wanted], freq)

    # Each removal step returns the updated frame plus a mask of what it touched.
    work, neg = remove_negative_flows(work, flow_col, threshold=0.0)
    work, flat = remove_flatlines(work, flow_col)
    work, low = remove_low_outliers(work, flow_col, window=30, threshold_multiplier=3)
    work, interp = interpolate_gaps(work, flow_col, interp_limit)

    return add_qc_flags(work, flow_col, interp, neg, flat, low)


def enforce_regular_timestep(df, freq):
    """
    Resample records onto a regular ``freq`` grid.

    ``DateTime`` is parsed with ``errors='coerce'``, and unparseable rows are
    dropped with a warning. Numeric columns are averaged within each
    left-labelled, left-closed bin. Non-numeric columns take the first value
    in the bin. ``Meter`` and ``Source_File`` are then forward/back-filled so
    that empty bins still carry them.

    Returns a new frame with ``DateTime`` restored as a regular column
    (numeric columns first, then non-numeric).
    """
    frame = df.copy()
    frame['DateTime'] = pd.to_datetime(frame['DateTime'], errors='coerce')

    n_invalid = frame['DateTime'].isna().sum()
    if n_invalid > 0:
        print(f"⚠️ Dropping {n_invalid} rows with invalid DateTime")
        frame = frame.dropna(subset=['DateTime'])

    frame = frame.set_index('DateTime')
    rows_before = len(frame)

    num_cols = frame.select_dtypes(include=[np.number]).columns
    other_cols = frame.select_dtypes(exclude=[np.number]).columns

    def _binned(cols):
        # Same binning for both column families so the indexes line up.
        return frame[cols].resample(freq, label='left', closed='left')

    regular = pd.concat([_binned(num_cols).mean(), _binned(other_cols).first()], axis=1)

    for label_col in ('Meter', 'Source_File'):
        if label_col in regular.columns:
            regular[label_col] = regular[label_col].ffill().bfill()

    gained = len(regular) - rows_before
    if gained > 0:
        print(f"  Added {gained} timestamps to regularize frequency")

    return regular.reset_index()

def remove_negative_flows(df,flow_col,threshold,verbose=False):
    """
    Blank out flow values strictly below ``threshold``.

    Returns ``(cleaned_copy, mask)``, where ``mask`` is a boolean Series
    (aligned to ``df.index``) marking the samples that were set to NaN. NaN
    inputs compare False and are not marked. ``verbose`` is accepted but
    currently unused.
    """
    result = df.copy()

    below = result[flow_col].lt(threshold)
    if below.any():
        result.loc[below, flow_col] = np.nan

    return result, below


def remove_flatlines(df, flow_col, window=48):

    """
    Remove flatline periods (runs of identical values).

    A flatline is a run of at least ``window`` consecutive, identical,
    non-missing samples (48 samples = 12 h at a 15-minute step). Every sample
    in such a run is set to NaN.

    Returns ``(cleaned_copy, mask)``, where ``mask`` is a boolean Series
    aligned to ``df.index`` marking the removed samples.
    """

    df = df.copy()
    flow = df[flow_col]

    # A new run starts whenever the value changes. NaN != NaN, so every
    # missing sample forms its own run and can never be part of a flatline.
    run_id = flow.ne(flow.shift()).cumsum()
    run_length = run_id.map(run_id.value_counts())

    # Flag the whole run, not just the samples where a rolling window of
    # length `window` happens to end inside it.
    flatlines = flow.notna() & (run_length >= window)

    df.loc[flatlines, flow_col] = np.nan

    return df, flatlines


def interpolate_gaps(df, flow_col, interp_limit):

    """
    Linearly interpolate short gaps in the flow series.

    A gap is a maximal run of consecutive NaNs in ``flow_col``. Runs of at
    most ``interp_limit`` samples are filled by linear interpolation between
    the neighbouring valid values. Longer runs are left untouched as NaN.
    Interpolation is positional (method='linear' ignores the index), so the
    data should already be on a regular timestep.

    Parameters
    ----------
    df : pd.DataFrame
        Frame containing ``flow_col``.
    flow_col : str
        Column to fill.
    interp_limit : int
        Maximum gap length, in samples, that will be filled.

    Returns
    -------
    (pd.DataFrame, pd.Series)
        A copy of ``df`` with short gaps filled, and a boolean mask (aligned
        to ``df.index``) of the samples that received an interpolated value.
    """

    df = df.copy()
    flow = df[flow_col]

    # Track which values were NaN before interpolation
    was_nan = flow.isna()

    # Label maximal runs of equal NaN-ness, then measure each NaN run's length
    run_id = was_nan.ne(was_nan.shift()).cumsum()
    gap_sizes = was_nan.groupby(run_id).transform('sum')

    # Only gaps no longer than interp_limit are eligible
    should_interpolate = was_nan & (gap_sizes <= interp_limit)

    # Interpolate the whole series once, but copy values only into eligible
    # gaps, so long gaps keep their NaNs without needing a sentinel value.
    # limit_direction='both' lets a short gap at the start or end of the
    # series take the nearest valid value.
    filled = flow.interpolate(method='linear', limit_direction='both')
    df.loc[should_interpolate, flow_col] = filled[should_interpolate]

    # Identify which NaNs were filled
    interpolated_mask = should_interpolate & df[flow_col].notna()

    return df, interpolated_mask


def remove_low_outliers(df, flow_col, window=14, threshold_multiplier=3,verbose=False):
    """
    Blank out whole days whose minimum flow is anomalously low.

    Method:

    - Compute one value per calendar day: that day's minimum ``flow_col``.
    - Baseline: the centred rolling median over ``window`` days.
    - Spread: the centred rolling median of absolute deviations from that
      baseline (MAD). The factor 1.4826 scales MAD to a normal-distribution
      standard deviation.
    - A day is an outlier when its deviation is below
      ``-threshold_multiplier * 1.4826 * MAD``. Every record on such a day is
      set to NaN.

    Rolling windows use pandas' default ``min_periods`` (= ``window``).
    Days whose centred window is incomplete therefore get a NaN baseline and
    are never flagged. This covers the series edges and any window containing
    a day with no valid minimum.

    ``verbose`` is accepted but currently unused.

    Returns ``(cleaned_copy, mask)``, where ``mask`` is a boolean Series
    aligned to ``df.index`` marking every record on a flagged day.
    """
    out = df.copy()
    out['Date'] = pd.to_datetime(out['DateTime']).dt.date

    # Series indexed by calendar date: the lowest flow seen that day.
    daily_low = out.groupby('Date')[flow_col].min()

    baseline = daily_low.rolling(window, center=True).median()
    residual = daily_low - baseline
    spread = residual.abs().rolling(window, center=True).median()
    cutoff = -threshold_multiplier * 1.4826 * spread

    bad_days = daily_low.index[(residual < cutoff).to_numpy()]

    outlier_mask = out['Date'].isin(bad_days)
    if outlier_mask.any():
        out.loc[outlier_mask, flow_col] = np.nan

    # Drop the helper column before handing the frame back.
    return out.drop('Date', axis=1), outlier_mask


def add_qc_flags(df, flow_col,interpolated_mask, negative_mask, flatline_mask,outlier_mask):

    """
    Attach a ``QC_flag`` label to every row.

    Labels are applied from lowest to highest priority. Each one overwrites
    the previous label wherever its mask is True:

        OK < INTERPOLATED < LOW_OUTLIER < NEGATIVE < FLATLINE_REMOVED < MISSING

    MISSING is derived from ``flow_col`` being NaN at call time. The masks
    are boolean Series used with ``.loc``, so they are aligned to ``df.index``
    by label.

    Returns a copy of ``df`` with the new column; ``df`` itself is not modified.
    """

    flagged = df.copy()
    flagged['QC_flag'] = 'OK'

    # Later entries win: this list is the priority order, lowest first.
    precedence = [
        ('INTERPOLATED', interpolated_mask),
        ('LOW_OUTLIER', outlier_mask),
        ('NEGATIVE', negative_mask),
        ('FLATLINE_REMOVED', flatline_mask),
        ('MISSING', flagged[flow_col].isna()),
    ]
    for label, rows in precedence:
        flagged.loc[rows, 'QC_flag'] = label

    return flagged




        


In [12]:
# Run the QC pipeline for every meter. The inspection and plotting cells below
# read the result as `clean`, so it must be defined here for Restart & Run All
# to work. `df_test` is kept as an alias of the same frame.
clean = clean_sewer_timeseries(
    og,
    flow_col='Flow_MGD',
    freq='15min',
    interp_limit=4,
)
df_test = clean

clean.head()

Cleaning meter: CBO with 315793 records
Cleaning meter: DBO with 315612 records
Cleaning meter: ENOR with 315610 records
Cleaning meter: FAO with 315612 records
Cleaning meter: GC1 with 315613 records
Cleaning meter: HRO with 315610 records
Cleaning meter: HVO with 315612 records
Cleaning meter: LCO with 315724 records
Cleaning meter: MCO with 315613 records
Cleaning meter: NC2R-2A with 315574 records
Cleaning meter: NH1 with 315613 records
Cleaning meter: NH2 with 315612 records
Cleaning meter: RMO with 315586 records
Cleaning meter: TF2 with 313301 records
Cleaning meter: TF5 with 315639 records


Unnamed: 0,DateTime,Flow_MGD,Meter,Source_File,QC_flag
0,2023-01-01 12:00:00,1.496,CBO,DURHAM_CBO_20230101-20260101.csv,OK
1,2023-01-01 12:15:00,1.505,CBO,DURHAM_CBO_20230101-20260101.csv,OK
2,2023-01-01 12:30:00,1.516,CBO,DURHAM_CBO_20230101-20260101.csv,OK
3,2023-01-01 12:45:00,1.547,CBO,DURHAM_CBO_20230101-20260101.csv,OK
4,2023-01-01 13:00:00,1.516,CBO,DURHAM_CBO_20230101-20260101.csv,OK


array(['CBO', None, 'DBO', 'ENOR', 'FAO', 'GC1', 'HRO', 'HVO', 'LCO',
       'MCO', 'NC2R-2A', 'NH1', 'NH2', 'RMO', 'TF2', 'TF5'], dtype=object)

In [47]:
# Filter for one meter
meter_name = 'CBO'
df = clean[clean['Meter'] == meter_name].copy()

# Parse timestamps so the range filter below compares chronologically. If
# DateTime is stored as text, string comparison would be lexicographic, and
# '2023-11-20 00:00:00' <= '2023-11-20' is False.
df['DateTime'] = pd.to_datetime(df['DateTime'])

# Filter date range: 2023-11-16 00:00 through 2023-11-20 00:00 (inclusive)
start_date = '2023-11-16'
end_date = '2023-11-20'

df_filtered = df[df['DateTime'].between(start_date, end_date)]

df_filtered


Unnamed: 0,DateTime,Flow_MGD,Meter,Source_File,QC_flag
30576,2023-11-16 00:00:00,1.223949,CBO,DURHAM_CBO_20230101-20260101.csv,OK
30577,2023-11-16 00:15:00,1.180721,CBO,DURHAM_CBO_20230101-20260101.csv,OK
30578,2023-11-16 00:30:00,1.104227,CBO,DURHAM_CBO_20230101-20260101.csv,OK
30579,2023-11-16 00:45:00,1.108747,CBO,DURHAM_CBO_20230101-20260101.csv,OK
30580,2023-11-16 01:00:00,1.112084,CBO,DURHAM_CBO_20230101-20260101.csv,OK
...,...,...,...,...,...
30955,2023-11-19 22:45:00,,CBO,DURHAM_CBO_20230101-20260101.csv,MISSING
30956,2023-11-19 23:00:00,,CBO,DURHAM_CBO_20230101-20260101.csv,MISSING
30957,2023-11-19 23:15:00,,CBO,DURHAM_CBO_20230101-20260101.csv,MISSING
30958,2023-11-19 23:30:00,,CBO,DURHAM_CBO_20230101-20260101.csv,MISSING


In [57]:
import plotly.graph_objects as go
import pandas as pd

meter_name = 'GC1'

# Meters available: see clean['Meter'].unique()
df = clean[clean['Meter'] == meter_name].copy()
df['DateTime'] = pd.to_datetime(df['DateTime'])
df = df.sort_values('DateTime')

# Sampling interval of the cleaned data. Samples further apart than this start
# a new shaded region. Each region is also widened by one step, so that a
# single flagged sample still shows up as a visible band.
STEP = pd.Timedelta('15min')

fig = go.Figure()

# Add main data line
fig.add_trace(go.Scatter(
    x=df['DateTime'],
    y=df['Flow_MGD'],
    mode='lines',
    name='Flow Data',
    line=dict(color='steelblue', width=1.5),
    hovertemplate='<b>DateTime:</b> %{x}<br><b>Flow:</b> %{y:.3f} MGD<extra></extra>'
))

# One colour per QC flag produced by add_qc_flags; anything unexpected is grey.
qc_colors = {
    'INTERPOLATED': 'rgba(255, 165, 0, 0.2)',
    'LOW_OUTLIER': 'rgba(218, 165, 32, 0.2)',
    'MISSING': 'rgba(255, 0, 0, 0.2)',
    'FLATLINE_REMOVED': 'rgba(128, 0, 128, 0.2)',
    'NEGATIVE': 'rgba(139, 69, 19, 0.2)',
}

# Shade each contiguous run of a single non-OK flag
non_ok = df.loc[df['QC_flag'] != 'OK', ['DateTime', 'QC_flag']].copy()

if len(non_ok) > 0:
    # A run breaks on a time gap wider than one step OR on a change of flag,
    # so interleaved flags never merge into one misleading span.
    new_run = (
        (non_ok['DateTime'].diff() > STEP)
        | non_ok['QC_flag'].ne(non_ok['QC_flag'].shift())
    )
    non_ok['group'] = new_run.cumsum()

    for _, run in non_ok.groupby('group'):
        qc_flag = run['QC_flag'].iloc[0]
        fig.add_vrect(
            x0=run['DateTime'].min(),
            x1=run['DateTime'].max() + STEP,
            fillcolor=qc_colors.get(qc_flag, 'rgba(128, 128, 128, 0.2)'),
            layer="below",
            line_width=0,
            annotation_text=qc_flag if len(run) > 10 else "",
            annotation_position="top left"
        )

fig.update_layout(
    title=f'Flow Data for {meter_name} Meter - with QC Regions',
    xaxis_title='DateTime',
    yaxis_title='Flow (MGD)',
    hovermode='x unified',
    height=600
)

fig.show()

In [59]:
from pathlib import Path


In [60]:
# src/rdii/plots.py
"""Module for creating visualizations of flow data and QC flags."""

import matplotlib.pyplot as plt
import pandas as pd
from pathlib import Path


def plot_meter_qc(
    df,
    meter_name,
    output_dir='results/plots',
    figsize=(14, 6),
    dpi=300,
    flow_col='Flow_MGD',
    step='15min'
):
    """
    Create a plot of flow data with QC issues highlighted.

    Each contiguous run of a single non-OK ``QC_flag`` value is shaded in that
    flag's colour. A run breaks on a time gap wider than ``step`` or on a
    change of flag. Flags without a known colour are shaded grey.

    Parameters
    ----------
    df : pd.DataFrame
        Cleaned flow data with ``DateTime``, ``Meter``, ``QC_flag`` and
        ``flow_col`` columns.
    meter_name : str
        Name of the meter to plot.
    output_dir : str
        Directory to save plots (default: 'results/plots'). Only created when
        there is data to plot.
    figsize : tuple
        Figure size in inches (default: (14, 6)).
    dpi : int
        Resolution for saved figure (default: 300).
    flow_col : str
        Column holding the flow values (default: 'Flow_MGD').
    step : str or pd.Timedelta
        Sampling interval of the cleaned data (default: '15min'). Each shaded
        span is widened by one step so single-sample issues remain visible.

    Returns
    -------
    Path or None
        Path to the saved figure, or None when ``df`` has no rows for
        ``meter_name``.
    """
    step = pd.Timedelta(step)

    # Filter for the meter first, so nothing is created on disk when it has no data
    meter_df = df[df['Meter'] == meter_name].copy()
    if len(meter_df) == 0:
        print(f"Warning: No data found for meter {meter_name}")
        return None

    meter_df['DateTime'] = pd.to_datetime(meter_df['DateTime'])
    meter_df = meter_df.sort_values('DateTime')

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    output_file = output_path / f'{meter_name}_qc_plot.png'

    fig, ax = plt.subplots(figsize=figsize)
    try:
        # Plot main data line
        ax.plot(meter_df['DateTime'], meter_df[flow_col],
                color='steelblue', linewidth=1.5, label='Flow Data', zorder=3)

        # Shade non-OK runs; label only the first span of each flag for the legend
        labelled = set()
        for qc_flag, start, end in _qc_spans(meter_df, step):
            ax.axvspan(
                start,
                end,
                color=_QC_COLORS.get(qc_flag, 'gray'),
                alpha=0.2,
                label=None if qc_flag in labelled else qc_flag,
                zorder=1
            )
            labelled.add(qc_flag)

        # Formatting
        ax.set_xlabel('DateTime', fontsize=12)
        ax.set_ylabel('Flow (MGD)', fontsize=12)
        ax.set_title(f'Flow Data for {meter_name} Meter - QC Analysis', fontsize=14, fontweight='bold')
        ax.legend(loc='best', framealpha=0.9)
        ax.grid(True, alpha=0.3, zorder=0)
        ax.tick_params(axis='x', labelrotation=45)
        fig.tight_layout()

        fig.savefig(output_file, dpi=dpi, bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or saving fails.
        plt.close(fig)

    print(f"✓ Saved plot to {output_file}")

    return output_file


# Shading colour per QC flag produced by the cleaning pipeline.
_QC_COLORS = {
    'INTERPOLATED': 'orange',
    'LOW_OUTLIER': 'gold',
    'MISSING': 'red',
    'FLATLINE_REMOVED': 'purple',
    'NEGATIVE': 'brown',
}


def _qc_spans(meter_df, step):
    """Return (flag, start, end) for each contiguous run of a single non-OK QC flag."""
    flagged = meter_df.loc[meter_df['QC_flag'] != 'OK', ['DateTime', 'QC_flag']]
    if flagged.empty:
        return []

    # A run breaks on a time gap wider than one step or on a change of flag.
    breaks = (
        (flagged['DateTime'].diff() > step)
        | flagged['QC_flag'].ne(flagged['QC_flag'].shift())
    )
    runs = flagged.groupby(breaks.cumsum()).agg(
        flag=('QC_flag', 'first'),
        start=('DateTime', 'min'),
        end=('DateTime', 'max'),
    )
    # Widen each span by one step so it covers the last sample's interval.
    return [(r.flag, r.start, r.end + step) for r in runs.itertuples(index=False)]

In [61]:
# Save the static QC plot for meter GC1 (returns the saved file's path).
plot_meter_qc(df=clean, meter_name='GC1')

✓ Saved plot to results\plots\GC1_qc_plot.png


WindowsPath('results/plots/GC1_qc_plot.png')