# Exploration of sensor data stored in the Timestream database


## Initialization


### Imports


In [43]:
import datetime
from dataclasses import dataclass, field
from typing import Optional

import awswrangler as wr
import pandas as pd
import numpy as np
from numpy.polynomial import Polynomial
import scipy.signal as signal
from bokeh import palettes
from bokeh.models import HoverTool
from bokeh.plotting import figure
from bokeh.io import output_notebook, show
from dotenv import load_dotenv
from scipy import stats


### Data retrieval


In [44]:
# Fetch every column of every row from the "sensor-data" table of the
# "when-to-water" database through awswrangler's Timestream query helper.
# NOTE(review): the query has no ORDER BY, so row order is presumably not
# guaranteed to be chronological — confirm before relying on it.
df: pd.DataFrame = wr.timestream.query('SELECT * FROM "when-to-water"."sensor-data"')
# Report how many rows came back.
print(f"Retrived {len(df)} records")


Retrived 8084 records


### Constants, classes and functions


In [45]:
# Load variables from a .env file into the process environment.
# NOTE(review): presumably this provides the AWS settings used by awswrangler,
# yet it runs after the query cell above — confirm the intended cell order.
load_dotenv()
# Send Bokeh output to the notebook instead of a standalone HTML file.
output_notebook()

# Sensor names analysed in this notebook; used both to filter the
# "sensor_name" column and as keys of the per-sensor dictionaries below.
SENSORS: tuple[str, ...] = ("PWS_1", "PWS_2", "PWS_3")


@dataclass(order=True, frozen=True)
class MeasurementsDataFrame:
    """Measurements of one sensor over a single time span.

    The ``measurements`` dataframe should consist of the following columns:

    - offset (in seconds, relative to ``start``)
    - value

    Instances compare and sort by ``sort_index`` (a copy of ``start``) first,
    then by ``sensor_name``, ``start`` and ``end``. The dataframe itself is
    excluded from the generated comparison and hash methods: comparing two
    dataframes yields another dataframe whose truth value is ambiguous, and
    dataframes are not hashable.

    Attributes:
        sort_index: Copy of ``start``; listed first so it drives the ordering.
        sensor_name: Name of the sensor the measurements belong to.
        measurements: Dataframe with the ``offset`` and ``value`` columns.
        start: Timestamp that offset 0 refers to.
        end: ``start`` plus the largest offset, derived in ``__post_init__``.
    """

    sort_index: datetime.datetime = field(init=False, repr=False)
    sensor_name: str
    measurements: pd.DataFrame = field(compare=False)
    start: datetime.datetime
    end: datetime.datetime = field(init=False)

    def __post_init__(self) -> None:
        """Derive ``sort_index`` and ``end`` from the supplied fields."""
        # The dataclass is frozen, so regular attribute assignment is blocked;
        # object.__setattr__ is the sanctioned way to fill derived fields.
        object.__setattr__(self, "sort_index", self.start)
        # The largest offset is truncated to whole seconds.
        last_offset = int(self.measurements["offset"].max())
        object.__setattr__(
            self,
            "end",
            self.start + datetime.timedelta(seconds=last_offset),
        )


def df_to_mdf(
    sensor_name: str,
    df: pd.DataFrame,
    *,
    zscore_threshold: float = 2.0,
) -> Optional[MeasurementsDataFrame]:
    """Convert a slice of raw readings into a MeasurementsDataFrame.

    Args:
        sensor_name: Name stored on the resulting MeasurementsDataFrame.
        df: Readings with a datetime ``time`` column and a
            ``soil moisture in %`` column. Rows are assumed to be in
            chronological order; this is not checked here.
        zscore_threshold: Rows whose moisture value has an absolute z-score
            at or above this limit are dropped as outliers.

    Returns:
        The converted measurements, or ``None`` when ``df`` is empty, when no
        row survives the outlier filter, or when the first remaining value is
        lower than the last one (the slice is not a descent).
    """
    if df.empty:
        return None

    start = df["time"].min()
    measurements = pd.DataFrame(
        {
            # Seconds elapsed since the earliest reading of the slice.
            "offset": (df["time"] - start).dt.total_seconds(),
            "value": df["soil moisture in %"],
        }
    )

    # remove outliers; rows with a NaN z-score fail the comparison and are
    # dropped as well. Copy so the assignment below works on an independent
    # frame rather than on a view of the unfiltered one.
    zscores = np.abs(stats.zscore(measurements["value"]))
    measurements = measurements[zscores < zscore_threshold].copy()
    if measurements.empty:
        return None

    # normalize: shift the values so that the maximum becomes exactly 100
    measurements["value"] = measurements["value"] + (
        100 - measurements["value"].max()
    )

    # check if really descending
    values = measurements["value"]
    if values.iloc[0] < values.iloc[-1]:
        return None

    return MeasurementsDataFrame(
        sensor_name=sensor_name,
        measurements=measurements,
        start=start,
    )


def pick_descends(
    sensor_name: str,
    df: pd.DataFrame,
) -> list[MeasurementsDataFrame]:
    """Extract the peak-to-valley stretches of ``df``, dropping the ascents.

    Args:
        sensor_name: Name passed on to ``df_to_mdf`` for every stretch.
        df: Readings with boolean ``peak`` and ``valley`` columns plus the
            columns ``df_to_mdf`` reads. Rows are assumed to be in
            chronological order; this is not checked here.

    Returns:
        One MeasurementsDataFrame per stretch that ``df_to_mdf`` accepts, in
        row order. Each stretch runs from a peak row up to, but not including,
        the first valley row that follows it.
    """
    last_peak: Optional[int] = None
    mdfs: list[MeasurementsDataFrame] = []
    # Work with row positions rather than index labels, because the stretch is
    # cut out with the positional ``iloc``.
    flags = zip(df["peak"], df["valley"])
    for position, (is_peak, is_valley) in enumerate(flags):
        if is_peak:
            last_peak = position
        if not is_valley or last_peak is None or last_peak >= position:
            continue
        mdf = df_to_mdf(sensor_name, df.iloc[last_peak:position])
        if mdf is not None:
            mdfs.append(mdf)
        # The peak is used up: a further valley without a new peak in between
        # would otherwise yield an overlapping stretch containing an ascent.
        last_peak = None
    return mdfs


### Basic transformations


In [46]:
# Parse the timestamps, drop incomplete rows and put the readings in
# chronological order with a fresh 0..n-1 index. sort_values() and dropna()
# return new frames, so the result has to be assigned back to take effect.
df["time"] = pd.to_datetime(df["time"])
df = df.dropna().sort_values(by="time").reset_index(drop=True)

# Number of records per sensor, measure and calendar day.
df_counts = (
    df.groupby(["sensor_name", "measure_name", df["time"].dt.floor("d")])
    .size()
    .reset_index(name="count")
)
# Median of those daily counts per sensor. Only the numeric "count" column is
# aggregated, because a median over the text and timestamp columns is not
# meaningful and is rejected by recent pandas versions.
df_counts = (
    df_counts.groupby(["sensor_name"])[["count"]]
    .median()
    .rename(columns={"count": "median records per day"})
)
MEDIANS: dict[str, float] = df_counts["median records per day"].to_dict()
print(f"Medians: {MEDIANS}")


Medians: {'PWS_1': 9.0, 'PWS_2': 9.0, 'PWS_3': 9.0}


### Transformations


In [47]:
# Keep only the soil-moisture measure and give its value column a readable name.
df_soil_moisture = df[df["measure_name"] == "soil_moisture"].rename(
    columns={"measure_value::double": "soil moisture in %"}
)
# Readings of 2 % or below are discarded.
# NOTE(review): presumably these are sensor glitches — confirm the cutoff.
df_soil_moisture = df_soil_moisture[df_soil_moisture["soil moisture in %"] > 2]

# Descending (drying) stretches found for every sensor.
moisture_dfs: dict[str, list[MeasurementsDataFrame]] = {}
for sensor_name in SENSORS:
    sensor_df = df_soil_moisture[
        df_soil_moisture["sensor_name"] == sensor_name
    ].copy()
    # Renumber the rows 0..n-1 so the positions returned by find_peaks can be
    # matched against the index below.
    sensor_df.reset_index(inplace=True)

    # Extrema must be at least half the median daily record count apart and
    # have a prominence of at least 3.
    min_distance = MEDIANS[sensor_name] / 2
    peaks = signal.find_peaks(
        sensor_df["soil moisture in %"],
        distance=min_distance,
        prominence=3,
    )[0]
    # Valleys are the peaks of the negated signal.
    valleys = signal.find_peaks(
        -sensor_df["soil moisture in %"],
        distance=min_distance,
        prominence=3,
    )[0]
    sensor_df["peak"] = sensor_df.index.isin(peaks)
    sensor_df["valley"] = sensor_df.index.isin(valleys)
    moisture_dfs[sensor_name] = pick_descends(sensor_name, sensor_df)
moisture_dfs = dict(sorted(moisture_dfs.items()))

# All descending stretches of a sensor stacked into one dataframe.
all_measurements: dict[str, pd.DataFrame] = {}
for sensor_name in SENSORS:
    descents = moisture_dfs[sensor_name]
    if not descents:
        # pd.concat would raise a ValueError here as well, but without saying
        # which sensor is affected.
        raise ValueError(f"No descending stretches found for sensor {sensor_name!r}")
    combined = pd.concat([mdf.measurements for mdf in descents])
    # offset seconds to hours
    combined["offset"] = combined["offset"] / 3600
    all_measurements[sensor_name] = combined


## Regression


In [48]:
# Least-squares fit of a degree-1 polynomial (a straight line) of the value
# against the offset in hours, one fit per sensor.
polyfits: dict = {
    sensor_name: Polynomial.fit(
        all_measurements[sensor_name]["offset"],
        all_measurements[sensor_name]["value"],
        1,
    )
    for sensor_name in SENSORS
}


## Plots


In [49]:
# One figure for all sensors: measured points as circles and the fitted
# polynomial as a dashed line, both in the sensor's own colour.
plot = figure(width=1280, height=720)
plot.add_tools(HoverTool())

for position, sensor_name in enumerate(SENSORS):
    # Pick this sensor's colour from the Colorblind palette sized to the
    # number of sensors.
    sensor_color = palettes.Colorblind[len(SENSORS)][position]

    plot.circle(
        x="offset",
        y="value",
        source=all_measurements[sensor_name],
        color=sensor_color,
    )

    # Sample the fit at 100 points to draw it.
    fit_x, fit_y = polyfits[sensor_name].linspace(100)
    plot.line(
        x=fit_x,
        y=fit_y,
        color=sensor_color,
        line_width=2,
        line_dash="dashed",
    )

show(plot)
