[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/wattwatchers/rest-api-notebooks/blob/main/wattwatchers_rest_api_v3_modbus.ipynb)

# Introduction
This is a Jupyter notebook to quickly get you started with (some of) the API endpoints of the Wattwatchers REST API v3 (Mercury).
This notebook focuses specifically on downloading modbus data and is designed to run on Google Colab.

The official documentation for the API can be found [here](https://docs.wattwatchers.com.au/api/v3/index.html).

There are two ways to use this notebook:

1. If you just want to download modbus data for all devices associated with your API key (without writing any code yourself), enter the details of your request in the Configuration section and run the whole notebook (`Runtime/Run all`).
2. As a starting point to learn about the API and make different requests. In that case you can adjust any code in this notebook to your needs and run it.

To run the code in this notebook, hover your cursor over the brackets (`[ ]`) in the top left corner of a code block. Then click the play button. 

To run all code in the notebook, select `Runtime/Run all` from the menu at the top of the page. This downloads modbus data for all devices associated with your API key and saves it to a file based on the constants defined in the Configuration code block. As part of this process, follow the prompts to mount your Google Drive (so the output file can be saved to your Google Drive account). NOTE: Depending on how many devices are associated with the API key and the length of the requested time period, downloading the modbus data can take a long time.

For details on how to access your API key, see the ["Authentication & Authorization" section of our API docs](https://docs.wattwatchers.com.au/api/v3/auth.html). REMEMBER: Your API key is just like a password: make sure you transfer and store it securely at all times (e.g. never via email).
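One common way to keep the key out of the notebook itself is to read it from an environment variable. The sketch below assumes a variable named `WATTWATCHERS_API_KEY`; that name and the `read_api_key` helper are hypothetical choices for this example, not something the API requires.

```python
import os

def read_api_key(env_var: str = "WATTWATCHERS_API_KEY") -> str:
  """
  Read the API key from an environment variable instead of hard-coding it.
  WATTWATCHERS_API_KEY is a hypothetical variable name used for illustration.
  """
  key = os.environ.get(env_var, "").strip()
  if not key:
    raise RuntimeError(f"Environment variable {env_var} is not set or empty")
  return key
```

With the variable set, `API_KEY = read_api_key()` could replace the literal string in the Configuration cell. On Colab, the built-in Secrets panel (read with `google.colab.userdata.get`) serves the same purpose.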

Go [here](https://colab.research.google.com/github/coolernato/Introduction-to-Python/blob/master/Using%20Jupyter%20Notebooks.ipynb) to learn more about using Jupyter Notebooks on Google Colab.

## Configuration

Insert your API key in the code block below.
Adjust any of the other constants to your needs.

In [None]:
# Insert your API key below (between the quotation marks)
API_KEY = "" 

DEFAULT_TIMEZONE = "Australia/Sydney" # The timezone to use in cases where a device's timezone is not known
START_DATE = "2024-07-24T00:00:00"    # Date string in the format <YYYY-MM-DD>T<HH:MM:SS> in the target timezone
END_DATE = "2024-07-28T00:00:00"      # Date string in the format <YYYY-MM-DD>T<HH:MM:SS> in the target timezone

FILE_NAME = "exported-data"           # desired name for the file containing the exported data (or the start of the filename if data is exported to multiple files). Do not include the file extension.
INCLUDE_DATE_AND_TIME = True          # True or False. If set to True, date and time strings will be included in the exported data (in addition to Unix timestamps)
OUTPUT_FORMAT = "csv"                 # either json or csv
DOWNLOAD_FILE = True                  # True or False. If set to True the file containing the exported data will be downloaded to your local machine after it has been saved to Google Drive.
ONE_FILE_PER_DEVICE = False           # True or False. Applicable to CSV exports only. 
                                      # If True, saves data for each device to its own CSV file; the file name will include the device id. 
                                      # If False, saves all device data to a single CSV file and outputs a second file allowing individual device channels to be mapped to their categories

BASE_URL = "https://api-v3.wattwatchers.com.au" # Do not modify this.
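A typo in the configuration only surfaces once the requests start. As an optional safeguard, the sketch below checks the date strings and output format up front. `validate_config` is a hypothetical helper, not part of the original notebook; it assumes the same date format the notebook uses.

```python
import datetime as dt

def validate_config(start_date: str, end_date: str, output_format: str,
                    date_format: str = "%Y-%m-%dT%H:%M:%S") -> None:
  """
  Sanity-check configuration values before making any API calls.
  Raises ValueError with a descriptive message if something is wrong.
  """
  start = dt.datetime.strptime(start_date, date_format)  # raises ValueError on a malformed date
  end = dt.datetime.strptime(end_date, date_format)
  if end <= start:
    raise ValueError(f"END_DATE ({end_date}) must be later than START_DATE ({start_date})")
  if output_format not in ("csv", "json"):
    raise ValueError(f"OUTPUT_FORMAT must be 'csv' or 'json', got {output_format!r}")
```

Calling `validate_config(START_DATE, END_DATE, OUTPUT_FORMAT)` right after the Configuration cell would stop the notebook early with a clear message.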

## Mounting Google Drive
To save the downloaded data to a file, you need to connect this notebook to your Google Drive account by running the code block below and following the prompts.

In [None]:
from google.colab import drive
drive.mount("/gdrive")

## Request batching and throttling

Because the API has limits on the amount of data it returns for a single request, and on the number of calls it accepts in a time period, we need some methods to split large requests into batches and to throttle the requests we make.
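To see why large downloads take time: each 7-day slice of the requested period costs one API call, and the throttling below spaces the calls for one device at least `1 / max_requests_per_sec` seconds apart. The sketch estimates a lower bound on the time spent waiting for the throttler. It assumes the conservative 2 requests/second default used in this notebook (the real limit is read from the API's response headers). It ignores the extra per-device calls (device status, first and latest timestamps) and the time the requests themselves take. The helper names are hypothetical.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60
BATCH_SECONDS = 7 * SECONDS_PER_DAY  # at most 7 days of data per API call

def calls_per_device(time_interval: int) -> int:
  """Number of batched API calls needed to cover time_interval seconds."""
  return math.ceil(time_interval / BATCH_SECONDS)

def min_throttle_seconds(time_interval: int, device_count: int, max_requests_per_sec: float = 2) -> float:
  """
  Lower bound on total throttling wait: n calls for one device need at least
  (n - 1) gaps of 1/max_requests_per_sec seconds. The throttler restarts for each device.
  """
  calls = calls_per_device(time_interval)
  return device_count * max(0, calls - 1) / max_requests_per_sec

print(calls_per_device(30 * SECONDS_PER_DAY))           # 5 calls for a 30-day period
print(min_throttle_seconds(30 * SECONDS_PER_DAY, 100))  # 200.0 seconds for 100 devices
```

For the default configuration (24 to 28 July, 4 days), each device needs a single call, so the throttler never has to wait.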

In [None]:
import time
from typing import Optional

def throttler(last_request_time: Optional[float], max_requests_per_sec: int = 2):
  """
  The API is rate limited, so API requests may need to be throttled.
  This method sleeps, if necessary, based on when the last request was made and the number of requests
  per second we are allowed to make (the actual frequency of requests can be lower than the maximum allowed
  if requests take longer to complete than the minimum interval between requests).
  """
  if last_request_time is None:
    return
  time_since_last_request = time.time() - last_request_time
  wait_duration = max(0, 1/max_requests_per_sec - time_since_last_request)
  if wait_duration > 0:
    time.sleep(wait_duration)

def calculate_batches_for(end_timestamp: int, time_interval: int) -> list:
  """
  Because we can only request 7 days of data in an API call, we need to break up our time interval into batches.
  This method returns the (start, end) timestamps for each API call needed to cover the full desired time interval.
  """
  batch_interval = 7 * 24 * 60 * 60  # 7 days in seconds
  start_timestamp = end_timestamp - time_interval
  intervals = [(batch_start, min(batch_start + batch_interval, end_timestamp)) for batch_start in range(start_timestamp, end_timestamp, batch_interval)]
  return intervals
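To illustrate what `calculate_batches_for` produces, the standalone copy below splits a 17-day period into two full 7-day batches and a final 3-day batch. The end timestamp is an arbitrary example value.

```python
DAY = 24 * 60 * 60

def calculate_batches_for(end_timestamp: int, time_interval: int) -> list:
  # Same logic as the notebook's version: consecutive windows of at most 7 days
  batch_interval = 7 * DAY
  start_timestamp = end_timestamp - time_interval
  return [(batch_start, min(batch_start + batch_interval, end_timestamp))
          for batch_start in range(start_timestamp, end_timestamp, batch_interval)]

end_ts = 1_722_088_800  # example end timestamp (any integer works)
batches = calculate_batches_for(end_ts, 17 * DAY)
for from_ts, to_ts in batches:
  print(from_ts, to_ts, (to_ts - from_ts) / DAY, "days")
```

Each batch ends exactly where the next one starts. If the API treats both `fromTs` and `toTs` as inclusive (worth checking in the API docs), a record falling exactly on a boundary could appear in two batches; the notebook does not de-duplicate.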

## Convenience methods to convert dates to timestamps

The request methods in this notebook work with Unix timestamps and time intervals (in seconds).
Below is a convenience method to convert two date-and-time strings into an end timestamp (Unix, UTC) and a time interval that can be passed to those methods.


In [None]:
import datetime as dt
import pytz 

def convert_dates_to_timestamp_and_interval(
    start_date_str: str, 
    end_date_str: str, 
    timezone_str: str = DEFAULT_TIMEZONE, 
    date_format: str = "%Y-%m-%dT%H:%M:%S") -> tuple:
  """
  Convenience method to convert a start date and end date string
  into an end timestamp and interval that can be used as parameters for the API methods.
  Start and end date parameters are to be formatted as "2021-06-29T17:08:00" (or according to the date_format parameter passed in).
  Dates are interpreted in the timezone passed in (defaults to DEFAULT_TIMEZONE).
  The method returns a tuple containing the end timestamp and the time interval (in seconds).
  """
  timezone = pytz.timezone(timezone_str)
  start_timestamp = convert_date_to_timestamp(start_date_str, timezone, date_format)
  end_timestamp = convert_date_to_timestamp(end_date_str, timezone, date_format)
  return (end_timestamp, end_timestamp - start_timestamp)

def convert_date_to_timestamp(date_str: str, timezone: pytz.timezone, date_format: str) -> int:
  """
  Helper method to convert a date string for a given timezone to a timestamp
  """
  date_time = timezone.localize(dt.datetime.strptime(date_str, date_format))
  return int(date_time.timestamp())
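To make the conversion concrete, the sketch below reproduces it with the standard-library `zoneinfo` module (Python 3.9+) instead of `pytz`. This is a swapped-in technique, not what the notebook uses. With `zoneinfo`, a zone can be attached with `replace(tzinfo=...)`; with `pytz`, it has to go through `localize()` as the notebook does, because passing a `pytz` zone directly as `tzinfo` picks up a historical local-mean-time offset. The second call shows that a period spanning a daylight-saving change is not a whole number of 24-hour days.

```python
import datetime as dt
from zoneinfo import ZoneInfo

def to_end_timestamp_and_interval(start_str: str, end_str: str, tz_name: str,
                                  date_format: str = "%Y-%m-%dT%H:%M:%S") -> tuple:
  tz = ZoneInfo(tz_name)
  # Attach the zone to the naive wall-clock times, then convert to Unix timestamps
  start = dt.datetime.strptime(start_str, date_format).replace(tzinfo=tz)
  end = dt.datetime.strptime(end_str, date_format).replace(tzinfo=tz)
  end_ts = int(end.timestamp())
  return end_ts, end_ts - int(start.timestamp())

# The notebook's default configuration (Sydney is on AEST, UTC+10, in July)
print(to_end_timestamp_and_interval("2024-07-24T00:00:00", "2024-07-28T00:00:00", "Australia/Sydney"))
# (1722088800, 345600)

# Daylight saving started in Sydney on 6 October 2024, so this calendar day lasts 23 hours
print(to_end_timestamp_and_interval("2024-10-06T00:00:00", "2024-10-07T00:00:00", "Australia/Sydney")[1])
# 82800
```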

## API request methods

Generic methods to make API requests.
These methods are not specific to a particular endpoint.

In [None]:
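import requests
import json
from typing import Optional

# Basic GET request method; sends the API key as a bearer token.
# Note: the default token is the value of API_KEY at the time this cell runs,
# so re-run this cell after changing API_KEY.
def get_request(path: str, query_params: Optional[dict] = None, token: str = API_KEY) -> requests.Response:
  endpoint = BASE_URL + path
  headers = {"Authorization": f"Bearer {token}"}
  return requests.get(endpoint, params=query_params, headers=headers)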

def load(endpoint: str, end_timestamp: int, time_interval: int) -> list:
  """
  Method that turns a request for a long time period into a batch of API calls,
  makes the (throttled) API calls, and concatenates the data returned from the calls. 
  """
  batches_intervals = calculate_batches_for(end_timestamp, time_interval)
  all_data = []
  last_request_time = None
  target_requests_per_second = 2 # Conservative default, this will be updated with header values after the first request
  for from_ts, to_ts in batches_intervals:
    throttler(last_request_time, target_requests_per_second)
    last_request_time = time.time()
    query_params = {"fromTs": from_ts, "toTs": to_ts}
    result = get_request(endpoint, query_params)
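    if result.status_code != 200:
      print("failed to load batch. status code:", result.status_code, result.content)
      continue
    # Use the rate limit reported by the API; keep the current value if the header is missing
    target_requests_per_second = int(result.headers.get("X-RateLimit-TpsLimit", target_requests_per_second))
    data = result.json()
    all_data += data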
  return all_data
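`load` prints and skips any batch that fails, so a transient error leaves a gap in the exported data. One possible refinement is to retry a failed request with exponential backoff. The sketch below assumes that the API signals rate limiting with HTTP 429 (the conventional status code; check the API docs) and that 5xx responses are worth retrying. `get_with_retry` and its parameters are hypothetical names. The request function is passed in so it can wrap `get_request`, and `sleep` is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}  # assumption: which errors are worth retrying

def get_with_retry(send: Callable[[], object], max_attempts: int = 4,
                   base_delay: float = 1.0, sleep: Callable[[float], None] = time.sleep):
  """
  Call send() until it returns a non-retryable response or max_attempts is reached.
  send() must return an object with a status_code attribute (e.g. a requests.Response).
  The delay doubles after each failed attempt: base_delay, 2*base_delay, 4*base_delay, ...
  """
  response = None
  for attempt in range(max_attempts):
    response = send()
    if response.status_code not in RETRYABLE_STATUS_CODES:
      return response
    if attempt < max_attempts - 1:
      sleep(base_delay * 2 ** attempt)
  return response  # last (failed) response; the caller decides what to do with it
```

Inside `load`, the direct call could become `result = get_with_retry(lambda: get_request(endpoint, query_params))`, leaving the existing status-code check in place for requests that still fail.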

## Methods for specific API endpoints

In [None]:
from collections import namedtuple
from typing import Optional

def get_devices_list() -> Optional[list]:
  """
  Method to load all device ids associated with the API key.
  """
  result = get_request("/devices")
  if result.status_code != 200:
    print("get_devices_list failed with status code", result.status_code)
    return None
  return result.json()

def get_device_status(device_id: str) -> Optional[dict]:
  """
  Method to load metadata for a specific device.
  """
  result = get_request(f"/devices/{device_id}")
  if result.status_code != 200:
    print("get_device_status failed:", result.status_code)
    return None
  return result.json()

def get_first_modbus_timestamp(device_id: str) -> Optional[dict]:
  """
  Method to get the timestamp of the first available modbus data for a device
  Used to limit the requested time interval for a device if the requested interval
  starts before the first available data for the device
  """
  query_params = {"fields[energy]": "timestamp"}
  result = get_request(f"/modbus/{device_id}/first", query_params)
  if result.status_code != 200:
    print("get_first_modbus_timestamp failed:", result.status_code)
    return None
  return result.json()

def get_latest_modbus_timestamp(device_id: str) -> Optional[dict]:
  """
  Method to get the timestamp of the latest available modbus data for a device
  Used to limit the requested time interval for a device if the requested interval
  ends after the latest available data for the device
  """
  query_params = {"fields[energy]": "timestamp"}
  result = get_request(f"/modbus/{device_id}/latest", query_params)
  if result.status_code != 200:
    print("get_latest_modbus_timestamp failed:", result.status_code)
    return None
  return result.json()

def adjusted_time_period_for(device_id: str, end_timestamp: int, time_interval: int) -> Optional[tuple[int, int]]:
  """
  Method to adjust the requested time period based on the timestamps of the first and latest available modbus data.
  Returns None if the requested time period does not intersect with available data.
  Returns adjusted end_timestamp and time_interval values if the requested time period 
  partly intersects with the available data.
  """
  first_timestamp_result = get_first_modbus_timestamp(device_id)
  latest_timestamp_result = get_latest_modbus_timestamp(device_id)
  if first_timestamp_result is None or latest_timestamp_result is None:
    return None
  first_timestamp = first_timestamp_result["timestamp"]
  latest_timestamp = latest_timestamp_result["timestamp"]
  if first_timestamp > end_timestamp or latest_timestamp < end_timestamp - time_interval:
    return None
  if first_timestamp > end_timestamp - time_interval:
    time_interval = end_timestamp - first_timestamp
  if latest_timestamp < end_timestamp:
    time_interval = time_interval - (end_timestamp - latest_timestamp)
    end_timestamp = latest_timestamp
  return (end_timestamp, time_interval)

# Definition of a named tuple to encapsulate the parameters for a modbus request
ModbusParameters = namedtuple("ModbusParameters", ["end_timestamp", "time_interval", "timezone"])

def modbus_request_parameters(
    start_date_str: str = START_DATE, 
    end_date_str: str = END_DATE, 
    timezone: str = DEFAULT_TIMEZONE) -> ModbusParameters:
  """
  Method to map configuration values to the named tuple with parameters for a modbus request.
  """
  timestamp_end, time_interval = convert_dates_to_timestamp_and_interval(start_date_str, end_date_str, timezone)
  return ModbusParameters(timestamp_end, time_interval, timezone)

def load_modbus_data(
    device_id: str, 
    modbus_params: ModbusParameters) -> Optional[list]:
  """
  Method to load modbus data for a device for the specified period.
  """
  print("load modbus data for", device_id)
  # If required, adjust requested time period based on first and latest available modbus data
  adjusted_time_period = adjusted_time_period_for(device_id, modbus_params.end_timestamp, modbus_params.time_interval)
  if adjusted_time_period is None:
    print(f"no modbus data available for device {device_id} for the requested period")
    return None
  end_timestamp, time_interval = adjusted_time_period
  endpoint = f"/modbus/{device_id}"
  return load(endpoint, end_timestamp, time_interval)

def load_modbus_data_for_devices(devices: list, modbus_params: ModbusParameters) -> list:
  """
  Method to load modbus data for the list of devices passed in.
  """
  results = []

  # Helper method to generate a dictionary with an id, data and (optionally) device metadata
  def device_modbus_dict_for(device_id: str, timezone: str, data: list, metadata: Optional[dict]) -> dict:
    if INCLUDE_DATE_AND_TIME:
      tz = pytz.timezone(timezone)
      def _include_date_and_time(interval: dict) -> dict:
        datetime = dt.datetime.fromtimestamp(interval["timestamp"], tz)
        interval["date"] = datetime.strftime("%m/%d/%Y")
        interval["time"] = datetime.strftime("%H:%M:%S")
        return interval
      data = list(map(_include_date_and_time, data))

    device_modbus_dict = {
        "id": device_id,
        "data": data
    }
    if metadata is not None:
      device_modbus_dict["metadata"] = metadata
    return device_modbus_dict
    
  for device_id in devices:
    device_data = get_device_status(device_id)
    # Skip devices whose metadata could not be loaded
    if device_data is None:
      continue
    # Ignore any devices that do not support modbus
    if device_data.get("model") != "6M+One":
      print(f"device {device_id} does not support modbus. (device model: {device_data.get('model', '')})")
      continue
    device_timezone = device_data.get("timezone") or modbus_params.timezone
    device_modbus_data = load_modbus_data(device_id, modbus_params)
    if device_modbus_data is None:
      continue
    device_modbus_dict = device_modbus_dict_for(device_id, device_timezone, device_modbus_data, device_data)
    results.append(device_modbus_dict)
  return results
  
def load_modbus_data_for_all_devices(modbus_params: ModbusParameters) -> list:
  """
  Method to load modbus data for all devices associated with the API key for the specified period.
  """
  devices = get_devices_list()
  if devices is None:
    return []
  if not devices: 
    print("no devices associated with the API key")
    return []
  return load_modbus_data_for_devices(devices, modbus_params)
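The adjustment in `adjusted_time_period_for` amounts to intersecting the requested period `[start, end]` with the period for which data exists `[first, latest]`. The standalone sketch below expresses the same logic with `max`/`min`; `clip_period` is a hypothetical helper, not part of the notebook, and the timestamps in the usage lines are small made-up numbers for readability.

```python
from typing import Optional, Tuple

def clip_period(end_timestamp: int, time_interval: int,
                first_timestamp: int, latest_timestamp: int) -> Optional[Tuple[int, int]]:
  """
  Intersect the requested period with the available data period.
  Returns (end_timestamp, time_interval) for the overlap, or None if they do not overlap.
  """
  start = max(end_timestamp - time_interval, first_timestamp)  # later of the two starts
  end = min(end_timestamp, latest_timestamp)                    # earlier of the two ends
  if start > end:
    return None
  return end, end - start

# Data available from 100 to 200
print(clip_period(150, 100, 100, 200))  # request 50..150 -> (150, 50)
print(clip_period(250, 100, 100, 200))  # request 150..250 -> (200, 50)
print(clip_period(90, 40, 100, 200))    # request 50..90 -> None
```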

## Load modbus data
Based on the configuration values defined at the start of the notebook, the code block below loads modbus data for all devices associated with the API key that have modbus devices attached.

In [None]:
params = modbus_request_parameters()
modbus_data = load_modbus_data_for_all_devices(params)
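After the cell above has run, `modbus_data` is a list with one dictionary per device, holding the device `id`, its `data` records and its `metadata`. A quick way to check what was downloaded is to count the records and find the time range per device. The sketch below is standalone: `summarize` is a hypothetical helper, the sample input is made up, and it relies only on each record having a `timestamp` key, as the notebook's own code does.

```python
import datetime as dt

def summarize(modbus_data: list) -> list:
  """Return (device id, record count, first UTC time, last UTC time) for each device."""
  summary = []
  for device in modbus_data:
    timestamps = [record["timestamp"] for record in device["data"]]
    if not timestamps:
      summary.append((device["id"], 0, None, None))
      continue
    first = dt.datetime.fromtimestamp(min(timestamps), dt.timezone.utc)
    last = dt.datetime.fromtimestamp(max(timestamps), dt.timezone.utc)
    summary.append((device["id"], len(timestamps), first, last))
  return summary

# Hypothetical sample shaped like the notebook's output (ids and timestamps are made up)
sample = [
  {"id": "D000000000001", "data": [{"timestamp": 1721743200}, {"timestamp": 1721743500}]},
  {"id": "D000000000002", "data": []},
]
for row in summarize(sample):
  print(row)
```

In the notebook itself, `for row in summarize(modbus_data): print(row)` would give the same overview for the real download.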

## Saving data to Google Drive
The block below saves your data to a file on Google Drive and, optionally, downloads it to your local machine. Both JSON and CSV formats are supported.



In [None]:
from google.colab import files
import csv

def save_data_to_json_file(data: list, file_name: str, with_indent: bool = True) -> list:
  """
  Method to save data to JSON file on Google Drive
  The data includes metadata for every device.
  Passing in true for withIndent will indent the saved JSON for readability.
  This will significantly increase the file size, so only do so if you aim to read the raw JSON.
  """
  with open(f"/gdrive/My Drive/{file_name}.json", "w") as f:
    indent = 4 if with_indent else None
    json.dump(data, f, indent=indent)
  return [f"{file_name}.json"] 

def save_data_to_csv_files(data: list, file_name: str) -> list:
  """
  Method to save data to CSV files on Google Drive
  Depending on configurations this saves:
  - 1 file with modbus data for all devices and 1 file with metadata for all devices (allowing mapping of channels to categories, etc.)
  - 1 data file per device, using channel category labels in the headers.
  """
  if ONE_FILE_PER_DEVICE:
    return save_modbus_data_to_csv_file_per_device(data, file_name)
  else:
    save_modbus_data_to_csv_file(data, file_name)
    save_device_data_to_csv_file(data, f"{file_name}-devices")
    return [f"{file_name}.csv", f"{file_name}-devices.csv"]

def reorder_header_list(headers: list) -> list:
  """
  Helper method to reorder the list of CSV header values.
  """
  def _move_to_front(key: str):
    if key not in headers:
      return
    headers.remove(key)
    headers.insert(0, key)

  _move_to_front("duration")
  if INCLUDE_DATE_AND_TIME:
    _move_to_front("time")
    _move_to_front("date")
  _move_to_front("timestamp")
  _move_to_front("model")
  return headers

def save_modbus_data_to_csv_file_per_device(data: list, file_name: str):
  """
  Method to save modbus data for each device to a separate CSV file on Google Drive.
  Column headers are taken from the keys of the device's first modbus record.
  """
  file_names = []
  for device in data:
    device_id = device["id"]
    # Ignore device if there's no associated modbus data
    if len(device["data"]) == 0:
      print(f"Ignoring device {device_id}: no associated modbus data")
      continue

    headers = list(device["data"][0].keys())
    headers = reorder_header_list(headers)
    # add device id to headers
    headers.insert(0, "device_id")

    file_name_for_device = f"{file_name}-{device_id}.csv"
    with open(f"/gdrive/My Drive/{file_name_for_device}", "w", newline="") as f:
      writer = csv.DictWriter(f, headers)
      writer.writeheader()
      for item in device["data"]:
        item["device_id"] = device_id
        writer.writerow(item)
    file_names.append(file_name_for_device)

  return file_names   

def save_modbus_data_to_csv_file(data: list, file_name: str):
  """
  Method to save modbus data for all devices to a CSV file on Google Drive
  """

  # generate headers
  def _generate_csv_headers(data: list) -> Optional[list]:
    header_fields = []
    for device in data:
      if len(device["data"]) > 0:
        keys = device["data"][0].keys()
        for key in keys:
          if key not in header_fields:
            header_fields.append(key)

    if len(header_fields) == 0:
      return None
        
    headers = reorder_header_list(header_fields)
    headers.insert(0, "device_id")
    return headers
  
  headers = _generate_csv_headers(data)
  if headers is None:
    print("None of the devices have associated modbus data. CSV export aborted.")
    return

  with open(f"/gdrive/My Drive/{file_name}.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, headers)
    writer.writeheader()
    
    for device in data:
      device_id = device["id"]
      for item in device["data"]:
        item["device_id"] = device_id
        writer.writerow(item)

def save_device_data_to_csv_file(data: list, file_name: str):
  """
  Method to save metadata for all devices to a CSV file on Google Drive
  The metadata can be used to map channel category labels for a device to its modbus data, amongst other things.
  """
  
  def _device_row_from_device_data(data: dict) -> dict:
    metadata = data["metadata"]
    if "timezone" not in metadata:
      metadata["timezone"] = ""
        
    row = {"device_id": metadata["id"], "timezone": metadata["timezone"], "phases_count": metadata["phases"]["count"], "model": metadata["model"]}
    for idx, channel_data in enumerate(metadata["channels"]):
      for key, value in channel_data.items():
        row[f"channel_{idx}_{key}"] = value
    return row

  def _generate_csv_headers(data: Optional[list]) -> Optional[list]:
    if data is None:
      return None
    header_fields = None
    for device in data:
      header_fields_device = list(_device_row_from_device_data(device).keys())
      if header_fields is None or (len(header_fields_device) > len(header_fields)):
        header_fields = header_fields_device
    return header_fields

  headers = _generate_csv_headers(data)
  if headers is None:
    print("No devices found. CSV export aborted.")
    return
  
  with open(f"/gdrive/My Drive/{file_name}.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, headers)
    writer.writeheader()
    
    for device in data:
      row = _device_row_from_device_data(device)
      writer.writerow(row)

def save_data(
    data: list, 
    file_name: str = FILE_NAME, 
    output_format: str = OUTPUT_FORMAT, 
    download_after_save: bool = DOWNLOAD_FILE):
  """
  Method to save data to file on Google Drive (either in CSV or JSON format)
  With the option to download the file to your local hard drive.
  """
  saved_files = []
  if output_format == "csv":
    saved_files = save_data_to_csv_files(data, file_name)
  elif output_format == "json":
    saved_files = save_data_to_json_file(data, file_name)
  else:
    print(f"unsupported output format {output_format}. Only csv and json are supported.")
    return
  if download_after_save:
    for file in saved_files:
      files.download(f"/gdrive/My Drive/{file}")
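`csv.DictWriter` raises a `ValueError` when a row contains a key that is not in its header list, and the header generation above only looks at the first record of each device. If modbus records can carry different sets of fields (the notebook does not guarantee this either way), one option is to build the header from the union of keys across all records and let `restval` fill the missing cells. The sketch below writes to an in-memory buffer to show the idea; `write_rows_with_union_headers` is a hypothetical helper and the field names in the sample rows are made up.

```python
import csv
import io

def write_rows_with_union_headers(rows: list, f, leading: tuple = ("device_id", "timestamp")) -> list:
  """
  Write dict rows to f as CSV, using every key that appears in any row as a column.
  Columns listed in `leading` come first (if present); the rest keep first-seen order.
  Missing values are written as empty strings.
  """
  headers = []
  for row in rows:
    for key in row:
      if key not in headers:
        headers.append(key)
  headers = [k for k in leading if k in headers] + [k for k in headers if k not in leading]
  writer = csv.DictWriter(f, headers, restval="")
  writer.writeheader()
  writer.writerows(rows)
  return headers

buffer = io.StringIO()
rows = [
  {"timestamp": 1721743200, "device_id": "D1", "voltage": 240.1},  # made-up field names
  {"timestamp": 1721743500, "device_id": "D1", "current": 3.2},
]
print(write_rows_with_union_headers(rows, buffer))
print(buffer.getvalue())
```

When writing to a real file, open it with `newline=""` as the notebook already does, so the `csv` module controls line endings.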

In [None]:
# Save the downloaded data to file, 
# based on the values defined in the configuration code block at the start of the notebook.
save_data(modbus_data)