In [1]:
import time
import pickle
import numpy as np
from datetime import datetime
from labels import label_datetimes
from scipy.signal import butter, filtfilt
from scipy.interpolate import UnivariateSpline

import plotly.express as px
import plotly.graph_objects as go

In [3]:
CUTOFF = 5
FS = 50
ORDER = 1


def apply_lowpass_filter(data, cutoff, fs, order=5):
    b, a = butter(order, cutoff, fs=fs, btype="low", analog=False)
    y = filtfilt(b, a, data)
    return y

In [4]:
def get_ts(date_str):
    ts = time.mktime(datetime.strptime(
        date_str, "%Y-%m-%d %H:%M:%S").timetuple())
    return int(ts)

In [5]:
def get_values_between_dates(matrix, sensor_idx, sensor_data, start_date, end_date):
    start_ts = get_ts(start_date)
    end_ts = get_ts(end_date)
    t = np.linspace(start_ts, end_ts, 100)
    result = {"t": t, "hps": [], "ot": [], "ohps": []}
    for hp in range(10):
        df = sensor_data[matrix][sensor_idx][hp]
        df_interval = df[(df["Real time clock"] >= start_ts - 100)
                         & (df["Real time clock"] <= end_ts + 100)]

        rtc = df_interval["Real time clock"].values
        gas = df_interval["Resistance Gassensor"].values

        result["ot"].append(rtc)
        result["ohps"].append(gas)

        interpolation_func = UnivariateSpline(rtc, gas, s=0)

        y = interpolation_func(t)
        result["hps"].append(y)

    return result

In [22]:
DAY = 9
matrix = "sample"
label = next((i for i in label_datetimes if i["day"] == DAY), None)

with open(f"data/day_{DAY}/raw.pkl", "rb") as f:
    raw_data = pickle.load(f)

for sensor in range(8):
    s_data = get_values_between_dates(
        matrix,
        sensor,
        raw_data,
        label["start"],
        label["end"])

    title = f"Sensor {sensor} Readings"
    fig = go.Figure()

    for hp in range(10):
        fig.add_trace(go.Scatter(x=s_data["t"],
                                 y=np.log(apply_lowpass_filter(
                                     s_data["hps"][hp], CUTOFF, FS, ORDER)),
                                 mode="lines",
                                 showlegend=True,
                                 name=f"I.H.S. {hp}"))

        fig.add_trace(go.Scatter(x=s_data["ot"][hp],
                                 y=np.log(s_data["ohps"][hp]),
                                 mode="lines",
                                 showlegend=True,
                                 name=f"O.H.S. {hp}"))

    fig.update_yaxes(title_text="Resistance (Ohms)")
    fig.update_xaxes(title_text="Timestamp")
    fig.update_layout(height=800, width=1200,
                      title_x=0.5,
                      font_family="Times New Roman",
                      title_font_family="Times New Roman",
                      title=dict(text=title, pad=dict(t=0, r=0, b=0, l=0)),
                      margin=dict(t=50, r=10, b=0, l=0))
    fig.show()