[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/wattwatchers/le_completeness_analysis/blob/main/le_completeness_analysis/analysis_colab.ipynb)

# Device id(s) configuration.

There are 2 options to configure devices for analysis.
1. All devices associated with the API key configured in the .env file
2. A list of device ids


In [1]:
# API key for the Public REST API
# If you want to analyse all devices associated with the API key, set the value of DEVICE_IDS below to an empty list ([])
API_KEY: str = ""

# If you want to analyse a subset of devices, enter the device ids inside the brackets ([]) like this:
# [ "DD1234567890", "DD2345678901"]
DEVICE_IDS: list = []

# Period configuration

Configure the time period to analyse.


In [24]:
TIMEZONE: str = "Australia/Sydney"         # The timezone the period is defined in
START_DATE: str = "2025-02-27"             # Date string in the format <YYYY-MM-DD> in the target timezone
END_DATE: str = "2025-03-25"               # Date string in the format <YYYY-MM-DD> in the target timezone

# Threshold configuration

Configure the data completeness threshold (as a percentage) under which a device is considered problematic

In [3]:
DATA_COMPLETENESS_THRESHOLD: float = 99

# Other config (you probably won't need to touch these)

In [4]:
MAX_TPS: int = 4                 # Maximum # of API requests per second
ENVIRONMENT: str = "production"  # API environment (staging or production)

# API client

In [None]:
import os
import logging
import re

import pandas as pd
%pip install pendulum
import pendulum
%pip install itables
from itables import show, JavascriptFunction, JavascriptCode

In [6]:
def get_logger(logging_level: str = "INFO") -> logging.Logger:
    logger = logging.getLogger("notebook")
    logger.setLevel(logging_level)

    # loggers are cached, so if we call this from multiple places we end up with multiple handlers
    if logger.hasHandlers():
        return logger

    stdout_handler = logging.StreamHandler()
    formatter: logging.Formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)
    return logger

In [7]:
import time
from functools import partialmethod
from typing import Any

import httpx
from pendulum import DateTime

JSONType = None | bool | int | float | str | list[Any] | dict[str, Any]


class RestError(Exception):
    """
    Error from REST API Client.
    """

    def __init__(
        self,
        message: str,
        request: httpx.Request,
        response: httpx.Response | None = None,
    ):
        super().__init__(message)
        self.request: httpx.Request = request
        self.response: httpx.Response | None = response


class RestAPIClient:

    def __init__(self, base_url: str, requests_per_sec_max: int, **session_kwargs):
        self._base_url = base_url
        self._client = httpx.Client()
        self._requests_per_sec_max = requests_per_sec_max
        self._last_request_time: DateTime | None = None

        for key, value in session_kwargs.items():
            setattr(self._client, key, value)

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def close(self):
        return self._client.close()

    def _throttler(self):
        """
        This method throttles API request based on when the last request was made and the number of maximum number of requests per second configured.
        (the actual frequency of requests can be lower than the maximum allowed if requests take longer to complete than the minimum interval
        between requests)
        """
        if self._last_request_time is None:
            return
        time_since_last_request = self._last_request_time.diff().in_seconds()
        wait_duration = max(0, 1 / self._requests_per_sec_max - time_since_last_request)
        if wait_duration > 0:
            time.sleep(wait_duration)

    def request(self, method: str, path: str, **kwargs) -> tuple[JSONType, RestError]:
        self._throttler()
        try:
            resp = self._client.request(method, f"{self._base_url}/{path}", **kwargs)
            resp.raise_for_status()
            if len(resp.text) == 0:
                return None, None
            return (resp.json(), None)
        except httpx.HTTPStatusError as http_error:
            error_message = http_error.response.json().get("message", "")
            error = RestError(
                f"Error response {http_error.response.status_code} while requesting {http_error.request.url!r}: {error_message}",
                http_error.request,
                http_error.response,
            )
            return (None, error)
        except httpx.RequestError as err:
            error = RestError(
                f"An error occurred while requesting {err.request.url!r}.", err.request
            )
            return (None, error)
        finally:
            self._last_request_time = pendulum.now()

    get = partialmethod(request, "GET")
    post = partialmethod(request, "POST")
    put = partialmethod(request, "PUT")
    patch = partialmethod(request, "PATCH")
    delete = partialmethod(request, "DELETE")
    head = partialmethod(request, "HEAD")
    options = partialmethod(request, "OPTIONS")


In [8]:
from dataclasses import dataclass
from enum import Enum
import json

@dataclass
class TimeInterval:
    """
    Data class for a time interval
    """

    timestamp_start: int
    timestamp_end: int


class Granularity(Enum):
    """
    Enum for different LE granularities
    """

    FIVE_MINS = "5m"
    FIFTEEN_MINS = "15m"
    THIRTY_MINS = "30m"
    HOUR = "hour"
    DAY = "day"
    WEEK = "week"
    MONTH = "month"


class CallerError(Exception):
    """
    Error by caller of the client.
    """


class PublicApiClient(RestAPIClient):

    def __init__(
        self,
        environment: str,
        api_key: str,
        requests_per_sec_max: int,
        logger: logging.Logger,
    ):
        match environment:
            case "production" | "prod":
                base_url = "https://api-v3.wattwatchers.com.au"
            case "staging":
                base_url = "https://api-v3-stage.wattwatchers.com.au"
            case _:
                # fallback is prod
                base_url = "https://api-v3.wattwatchers.com.au"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        # TODO: use a considered value for number of requests per second
        super().__init__(base_url, requests_per_sec_max, headers=headers)
        self._logger = logger

    def get_devices_list(self) -> tuple[list | None, RestError | None]:
        """
        Retrieves all device ids associated with the API key
        """
        result = super().get("devices")
        return result

    def get_device_status(self, device_id: str) -> tuple[dict | None, RestError | None]:
        """
        Retrieves the status of the device associated with the device_id
        """
        result = super().get(f"devices/{device_id}")
        return result

    def patch_device_status(
        self, device_id: str, payload: dict
    ) -> tuple[dict | None, RestError | None]:
        """
        Patches the device status of the device associated with the device_id
        Used (among other things) to update WiFi credentials
        """
        result = super().patch(f"devices/{device_id}", data=json.dumps(payload))
        return result

    def update_wifi_credentials(
        self, device_id: str, ssid: str | None = None, psk: str | None = None
    ) -> tuple[dict | None, RestError | None]:
        """
        Updates the WiFi credentials of the device associated with the device_id
        If successful, this will cause the device to switch to WiFi comms.
        """
        if ssid is None and psk is None:
            # No credential details provided, return error
            return (
                None,
                CallerError(
                    "Request to update WiFi credentials requires at least one of SSID and PSK to be defined."
                ),
            )

        payload = {"comms": {"wifi": {}}}
        if not (ssid is None):
            payload["comms"]["wifi"]["ssid"] = ssid
        if not (psk is None):
            payload["comms"]["wifi"]["psk"] = psk

        return self.patch_device_status(device_id, payload)

    def reset_wifi_credentials(
        self, device_id: str
    ) -> tuple[dict | None, RestError | None]:
        """
        Resets the WiFi credentials of the device associated with the device_id
        This will cause the device to switch to cellular comms.
        """
        return self.update_wifi_credentials(device_id, "", "")

    def change_switch_state(
        self, device_id: str, switch_id: str, target_state: str
    ) -> tuple[dict | None, RestError | None]:
        """
        Changes the switch state of the switch with id `switch_id` on the device with
        id `device_id` to state `target_state`.
        """
        payload = {
            "id": device_id,
            "switches": [{"id": switch_id, "state": target_state}],
        }
        return self.patch_device_status(device_id, payload)

    def update_se_reporting_interval(
        self, device_id: str, reporting_interval: int
    ) -> tuple[dict | None, RestError | None]:
        """
        Update the SE reporting interval for the device to the requested value
        """
        payload = {"shortEnergyReportingInterval": reporting_interval}
        return super().post(
            f"devices/{device_id}/reporting-interval", data=json.dumps(payload)
        )

    def get_latest_se(
        self, device_id: str, energy_unit: str | None = "kW"
    ) -> tuple[dict | None, RestError | None]:
        if energy_unit is not None and energy_unit in ["kW", "kWh"]:
            params = {"convert[energy]": energy_unit}
            return super().get(f"short-energy/{device_id}/latest", params=params)
        return super().get(f"short-energy/{device_id}/latest")

    def _max_interval_for_granularity(self, granularity: Granularity) -> int:
        """
        Returns the maximum interval for a single energy request based on the granularity
        """
        MAX_INTERVALS_DAYS = {
            Granularity.FIVE_MINS: 7,
            Granularity.FIFTEEN_MINS: 14,
            Granularity.THIRTY_MINS: 31,
            Granularity.HOUR: 90,
            Granularity.DAY: 3 * 365,  # ≈ 3 years
            Granularity.WEEK: 5 * 365,  # ≈ 5 years
            Granularity.MONTH: 10 * 365,  # ≈ 10 yers
        }
        return MAX_INTERVALS_DAYS.get(granularity, 7) * 24 * 3600

    def _calculate_intervals_for(
        self, granularity: Granularity, timestamp_start: int, timestamp_end: int
    ) -> list[tuple[int, int]]:
        """
        Batches an interval based on the maximum interval per request for the given granularity.
        """
        batch_interval = self._max_interval_for_granularity(granularity)
        intervals = [
            TimeInterval(batch_start, min(batch_start + batch_interval, timestamp_end))
            for batch_start in range(timestamp_start, timestamp_end, batch_interval)
        ]
        return intervals

    def _load_energy(
        self,
        endpoint: str,
        device_id: str,
        intervals: list[TimeInterval],
        unit: str = "kWh",
        granularity: Granularity | None = None,
    ) -> tuple[list | None, RestError | None]:

        energy_data = []
        for interval in intervals:
            params = {
                "fromTs": interval.timestamp_start,
                "toTs": interval.timestamp_end,
                "convert[energy]": unit,
            }
            if granularity is not None:
                params["granularity"] = granularity.value

            self._logger.info(
                f"load from {interval.timestamp_start} to {interval.timestamp_end} for {device_id}"
            )
            (result, error) = super().get(endpoint, params=params)
            if error is not None:
                self._logger.error(
                    f"Error retrieving LE data for {device_id} between {interval.timestamp_start} and {interval.timestamp_end}: {error}"
                )
                return (None, error)

            if result is not None:
                energy_data.extend(result)

        return (energy_data, None)

    def load_long_energy(
        self,
        device_id: str,
        timestamp_start: int,
        timestamp_end: int,
        granularity: Granularity = Granularity.FIVE_MINS,
        unit: str = "kWh",
    ) -> tuple[list | None, RestError | None]:

        intervals = self._calculate_intervals_for(
            granularity, timestamp_start, timestamp_end
        )
        return self._load_energy(
            f"long-energy/{device_id}", device_id, intervals, unit, granularity
        )

    def get_first_le(self, device_id: str) -> tuple[list | None, RestError | None]:
        result = super().get(f"long-energy/{device_id}/first")
        return result

    def load_short_energy(
        self,
        device_id: str,
        timestamp_start: int,
        timestamp_end: int,
        unit: str = "kWh",
    ) -> tuple[list | None, RestError | None]:

        max_interval = 12 * 3600  # maximum request interval for SE is 12 hours
        intervals = [
            TimeInterval(batch_start, min(batch_start + max_interval, timestamp_end))
            for batch_start in range(timestamp_start, timestamp_end, max_interval)
        ]
        return self._load_energy(
            f"short-energy/{device_id}", device_id, intervals, unit
        )


In [9]:
logger: logging.Logger = get_logger()
public_api_client = PublicApiClient(ENVIRONMENT, API_KEY, MAX_TPS, logger)

# Data downloading

In [None]:
def is_valid_device_id(device_id: str, raise_error: bool = False) -> tuple[bool, str]:
  """
  Returns True if the device ID is valid, False if not.
  Simplified version based on lib_common
  """
  DEVICE_ID_PATTERN = "^[B-F]{1}[A-F0-9]{12}$"
  DEVICE_ID_REGEX = re.compile(DEVICE_ID_PATTERN)
  # re.match won't detect trailing space in the device id, but re.fullmatch will.
  if not DEVICE_ID_REGEX.fullmatch(device_id):
      return False
  return True

# Determine devices to analyse
devices: list[str] = []
if DEVICE_IDS is not None and len(DEVICE_IDS) > 0:
  devices = DEVICE_IDS
else:
  # get all devices associated with API key
  result, error = public_api_client.get_devices_list()
  if error is not None:
    logger.error(f'failed to load devices for API key: {error}')
  else:
    devices = result

# filter out any invalid device ids
devices =[d for d in devices if is_valid_device_id(d)]


num_devices = len(devices)
logger.info(f'found {num_devices} devices to analyse')

In [25]:
# Determine timestamps for requests
time_start = pendulum.parse(START_DATE, tz=TIMEZONE)
timestamp_start = time_start.int_timestamp

time_end = pendulum.parse(END_DATE, tz=TIMEZONE)
timestamp_end = time_end.int_timestamp

if time_start > time_end:
    raise ValueError(f"Start date ({START_DATE}) has to be earlier than end date ({END_DATE})")
if time_end > pendulum.now():
    raise ValueError(f"End date ({END_DATE}) needs to be in the past")

ValueError: End date 2025-03-25 needs to be in the past

In [12]:
# TODO: load device status for each device to exclude decommissioned devices
# Issue: only user-apps-api exposes this and we can't use that because of different auth system

In [None]:
# Download LE data
le_data = {}
for index, device_id in enumerate(devices):
  logger.info(f'Downloading LE data for device {index+1}/{num_devices} - {device_id}')
  result, error = public_api_client.load_long_energy(device_id, timestamp_start, timestamp_end)
  if error is not None:
    logger.error(f'failed to load LE for device: {device_id}: {error}')
  else:
    le_data[device_id] = result

logger.info(f'Successfully downloaded LE data for {len(le_data)}/{num_devices} devices')

In [None]:
# Check whether le_data dict is empty
if not le_data:
    raise ValueError("No LE data was downloaded for any device.")

# Analysis


In [26]:
# Devices we couldn't download LE data for (included in devices list but not in LE dict)
devices_with_le_data = list(le_data.keys())
devices_without_le_data = [device_id for device_id in devices if device_id not in devices_with_le_data]

devices_with_empty_le_data = [device_id for device_id, data in le_data.items() if len(data) == 0]

# Determine expected number of intervals
# Calculate the number of 5-minute intervals between the start and end timestamps using pendulum
num_intervals_expected = int((time_end.diff(time_start).in_minutes()) // 5)

# Devices with missing LE data
# TODO: add alternative analysis based on timestamp and duration of interval (only works for intervals between existing intervals, i.e. need to handle missing intervals at start or end of period separately)
# Could also just do a quick analysis to verify all intervals have a duration of 300s.
devices_with_missing_le_data = {device_id: data for device_id, data in le_data.items() if len(data) < num_intervals_expected}

# Devices not meeting data completeness threshold
num_intervals_completeness_threshold = DATA_COMPLETENESS_THRESHOLD * num_intervals_expected // 100 # TODO: double check if this can result in off-by-one error
devices_not_meeting_threshold = {device_id: data for device_id, data in le_data.items() if len(data) < num_intervals_completeness_threshold}

# Devices with complete LE data
devices_with_complete_le_data = {device_id: data for device_id, data in le_data.items() if len(data) == num_intervals_expected}


In [27]:
# High level analysis

le_data_df = pd.DataFrame({
    'device_id': list(le_data.keys()),
    'le_intervals': list(le_data.values()),
    'num_intervals': [len(data) for data in le_data.values()]
})

le_data_df['num_intervals_expected'] = num_intervals_expected
le_data_df['num_intervals_missing'] = le_data_df['num_intervals_expected'] - le_data_df['num_intervals']
le_data_df['interval_completeness'] = le_data_df['num_intervals'] / le_data_df['num_intervals_expected']
le_data_df['interval_missingness'] = 1 - le_data_df['interval_completeness']

le_data_df = le_data_df.sort_values(by='interval_completeness')

df_devices_table = le_data_df[['device_id', 'interval_completeness', 'num_intervals_missing']].copy()
df_devices_table['interval_completeness'] = df_devices_table['interval_completeness'] * 100

parameters = [{
  'start_time': time_start,
  'end_time': time_end,
  'num_expected_intervals': num_intervals_expected,
  
}]

top_level_stats = [{
  'num_devices': num_devices,
  'overall_completeness': le_data_df['num_intervals'].sum() / (num_devices * num_intervals_expected) * 100,
  'devices_under_threshold': len(devices_not_meeting_threshold),
  'devices_with_missing_intervals': len(devices_with_missing_le_data),
  'devices_without_data': len(devices_with_empty_le_data),
  'devices_with_failed_retrieval': len(devices_without_le_data),
  'devices_with_complete_data': len(devices_with_complete_le_data),
}]

df_parameters = pd.DataFrame.from_dict(parameters)
df_stats = pd.DataFrame.from_dict(top_level_stats)

In [28]:
# Transform interval data

def flatten_arrays(item: dict) -> dict:
    """ flatten each element of arrays to their own key. Other types of values are left untouched.
    e.g. {key: [value0, value1, ...]} becomes {key_0: value0, key_1: value1, ...}.            
    """
    flattened = {}
    for key, value in item.items():
      if isinstance(value, list):
        for idx, subvalue in enumerate(value):
          flattened[f"{key}_{idx}"] = subvalue
      else:
        flattened[key] = value
    return flattened


# Create an empty DataFrame
intervals = []

for index, row in le_data_df.iterrows():
    # Perform any necessary operations on each row
    # For example, you could print the device_id and interval_completeness
    device_id = row['device_id']
    data = row['le_intervals']
    for item in data:
        row = flatten_arrays(item)
        row["device_id"] = device_id
        intervals.append(row)

df_intervals = pd.DataFrame.from_dict(intervals) 
# Reorder the columns to move 'device_id', 'timestamp', and 'duration' to the front
columns_order = ['device_id', 'timestamp', 'duration'] + [col for col in df_intervals.columns if col not in ['device_id', 'timestamp', 'duration']]
df_intervals = df_intervals[columns_order]



In [29]:
# By-day analysis

df_intervals['datetime'] = pd.to_datetime(df_intervals['timestamp'], unit='s').dt.tz_localize('UTC').dt.tz_convert(TIMEZONE)

df_daily_counts = df_intervals.groupby(['device_id', df_intervals['datetime'].dt.date]).size().reset_index(name='entry_count')
df_daily_counts.columns = ['device_id', 'date', 'num_intervals']
df_daily_counts.head(70)
df_daily_counts['date'] = pd.to_datetime(df_daily_counts['date']).dt.tz_localize(TIMEZONE)

# Add in missing intervals (set to 0)
date_range = pd.date_range(start=time_start, end=time_end.subtract(seconds=1), freq='D')
all_device_dates = pd.MultiIndex.from_product([devices, date_range], names=['device_id', 'date'])

missing_entries = all_device_dates.difference(df_daily_counts.set_index(['device_id', 'date']).index)
missing_df = pd.DataFrame(list(missing_entries), columns=['device_id', 'date'])
missing_df['num_intervals'] = 0
df_daily_counts = pd.concat([df_daily_counts, missing_df], ignore_index=True)

num_intervals_expected_daily = 24 * 12
df_daily_counts['missing_intervals'] = num_intervals_expected_daily - df_daily_counts['num_intervals']
df_daily_counts['interval_completeness'] = 100 * df_daily_counts['num_intervals'] / num_intervals_expected_daily
# Move interval_completeness column to 3rd column

# df_daily_counts['date'] = pd.to_datetime(df_daily_counts['date']).dt.date
df_daily_counts['date'] = pd.to_datetime(df_daily_counts['date']).dt.date


cols = df_daily_counts.columns.tolist()
cols.insert(2, cols.pop(cols.index('interval_completeness')))
df_daily_counts = df_daily_counts[cols]



ValueError: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 5

# Outputs

## High level stats

In [None]:
show(df_parameters)
show(df_stats, 
     columnDefs= [
        { "targets": [1], "createdCell": JavascriptFunction(
                f"""
                    function (td, cellData, rowData, row, col) {{
                        if (cellData < {DATA_COMPLETENESS_THRESHOLD}) {{
                            $(td).css('color', 'red')
                        }}
                    }}
                """
        )},
        {
            "targets": [1],
            "render": JavascriptCode("$.fn.dataTable.render.number(',', '.', 2, '', '%')"),
        }
    ],)



## Per device stats

In [None]:
show(df_devices_table, 
     columnDefs= [
        { "targets": [1], "createdCell": JavascriptFunction(
                f"""
                    function (td, cellData, rowData, row, col) {{
                        if (cellData < {DATA_COMPLETENESS_THRESHOLD}) {{
                            $(td).css('color', 'red')
                        }}
                    }}
                """
        )},
        {
            "targets": [1],
            "render": JavascriptCode("$.fn.dataTable.render.number(',', '.', 2, '', '%')"),
        }
    ],
    showIndex=False,
    buttons=["copyHtml5", "csvHtml5", "excelHtml5"]
)


## Per device per day stats

In [None]:
show(df_daily_counts, 
     columnDefs= [
        { "targets": [2], "createdCell": JavascriptFunction(
                f"""
                    function (td, cellData, rowData, row, col) {{
                        if (cellData < {DATA_COMPLETENESS_THRESHOLD}) {{
                            $(td).css('color', 'red')
                        }}
                    }}
                """
        )},
        {
            "targets": [2],
            "render": JavascriptCode("$.fn.dataTable.render.number(',', '.', 2, '', '%')"),
        }
    ],
    showIndex=False,
    pageLength=20,
    buttons=["copyHtml5", "csvHtml5", "excelHtml5"]
)

## Interval data

In [None]:
# NOTE: For large datasets, running this cell may result in memory issues.

show(df_intervals, 
     showIndex=False,
     maxBytes=0,
     pageLength=20,
     buttons=["copyHtml5", "csvHtml5", "excelHtml5"]
     )
