In [1]:
pip install numpy pandas

You should consider upgrading via the '/usr/local/opt/python@3.10/bin/python3.10 -m pip install --upgrade pip' command.[0m[33m
[0mNote: you may need to restart the kernel to use updated packages.


In [2]:
import numpy as np
import pandas as pd
import dateutil
import datetime
from filterpy.kalman import KalmanFilter

In [3]:
import json
with open("./sensor_data.json", "r") as f:
    data = json.load(f)

In [4]:
timestamps = [dateutil.parser.parse(d["timestamp"]) for d in data]
temperature = [d["payload"]["temperature"] if ("temperature" in d["payload"]) else None for d in data]
humidity = [d["payload"]["humidity"] if ("humidity" in d["payload"]) else None for d in data]

In [5]:
zipped = zip(timestamps, temperature, humidity)
df = pd.DataFrame(zipped, columns=["timestamp", "temperature", "humidity"])
df = df.loc[pd.notna(df["temperature"])]
df = df.sort_values(by="timestamp", ascending=True).reset_index(drop=True)

# Constant Data Predictor

In [6]:
class Const_Predictor:
    def __init__(self, x_0=0):
        self.x = x_0

    def predict(self):
        return self.x
    
    def update(self, z):
        self.x = z

    def multiple_update(self, zs):
        self.x = zs[-1]

    def clone(self):
        return Const_Predictor(self.x)

## Kalman Filter Prediction
A Kalman Filter is attempts to predict the true values behind a noisy data stream.

In [34]:
#x = predicted data
#q = predicted variance
#Q = system noise
#R = input noise

class Kalman_Predictor:
    def __init__(self, x_0, P_0, R=1., Q=1.):
        self.x = x_0
        self.P = P_0
        self.Q = Q
        self.R = R

    def predict(self):
        self.P = self.P + np.random.normal(0, self.Q)
        return self.x
    
    def update(self, z):
        KG = self.P / (self.P + self.R)
        self.x = self.x + (KG*(z - self.x))
        self.P = self.P - (KG*self.P)
        return self.x

    def multiple_update(self, zs):
        for z in zs:
            self.predict()
            self.update(z)
        return self.x

    def clone(self):
        return Kalman_Predictor(self.x, self.P, self.Q, self.R)

In [24]:
def prediction_func_accuracy(data, predictor, error_threshold = 0.05):

    predictor_a = predictor
    predictor_b = predictor.clone()

    actual_data_values = []

    predicted = 0
    sent = 1
    for z in data[1:]:
        actual_data_values.append(z)
        x = predictor_a.predict()
        error = abs(x - z)
        if error > error_threshold:
            predictor_b.multiple_update(actual_data_values)
            predictor_a = predictor_b.clone()
            sent += 1
        else:
            predictor_a.update(x)
            predicted += 1
    
    return predicted / (predicted + sent)

In [43]:
k_temp_accuracy = prediction_func_accuracy(df["temperature"], Kalman_Predictor(df["temperature"][0], P_0=1000, R=1, Q=0.1), error_threshold=0.05)
k_humid_accuracy = prediction_func_accuracy(df["humidity"], Kalman_Predictor(df["humidity"][0], P_0=1000, R=25., Q=0.1), error_threshold=0.5)

c_temp_accuracy = prediction_func_accuracy(df["temperature"], Const_Predictor(df["temperature"][0]), 0.05)
c_humid_accuracy = prediction_func_accuracy(df["humidity"], Const_Predictor(df["humidity"][0]), 0.5)

print("Accuracy for Kalman with temperature to 1dp =", k_temp_accuracy)
print("Accuracy for Constant with temperature to 1dp =", c_temp_accuracy)
print("Accuracy for Kalman humidity to 0dp =", k_humid_accuracy)
print("Accuracy for Constant humidity to 0dp =", c_humid_accuracy)

Accuracy for Kalman with temperature to 1dp = 0.8636003172085647
Accuracy for Constant with temperature to 1dp = 0.865979381443299
Accuracy for Kalman humidity to 0dp = 0.9373513084853291
Accuracy for Constant humidity to 0dp = 0.9635210150674068
