In [80]:
import numpy as np
import re
import matplotlib.pyplot as plt
import polars as pl
import seaborn as sns
from enum import Enum

In [82]:
# Setup and global variables
# --------------------------
# Relative path of the figure directory (not the data directory)
fig_dir = "img/"
# Plotting setup and style: viridis colormap and seaborn's "darkgrid" theme
custom_cmap = plt.get_cmap("viridis")
sns.set_style("darkgrid")
# Relative data paths. A log file name is assembled as
# <base name> + <variant suffix> + ".txt", e.g. "output-energy-hardware.txt".
output_energy = "output-energy" # Base name of the energy measurement output files
output_timing = "output-timing" # Base name of the timing measurement output files
# Variant suffixes appended to the base names above
s_hardware = "-hardware"
s_software = "-software"
s_full = "-full"

class OutputType(Enum):
    """Kind of measurement log file; selects how a file is parsed."""
    # Energy log: parsed into "Sensor energy" / "AES energy" / "TX energy" summaries
    ENERGY = 1
    # Timing log: parsed into "Sensor time" / "AES time" / "TX time" values
    TIMING = 2

In [88]:
# Measurement parsing helpers and electrical constants
class Measurements:
    def __init__(self, outputs: list[str], output_types: list[OutputType]) -> None:
        for output, output_type in zip(outputs, output_types):
            match output_type:
                case OutputType.ENERGY:
                    # extract energy values
                    summaries = self.extract_summaries(output)

                    sensor_values = [self.extract_values(row) for row in summaries["Sensor energy"]]
                    aes_values = [self.extract_values(row) for row in summaries["AES energy"]]
                    tx_values = [self.extract_values(row) for row in summaries["TX energy"]]

                    self._df_energy_sensor = pl.DataFrame(sensor_values)
                    self._df_energy_aes = pl.DataFrame(aes_values)
                    self._df_energy_tx = pl.DataFrame(tx_values)

                case OutputType.TIMING:
                    # extract timing values
                    self._df_timings = self.extract_timings(output)
                case _:
                    raise ValueError("Invalid output type")

    States = {
        "CPU": 1.8, # mA as per https://www.willow.co.uk/TelosB_Datasheet.pdf
        "LPM": 0.0051 + 0.021, # 26.1 uA as per https://www.willow.co.uk/TelosB_Datasheet.pdf
        "DEEP_LPM": 0.0061, # 6.1 uA as per https://www.willow.co.uk/TelosB_Datasheet.pdf
        "RADIO_RX": 18.8, # https://www.ti.com/lit/ds/symlink/cc2420.pdf?ts=1670166938154&ref_url=https%253A%252F%252Fwww.ti.com%252Fproduct%252FCC2420
        "RADIO_TX": 17.4 # https://www.ti.com/lit/ds/symlink/cc2420.pdf?ts=1670166938154&ref_url=https%253A%252F%252Fwww.ti.com%252Fproduct%252FCC2420
    }
    VOLTAGE = 3
    TICKS_PER_SECOND = 32768

    """
    Takes a path to a file containing the timing measurements
    Returns a DataFrame with the timing measurements
    """
    def extract_timings(path: str) -> pl.DataFrame:
        """Extract the timings from the output timing file"""
        # Read the file
        with open(path, "r") as f:
            file = f.read()
        # Extract Sensor timings
        sensor_timings = re.findall(r"Sensor time: (\d+)", file)
        sensor_timings = [int(t) for t in sensor_timings]
        # Extract AES encryption timings
        aes_timings = re.findall(r"AES time: (\d+)", file)
        aes_timings = [int(t) for t in aes_timings]
        # Extract TX timings
        tx_timings = re.findall(r"TX time: (\d+)", file)
        tx_timings = [int(t) for t in tx_timings]

        # Create a DataFrame
        df = pl.DataFrame(
            {
                "Sensor": sensor_timings,
                "AES Encryption": aes_timings,
                "TX": tx_timings,
            }
        )
        return df

    """
    Takes a summary string and returns a list of values
    Returns a list of values
    """
    def extract_values(summary: str, samples=1) -> dict:
        # extract the values
        match = re.findall(r":\s+(\d+)", summary)
        # return the values
        return dict({
            state: int(x) for state, x in zip(["Total"] + list(Measurements.States.keys()), match)
        })

        # return [int(x) / samples if i > 0 else int(x) for i, x in enumerate(match)]

    """
    Takes a list of values and returns a dictionary with the state values
    Returns a dictionary with the state values
    """
    def calculate_state_energy(values: dict) -> dict:
        # calculate the values
        total_time = values["Total"]
        states_values = {
            f"{state}": {
                "state_time": state_time,
                "avg_current_mA": (state_time * state_current) / total_time,
                "charge_mC": (state_time * state_current) / Measurements.TICKS_PER_SECOND,
                "charge_mAh": (state_time * state_current) / Measurements.TICKS_PER_SECOND / 3600,
                "power_mW": (state_time * state_current) / total_time * Measurements.VOLTAGE,
                "energy_mJ": (state_time * state_current) / Measurements.TICKS_PER_SECOND * Measurements.VOLTAGE
            } for state_time, state_current, state in zip(values[1:], Measurements.States.values(), Measurements.States.keys())
        }
        # return the values
        return states_values

    """
    Takes a path to a file containing the energy measurements
    Returns a DataFrame with the energy measurements
    """
    def extract_summaries(energest_output: str) -> list:
        # open the file to apply regex on
        with open(energest_output, "r") as f:
            # read the file
            file = f.read()

        reg_suf = r".*?Radio total\s+:\s+\d+/\s+\d+ \(\d+ permil\)"

        # extract the sensor energy summaries
        sensor_summaries = re.findall(r"Sensor energy:" + reg_suf, file, re.DOTALL)
        # extract the AES energy summaries
        aes_summaries = re.findall(r"AES energy:" + reg_suf, file, re.DOTALL)
        # extract the TX energy summaries
        tx_summaries = re.findall(r"TX energy:" + reg_suf, file, re.DOTALL)

        # return the summaries
        return dict({
            "Sensor energy": sensor_summaries,
            "AES energy": aes_summaries,
            "TX energy": tx_summaries
        })

In [None]:
# Both arguments are parallel lists: one OutputType per file path.
# The file is a timing log, so it is loaded as OutputType.TIMING.
measurements = Measurements([output_timing + s_software + ".txt"], [OutputType.TIMING])

In [51]:
# extract_timings is defined on Measurements; the notebook cells shown define
# no module-level function of that name, so call it through the class.
df = Measurements.extract_timings(output_timing + s_software + ".txt")
len(df)

1056

In [69]:
# Names of the states, in declaration order (iterating a dict yields its keys)
list(Measurements.States)

['CPU', 'LPM', 'DEEP_LPM', 'RADIO_RX', 'RADIO_TX']

In [86]:
# Despite its name, `df` holds the dict of raw summary strings per section here
df = Measurements.extract_summaries(output_energy + s_hardware + ".txt")
# One row per "Sensor energy" summary
pl.DataFrame(list(map(Measurements.extract_values, df["Sensor energy"])))

Total,CPU,LPM,DEEP_LPM,RADIO_RX,RADIO_TX
i64,i64,i64,i64,i64,i64
152,152,0,0,0,0
152,152,0,0,0,0
153,153,0,0,0,0
152,152,0,0,0,0
153,153,0,0,0,0
151,151,0,0,0,0
152,152,0,0,0,0
152,152,0,0,0,0
152,152,0,0,0,0
152,152,0,0,0,0
