In [None]:
from typing import Literal

import matplotlib.pyplot as plt
from pandas import DataFrame, Series, concat as pd_concat
from pandera import DataFrameModel, Field

from bat.sensors import sensor_array

In [None]:
# Number of readings pulled from the sensor stream is maxt * 3
# (presumably one reading per channel for 3 channels — TODO confirm).
maxt = 1_000
#maxt = 100
sensors = sensor_array()
raw_data = [next(sensors) for _ in range(maxt*3)]

# Preview only: displaying every raw tuple floods the notebook output.
raw_data[:10]

In [None]:
from pandera.typing import Index, DataFrame as dft

# === Extract === #

# Literal aliases describing raw sensor readings.
# raw_data_columns_type is not referenced elsewhere in this notebook.
raw_data_columns_type = Literal['time', 'channel', 'value']
raw_data_channel_type = Literal['anomaly', 'blue', 'red']
# A reading's value is either a bool (presumably the anomaly flag — TODO confirm) or an int.
raw_data_values_type = bool | int

# A batch of readings: a list of (time, channel, value) tuples.
raw_data_type = list[tuple[int, raw_data_channel_type, raw_data_values_type]]


class RawDataDF(DataFrameModel):  # Extract
    """Schema for long-format raw readings: one row per (time, channel, value).

    ``Config.coerce`` casts each column to its declared dtype before the
    field checks run.
    """

    time: int = Field(
        ge=0,
        title='time',
        description='time-stamp in integer format',
    )
    channel: str = Field(
        isin=('red', 'blue', 'anomaly'),
        title='channel',
        description='designates the channel this value came from',
    )
    value: int = Field(
        description='the value returned from the sensor channel',
        title='value',
    )

    class Config:
        coerce = True


class SensorData:
    """Hold raw (time, channel, value) tuples and expose them as a RawDataDF."""

    def __init__(self, raw_data: raw_data_type):
        self.raw_data = raw_data

    @property
    def dataframe(self) -> RawDataDF:
        """Build a new frame from ``raw_data`` and pass it through ``RawDataDF``.

        Instantiating a pandera DataFrameModel with a frame validates/coerces
        it (see the BUG REPORT cell, which uses both forms). A fresh frame is
        built on every access.
        """
        column_names = ['time', 'channel', 'value']
        frame = DataFrame(self.raw_data, columns=column_names)
        return RawDataDF(frame)
    

# Wrap the raw readings and display the schema-checked long-format frame.
sd = SensorData(raw_data=raw_data)

sd.dataframe

In [None]:
# === Transform === #

def normalized_sensor_field() -> Field:
    """Return a new pandera Field restricting a column to the closed range [0, 1].

    A fresh Field per call: reusing one Field object for several columns
    fails (see the BUG REPORT cell).
    """
    return Field(le=1, ge=0)

def normalized_anomaly_field() -> Field:
    """Return a new pandera Field admitting only the binary flags 0.0 and 1.0."""
    allowed_flags = (0.0, 1.0)
    return Field(isin=allowed_flags)


class NormalizedAnomalyDF(DataFrameModel):
    # Single-column schema: the anomaly flag, restricted to 0.0 / 1.0.
    anomaly: float = normalized_anomaly_field()

class NormalizedBlueDF(DataFrameModel):
    # Single-column schema: the normalized blue channel, restricted to [0, 1].
    blue: float = normalized_sensor_field()

class NormalizedRedDF(DataFrameModel):
    # Single-column schema: the normalized red channel, restricted to [0, 1].
    # The field must be named `red`; NormalizedChannelsDF inherits it from here.
    red: float = normalized_sensor_field()
    

class NormalizedChannelsDF(
    NormalizedAnomalyDF, 
    NormalizedBlueDF, 
    NormalizedRedDF, 
    DataFrameModel,
):
    # Combined schema: column fields are inherited from the per-channel models
    # listed above; only the index constraint is declared here.
    # NOTE(review): the index is the pivoted `time` (see NormalizedData), which
    # must be >= 1 here while RawDataDF allows time >= 0 — confirm intent.
    index: Index[int] = Field(ge=1)
    
    
class NormalizedData:
    """Pivot raw readings to one column per channel and rescale each channel.

    Blue is divided by 1000 and red by 100 (presumably each channel's full
    scale — TODO confirm). The anomaly flag becomes 0.0/1.0, with missing
    entries set to 0.0. Rows that still contain NaN after the channels are
    joined are dropped, and the result is validated by NormalizedChannelsDF.
    """

    def __init__(self, raw_data_df: dft[RawDataDF]):
        self.raw_data_df = raw_data_df

    @property
    def dataframe(self) -> NormalizedChannelsDF:
        """Join the normalized channels column-wise and validate the result."""
        df = pd_concat(
            [
                self.normalized_anomaly,
                self.normalized_blue,
                self.normalized_red,
            ],
            axis=1,
        )
        # anomaly is always filled, so this discards rows missing blue or red.
        return NormalizedChannelsDF(df.dropna())

    @property
    def normalized_anomaly(self) -> Series:
        """Return the anomaly flag as a float Series named 'anomaly'.

        Timestamps without an anomaly reading get 0.0. If the raw data has no
        anomaly channel at all, the whole Series is 0.0. Both branches return
        the same type; validation happens once, in ``dataframe``.
        """
        by_channel = self.data_by_channel
        if 'anomaly' not in by_channel.columns:  # no anomaly column data provided
            return Series(0.0, index=by_channel.index, name='anomaly')
        # astype(float) already maps True/1 -> 1.0; NaN means "no flag" -> 0.0.
        return by_channel['anomaly'].astype(float).fillna(0.0)

    @property
    def normalized_blue(self) -> Series:
        """Blue channel divided by 1000.

        Missing values are NOT filled here; those rows are dropped by
        ``dataframe``.
        """
        return self.data_by_channel['blue'].astype(float) / 1000

    @property
    def normalized_red(self) -> Series:
        """Red channel divided by 100 (missing values left as NaN)."""
        return self.data_by_channel['red'].astype(float) / 100

    @property
    def data_by_channel(self) -> DataFrame:
        """Pivot long-format readings to wide: index=time, one column per channel.

        Recomputed on every access. pandas raises if a (time, channel) pair repeats.
        """
        return self.raw_data_df.pivot(
            index='time', columns='channel', values='value'
        )

    
# Normalize the extracted readings and display the wide, validated frame.
normalized_data = NormalizedData(raw_data_df=sd.dataframe)
normalized_data.dataframe

In [None]:
from pandera.typing import DataFrame as dft


class LaggedDatasetDF(DataFrameModel):
    # Model-ready schema: the current step (anomaly, blue, red) plus exactly
    # three lagged copies of each channel, suffixed _t_1 .. _t_3.
    # Current-step channels:
    anomaly: float = normalized_anomaly_field()
    blue: float = normalized_sensor_field()
    red: float = normalized_sensor_field()
    # One step back:
    anomaly_t_1: float = normalized_anomaly_field()
    blue_t_1: float = normalized_sensor_field()
    red_t_1: float = normalized_sensor_field()
    # Two steps back:
    anomaly_t_2: float = normalized_anomaly_field()
    blue_t_2: float = normalized_sensor_field()
    red_t_2: float = normalized_sensor_field()
    # Three steps back:
    anomaly_t_3: float = normalized_anomaly_field()
    blue_t_3: float = normalized_sensor_field()
    red_t_3: float = normalized_sensor_field()
    
    class Config:
        # strict: columns not declared above are rejected;
        # coerce: values are cast to float before the checks run.
        strict = True
        coerce = True


class LaggedData:  # Transform
    """Append ``n_in`` lagged copies of every channel to a normalized frame.

    Lags come from ``DataFrame.shift`` and are therefore positional: "t_1" is
    the previous *row*. That is not necessarily time - 1 if rows were dropped
    upstream.
    """

    def __init__(self, df: dft[NormalizedChannelsDF]):
        self.data = df
        self.n_in = 3  # lag depth; LaggedDatasetDF declares exactly 3 lags
        self.n_out = 1  # NOTE(review): not read by any method of this class

    @property
    def dataframe(self) -> LaggedDatasetDF:
        """Current-step channels plus their lags, NaN rows dropped, schema-validated."""
        return LaggedDatasetDF(
            pd_concat((self.data, self.input_columns), axis=1).dropna()
        )

    @property
    def input_columns(self) -> DataFrame:
        """Only the lagged copies (t-n_in ... t-1), with NaN rows dropped.

        This frame is not schema-validated and lacks the current-step columns,
        so it is a plain DataFrame rather than a LaggedDatasetDF.
        """
        # input sequence (t-n, ... t-1), oldest lag first
        shifted = [self.data.shift(lag) for lag in range(self.n_in, 0, -1)]
        cols = pd_concat(shifted, axis=1)
        cols.columns = self.input_column_names
        # The first n_in rows have no complete history and become NaN.
        return cols.dropna()

    @property
    def input_column_names(self) -> list[str]:
        """Lagged column names: oldest lag first, channels in frame order within a lag."""
        return [
            f'{col}_t_{lag}'
            for lag in range(self.n_in, 0, -1)
            for col in self.data.columns
        ]
    
    
# Build the lagged dataset from the normalized frame; print the lag-only
# columns, then display the full validated frame.
ld = LaggedData(df=normalized_data.dataframe)
print(f'{ld.input_columns=}')
ld.dataframe

In [None]:
class Preprocessor:
    """Chain the pipeline stages: raw tuples -> RawDataDF -> normalized -> lagged.

    Every property rebuilds its stage (and all earlier stages) on access;
    nothing is cached.
    """

    def __init__(self, raw_data: raw_data_type):
        self.raw_data = raw_data

    # --- Stage 1: extract --------------------------------------------------
    @property
    def sensor_data(self) -> SensorData:
        """Wrap the raw tuples."""
        return SensorData(self.raw_data)

    @property
    def sensor_df(self) -> RawDataDF:
        """Long-format, schema-checked raw readings."""
        return self.sensor_data.dataframe

    # --- Stage 2: normalize ------------------------------------------------
    @property
    def normalized_data(self) -> NormalizedData:
        """Normalizer built over the raw readings."""
        return NormalizedData(self.sensor_df)

    @property
    def normalized_df(self) -> NormalizedChannelsDF:
        """Wide frame of normalized channels."""
        return self.normalized_data.dataframe

    # --- Stage 3: lag ------------------------------------------------------
    @property
    def lagged_data(self) -> LaggedData:
        """Lag builder over the normalized frame."""
        return LaggedData(self.normalized_df)

    @property
    def lagged_df(self) -> LaggedDatasetDF:
        """Final model-ready frame with lagged columns."""
        return self.lagged_data.dataframe


# Run the full pipeline end to end on the same raw readings.
prep = Preprocessor(raw_data=raw_data)
prep.lagged_df


In [None]:
def graph_raw_data(df: RawDataDF):
    """Plot the red/blue readings over time and shade every anomaly interval.

    An anomaly at time ``t`` is drawn as a gray band over ``[t - 1, t]``,
    spanning 0 to the maximum ``value`` in the frame. All bands form a single
    ``broken_barh`` collection rather than one artist per anomaly, which
    keeps rendering fast when there are many anomalies.

    The caller's frame is left unmodified; columns are relabeled on a copy.
    """
    # Relabel on a copy so the caller's DataFrame keeps its own column names.
    df = df.set_axis(['time', 'channel', 'value'], axis=1)
    plt.figure(figsize=(12, 10))

    # Unique plotted channels, excluding 'anomaly' (shaded separately below).
    channels = df.loc[df['channel'].isin(['red', 'blue']), 'channel'].unique()
    for channel in channels:
        channel_data = df[df['channel'] == channel]
        plt.plot(channel_data['time'], channel_data['value'], label=channel)

    anomaly_times = df.loc[df['channel'] == 'anomaly', 'time']
    if len(anomaly_times):
        y_max = df['value'].max()  # same height for every band
        plt.broken_barh(
            [(t - 1, 1) for t in anomaly_times],  # (x_start, width) per band
            (0, y_max),  # (y_start, height)
            color='gray',
            alpha=0.5,
        )

    plt.xlabel('Time')
    plt.ylabel('Sensor Value')
    plt.title('Sensor Data Over Time')
    plt.legend()  # add a legend for each channel
    plt.show()


# Plot raw readings with anomaly intervals shaded.
# NOTE(review): previously flagged as too slow; cost grows with the number of
# anomaly rows graph_raw_data has to shade.
graph_raw_data(sd.dataframe)

In [None]:
# `preprocess(data)` below expects a long-format (time, channel, value) frame.
# Bind it explicitly so this cell runs on a fresh kernel.
data = sd.dataframe
data.head()

In [None]:
from pandera.typing import Index

def normalized_sensor_field() -> Field:
    """Return a new pandera Field restricting a column to [0, 1].

    NOTE(review): same definition as in the Transform cell above; this
    re-binds the name.
    """
    return Field(le=1, ge=0)

def normalized_anomaly_field() -> Field:
    """Return a new pandera Field admitting only 0.0 or 1.0.

    NOTE(review): same definition as in the Transform cell above; this
    re-binds the name.
    """
    allowed_flags = (0.0, 1.0)
    return Field(isin=allowed_flags)



class NormalizedChannelsDF(DataFrameModel):
    # Flat schema for `preprocess` output: integer time index >= 1 plus the
    # three normalized channels.
    # NOTE(review): this re-binds NormalizedChannelsDF and shadows the
    # inherited version above. NormalizedData.dataframe looks the name up at
    # call time, so it uses this class once this cell has run.
    index: Index[int] = Field(ge=1)
    anomaly: float = normalized_anomaly_field()
    blue: float = normalized_sensor_field()
    red: float = normalized_sensor_field()
    


def preprocess(input_df: RawDataDF) -> NormalizedChannelsDF:
    """Pivot long-format readings to one column per channel and normalize them.

    Steps:
    - Drop negative readings.
    - Pivot on time.
    - Turn the anomaly channel into a 0.0/1.0 flag (all 0.0 if the channel is absent).
    - Divide blue by 1000 and linearly interpolate its gaps forward.
    - Divide red by 100.
    - Drop rows that still contain NaN, then validate.

    ``input_df`` is not modified.

    Raises:
        KeyError: if the 'blue' or 'red' channel is absent.
    """
    # Remove rows with negative values (boolean mask; the caller's frame is untouched).
    cleaned = input_df.loc[~(input_df['value'] < 0)]

    d2 = cleaned.pivot(index='time', columns='channel', values='value')

    # Convert the anomaly channel to a binary float flag, adding it if missing.
    # astype(float) already maps True -> 1.0; a missing flag means 0.0.
    if 'anomaly' in d2.columns:
        d2['anomaly'] = d2['anomaly'].astype(float).fillna(0)
    else:
        d2['anomaly'] = 0.0

    # Normalize and interpolate the blue channel to fill missing values.
    d2['blue'] = (
        d2['blue'].astype(float)
        .div(1000)
        .interpolate(method='linear', limit_direction='forward')
    )

    # Normalize red channel. TODO: missing red values are not filled, so
    # those rows are dropped below.
    d2['red'] = d2['red'].astype(float)
    # Elementwise check: report any negative red reading before/after scaling.
    if (d2['red'] < 0).any():
        print(f'low {d2["red"]=}')
    d2['red'] /= 100
    if (d2['red'] < 0).any():
        print(f'normalized low {d2["red"]=}')

    # Drop NaN values
    d2 = d2.dropna()

    # Type the pivoted data.
    # NOTE(review): pandera's Index[int] may expect int64; confirm an int32
    # index passes NormalizedChannelsDF validation on your platform.
    d2.index = d2.index.astype('int32')

    return NormalizedChannelsDF(d2)


# Pivot and normalize the raw frame into one column per channel.
# NOTE(review): confirm whether preprocess modifies `data` before reusing `data`.
d2 = preprocess(data)
d2

In [None]:
# BUG REPORT
# Fails
# One Field object intended to be shared by several columns — the failing case.
GenericField: float = Field(ge=0)

class BadModelDF(DataFrameModel):
    # Both columns reuse the same Field instance; this is the model the bug
    # report says fails to validate.
    field: float = GenericField
    field_1: float = GenericField
    
    class Config:
        strict = True

# Works
def generic_field() -> Field:
    """Return a fresh non-negative Field; a new object per column is the working pattern."""
    field = Field(ge=0)
    return field

class GoodModelDF(DataFrameModel):
    # Each column gets its own Field from the factory; this is the working case.
    field: float = generic_field()
    field_1: float = generic_field()
    
    class Config:
        strict = True
        
df = DataFrame({'field': [0.0, 0.1], 'field_1': [0.2, 0.3]})

print(GoodModelDF(df))

# The failure is what this bug report demonstrates: catch it and show it so
# "Restart & Run All" continues past this cell.
try:
    BadModelDF.validate(df)
except Exception as exc:  # exact pandera exception type not pinned here
    print(f'BadModelDF.validate failed as expected: {exc!r}')
else:
    print('BadModelDF.validate succeeded: the reported bug may be fixed')

In [None]:
def create_lagged_dataset(
    df: dft[NormalizedChannelsDF],
    n_in: int = 3,
    n_out: int = 1,
) -> LaggedDatasetDF:
    """
    Create a lagged dataset from the normalized channels.

    Parameters:
    df - normalized frame with 'anomaly', 'blue' and 'red' columns; any other
         columns are ignored
    n_in - number of lagged steps (t-n_in ... t-1) added per channel.
           LaggedDatasetDF is strict and declares exactly 3 lags, so other
           values fail validation.
    n_out - accepted for call-site compatibility; only the current step t is
            kept as output, so it does not affect the result

    Returns the validated frame. Columns are ordered oldest lag first
    (anomaly, blue, red within each step), followed by the current step.
    Rows with any NaN (at least the first n_in) are dropped.

    Raises:
    KeyError if 'anomaly', 'blue' or 'red' is missing from df.
    """
    # Fixed channel order so features line up with the fitted model,
    # whatever column order df arrives in.
    channels = ['anomaly', 'blue', 'red']
    current = df[channels]

    # input sequence (t-n, ... t-1); shift is positional (previous rows)
    lagged = [
        current.shift(i).set_axis([f'{col}_t_{i}' for col in channels], axis=1)
        for i in range(n_in, 0, -1)
    ]

    # put it all together and drop rows without a complete history
    agg = pd_concat([*lagged, current], axis=1).dropna()

    return LaggedDatasetDF(agg)


# Lagged training dataset built from the preprocessed historical readings.
d2_lagged = create_lagged_dataset(d2)
d2_lagged

In [None]:
# convert channels to columns
from sklearn.model_selection import train_test_split


# Declare features/predictors and target variable
# Declare features/predictors and target variable
# Features include the lagged anomaly flags (anomaly_t_1 .. t_3) as well as the channels.
X = d2_lagged.drop('anomaly', axis=1)
y = d2_lagged['anomaly']

# Split data into training and test sets
# NOTE(review): train_test_split shuffles by default. Rows are built from
# overlapping lag windows, so neighbouring rows share values across the
# split; consider shuffle=False (chronological split) for an honest estimate.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=123
    )

import numpy as np


# Replacing infinities with NaN
#X_train = X_train.replace([np.inf, -np.inf], np.nan)
#X_test = X_test.replace([np.inf, -np.inf], np.nan)


In [None]:
from sklearn.ensemble import RandomForestClassifier


# Create model: 100-tree random forest, seeded for reproducibility.
model = RandomForestClassifier(n_estimators=100, random_state=123)

# Train model (later cells read this notebook-global `model`).
model.fit(X_train, y_train)

In [None]:
# Predictions on the held-out split
y_pred = model.predict(X_test)
y_pred

In [None]:
from sklearn.metrics import classification_report
# Per-class precision/recall/F1 on the held-out split.
print(classification_report(y_test, y_pred))

In [None]:
# Pull a fresh batch of 30 readings from the same sensor stream for inference.
newdata = [next(sensors) for _ in range(30)]

In [None]:
# Append one more reading (not idempotent: each re-run grows newdata).
newdata.append(next(sensors))

newdata

In [None]:
# Validate the new readings against the raw schema, then normalize and plot them.
nd = RawDataDF(DataFrame(newdata, columns=['time', 'channel', 'value']))
#nd.columns = ['time', 'channel', 'value']
print(f'{nd=}')

nd2 = preprocess(nd)
print(nd2)
nd2.plot()

In [None]:
# Inspect the new readings as a long-format frame.
DataFrame(newdata, columns=['time', 'channel', 'value'])

In [None]:
# NOTE(review): identical to the previous cell; candidate for removal.
DataFrame(newdata, columns=['time', 'channel', 'value'])

In [None]:
# Use the default lag configuration (3 input steps, current step as target).
nd2_lagged = create_lagged_dataset(nd2)
nd2_lagged

In [None]:
# Archived earlier version of create_lagged_dataset, kept for reference as an
# inert string literal. It is never executed; the live definition is the one
# in the earlier cell.
'''
def create_lagged_dataset(df, n_in=3, n_out=1, dropnan=True):
    """
    Create a lagged Dataset.

    Parameters:
    df - your dataframe
    n_in - your input sequence length
    n_out - your output sequence length
    dropnan - to drop or not to drop NaN values from the DataFrame
    """
    n_vars = df.shape[1]
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [f'{col}(t-{i})' for col in df.columns]
    
    print(f'\n{cols=}, {names=}')
    # forecast sequence (t, t+1, ... t+n)  # Unused
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        names += [f'{col}({f"t-{i}" if i else "t"})' for col in df.columns]

    print(f'\n{cols=}, {names=}')
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg
'''

# Use the default lag configuration (3 input steps, current step as target).
# NOTE(review): this re-binds d2_lagged — the training frame above — to the
# new readings' lags; re-running the split cell afterwards would train on them.
d2_lagged = create_lagged_dataset(nd2)
d2_lagged

In [None]:
def predict(df: LaggedDatasetDF) -> DataFrame:
    """Predict an anomaly flag for each row of a lagged dataset.

    Uses the notebook-global ``model`` fitted above. The 'anomaly' column is
    dropped to form the features. Returns a one-column frame
    ('anomaly_predicted') indexed like ``df``.
    """
    features = df.drop('anomaly', axis=1)
    y_pred = model.predict(X=features)
    print(f'{y_pred}')
    return DataFrame({'anomaly_predicted': y_pred}, index=features.index)

# Predict anomalies for the new readings' lagged rows.
predictions_df = predict(nd2_lagged)
predictions_df.head()

In [None]:
def plot_result(ax, X, Y_pred):
    """Clear ``ax`` and draw ground-truth channels with predicted anomalies, then show.

    ``X`` must have 'blue', 'red' and 'anomaly' columns, plotted against its
    index. ``Y_pred`` must have an 'anomaly_predicted' column, plotted against
    its own index.
    """
    ax.clear()

    # Ground truth: (values, label, color) drawn against X's index.
    truth_series = (
        (X['blue'].values, 'blue', 'b'),
        (X['red'].values, 'red', 'r'),
        (X['anomaly'], 'anomaly(actual)', 'y'),
    )
    for values, label, color in truth_series:
        ax.plot(X.index, values, label=label, color=color)

    ax.plot(
        Y_pred.index,
        Y_pred['anomaly_predicted'],
        label='Anomaly Predicted',
        color='g',
    )

    # Visual settings of the graph
    ax.legend()
    ax.grid(True)
    ax.set_title('Ground Truth vs Prediction')
    ax.set_xlabel('Timestamp')
    ax.set_ylabel('Anomaly Detection')
    plt.show()


# Plot normalized new readings against the predictions for their lagged rows.
figure, ax = plt.subplots()
plot_result(ax, nd2, predictions_df)

In [None]:
# Reset the shared axes before the live-update cells below.
ax.clear()

In [None]:
# Main Loop: one step — append the next sensor reading to newdata.
newdata.append(next(sensors))


In [None]:
def graph_prediction(newdata: raw_data_type, ax):
    """Preprocess raw readings, predict anomalies, and plot them against ground truth.

    Parameters:
    newdata - list of (time, channel, value) tuples, as drawn from ``sensors``
    ax - matplotlib Axes to draw on (cleared by ``plot_result``)

    Uses the notebook-global ``model``.
    """
    nd = DataFrame(newdata, columns=['time', 'channel', 'value'])

    nd2 = preprocess(nd)
    nd3 = create_lagged_dataset(nd2)
    X = nd3.drop('anomaly', axis=1)
    y_pred = model.predict(X=X)
    print(f'{y_pred}')

    # Assign (not add) X's index so each prediction sits at its row's timestamp.
    # `RangeIndex += X.index` would sum positions and timestamps element-wise.
    predictions_df = DataFrame({'anomaly_predicted': y_pred}, index=X.index)

    plot_result(ax, nd2, predictions_df)

    
# Draw one prediction frame on a new figure; `ax` is reused by the loop below.
figure, ax = plt.subplots()
graph_prediction(newdata, ax)

In [None]:
from time import sleep

# Live loop: append one reading per second and redraw, 100 times.
# NOTE(review): plot_result calls plt.show() each iteration. With the inline
# backend this may close the figure after the first frame, so later frames
# may not render — confirm. IPython.display.clear_output + display(fig) is the
# usual pattern for in-place updates.
for _ in range(100):
    newdata.append(next(sensors))
    graph_prediction(newdata, ax)
    sleep(1)

In [None]:
# Inspect the accumulated readings (grows with every loop iteration above).
newdata

In [None]:
# Re-run the preprocessing step on all accumulated readings and plot the result.
# NOTE(review): duplicates the first steps of graph_prediction.
nd = DataFrame(newdata)
nd.columns = ['time', 'channel', 'value']
#print(f'{nd=}')

nd2 = preprocess(nd)
#print(nd2)
nd2.plot()