# EnergyID Webhook V2 Quick Demo

This notebook is a quick demonstration of the new EnergyID Webhook V2 API, which includes device provisioning, claiming, and the new data format for sending measurements to EnergyID.

There is also a full developer demo available in another notebook, which includes more detailed information about the API and how to use it.

# Imports and setup

In [None]:
from time import sleep
import json
import uuid
import datetime as dt

from energyid_webhooks.client_v2 import WebhookClient

To use a webhook, you will need a Client Secret and a Client Id. For this demo, we will load them from a file called `credentials.json`. You can create this file by copying the `credentials_sample.json` file and filling in the values.

In [None]:
from dotenv import load_dotenv
import os

load_dotenv()

credentials = {
    "PROVISIONING_KEY": os.getenv("PROVISIONING_KEY"),
    "PROVISIONING_SECRET": os.getenv("PROVISIONING_SECRET"),
}

Every device that sends data to EnergyID Needs a Unique Device ID. In this demo, we will use the `uuid` library to generate a random UUID for the device.

Note: Only create this device ID once, and then re-use it when you run the demo again. If you create a new device ID every time you run the demo, you will end up with multiple devices in EnergyID.

In [None]:
device_id = f"jupyter_demo_{uuid.uuid4().hex[:8]}"
print(f"Device ID: {device_id}")

# Webhook Client

The `WebhookClient` class is a simple
client that sends requests to the EnergyID Webhook API. It has methods for provisioning a device, claiming a device, and sending measurements.

To instantiate the client, you need to pass the `provisioning_key` and `provisioning_secret` to the constructor, as well as the `device_id` and a `device_name` for the device you want to provision. You can also pass other metadata such as IP address or MAC address.

In [None]:
client = WebhookClient(
    provisioning_key=credentials["PROVISIONING_KEY"],
    provisioning_secret=credentials["PROVISIONING_SECRET"],
    device_id="PASTE_DEVICE_ID_HERE",  # Replace with your device ID
    device_name="Jupyter Demo Device",
)

# Claiming a device

If you use a device for the first time, you need to claim it before you can send measurements. Call `client.authenticate()`, check `is_claimed`, and if it is `False` you can visit the `claim_url` to claim the device.

In [None]:
# Authenticate with EnergyID
is_claimed = await client.authenticate()

if is_claimed:
    print("✅ Device is already claimed and ready to send data!")
    print(f"Webhook URL: {client.webhook_url}")
    print(f"Auth valid until: {client.auth_valid_until}")
    print(f"\nWebhook policy: {json.dumps(client.webhook_policy, indent=2)}")
else:
    # Device needs to be claimed
    claim_info = client.get_claim_info()
    print("⚠️ Device needs to be claimed before sending data!")
    print(f"\nClaim Code: {claim_info['claim_code']}")
    print(f"Claim URL: {claim_info['claim_url']}")
    print(f"Valid until: {claim_info['valid_until']}")
    print("\n1. Visit the claim URL above in your browser")
    print("2. Log in to your EnergyID account if needed")
    print("3. Enter the claim code shown above")
    print("4. Once claimed, re-run this cell to continue or run the next cell")

In [None]:
# Authenticate with EnergyID
is_claimed = await client.authenticate()

if is_claimed:
    print("✅ Device is already claimed and ready to send data!")
    print(f"Webhook URL: {client.webhook_url}")
    # print(f"client info: {client.client_info}")
    print(f"Auth valid until: {client.auth_valid_until}")
    print(f"\nWebhook policy: {json.dumps(client.webhook_policy, indent=2)}")
else:
    raise ValueError("Device is not claimed yet, please claim it first")

# Sending Measurements

If you don't include a timestamp, we will use the current time.

In [None]:
data = {
    # ts is automatically added if not provided
    "el": 1255.7,  # Electricity consumption
    "el-i": 150.3,  # Electricity injection
    "pv": 3570.8,  # Solar production
    "gas": 450.2,  # Gas consumption
    "temperature": 21.5,  # Custom metric for temperature
}

# Send the data
await client.send_data(data)

# Sending Measurements with Timestamp

In [None]:
data = {
    "el": 1255.7,  # Electricity consumption
    "el-i": 150.3,  # Electricity injection
    "pv": 3570.8,  # Solar production
    "gas": 450.2,  # Gas consumption
    "temperature": 21.5,  # Custom metric for temperature
}

# Send the data
await client.send_data(
    data, timestamp=dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=1)
)

# Bundling Measurements

You can call `client.update_sensor` multiple times before calling `client.synchronize_sensors`. This way, only the most recent measurement for each sensor will be sent to EnergyID.

This method is useful if you have sensors that update frequently, but you only want to send data to EnergyID every few minutes.

In [None]:
await client.update_sensor(sensor_id="el", value=1256)
await client.update_sensor(sensor_id="el-i", value=150.4)
await client.update_sensor(sensor_id="pv", value=3571.2)

sleep(5)

await client.update_sensor(sensor_id="gas", value=450.3)
await client.update_sensor(sensor_id="temperature", value=21.6)

sleep(5)

# we overwrite the previous values
await client.update_sensor(sensor_id="el", value=1257)
await client.update_sensor(sensor_id="el-i", value=150.5)
await client.update_sensor(sensor_id="pv", value=3572.2)

await client.synchronize_sensors()

In [None]:
# Simple example of a loop that updates a sensor called "bat-soc" every second, but uploads the data every 10 seconds

# Set the total time you want the example to run
total_time = 600  # 10 minutes

start_time = dt.datetime.now(dt.timezone.utc)
end_time = start_time + dt.timedelta(seconds=total_time)

soc = 0
direction = "charge"
last_upload_time = None

while dt.datetime.now(dt.timezone.utc) < end_time:
    # Update the sensor value
    if direction == "charge":
        soc += 1
        if soc >= 100:
            direction = "discharge"
    else:
        soc -= 1
        if soc <= 0:
            direction = "charge"
    await client.update_sensor(sensor_id="bat-soc", value=soc)
    print(f"Updated sensor bat-soc to {soc}")

    # Upload the data every 60 seconds
    if last_upload_time is None or dt.datetime.now(
        dt.timezone.utc
    ) - last_upload_time > dt.timedelta(seconds=60):
        await client.synchronize_sensors()
        last_upload_time = dt.datetime.now(dt.timezone.utc)
        print("Uploaded data")

    sleep(10)