# PowerP API Client (Python)

Reference notebook to list signals and query values from the PowerP Real-Time API using the secure Client Credentials flow.

## Prerequisites
- Set environment variables: `POWERP_CLIENT_ID`, `POWERP_CLIENT_SECRET` and optionally `POWERP_API_BASE_URL` (default: `https://tenant.powerp.app/rt-api/api`).
- Install dependencies: `pip install -r samples/python/requirements.txt`.
- Keep block sizes at or below 20 signals (the API limit); 5-10 is recommended to balance throughput and latency.

In [None]:
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Iterable, List

import pandas as pd
import requests

In [None]:
# Basic configuration: credentials and endpoint are read from the environment
# so that secrets never need to be typed into the notebook.
CLIENT_ID = os.getenv("POWERP_CLIENT_ID")
CLIENT_SECRET = os.getenv("POWERP_CLIENT_SECRET")
BASE_URL = os.getenv("POWERP_API_BASE_URL", "https://tenant.powerp.app/rt-api/api").rstrip("/")

# Query sizing
MAX_BLOCK_SIZE = 20  # API limit
DEFAULT_BLOCK_SIZE = min(10, MAX_BLOCK_SIZE)
LOOKBACK_MINUTES = 15  # keep raw windows under 30 minutes
WINDOW_PERIOD = "200ms"

# Fail fast: nothing below can work without both credentials.
if not (CLIENT_ID and CLIENT_SECRET):
    raise RuntimeError("Set POWERP_CLIENT_ID and POWERP_CLIENT_SECRET before running the notebook")

def get_access_token():
    """Obtain a bearer token from ``{BASE_URL}/v1/auth/token``.

    Posts ``CLIENT_ID`` / ``CLIENT_SECRET`` as form data with the
    ``client_credentials`` grant type and reads ``access_token`` from the
    JSON reply.

    Returns:
        The access token string.

    Raises:
        RuntimeError: If the request fails, the reply cannot be parsed, or
            the reply carries no usable ``access_token``. The underlying
            exception, when there is one, is attached as ``__cause__``.
    """
    print("Authenticating...")
    try:
        res = requests.post(f"{BASE_URL}/v1/auth/token", data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "grant_type": "client_credentials"
        }, timeout=10)
        res.raise_for_status()
        token = res.json()["access_token"]
    except Exception as e:
        # Notebook boundary: fold every failure into one error type, but keep
        # the original exception chained for debugging.
        raise RuntimeError(f"Authentication failed: {e}") from e

    # An empty or non-string token would only surface later as an opaque 401.
    if not isinstance(token, str) or not token:
        raise RuntimeError("Authentication failed: response did not contain a usable access_token")

    # Only the length is printed so the secret never reaches the console.
    print(f"Authenticated! Token length: {len(token)}")
    return token

# Authenticate once when the cell runs and build the headers sent with API calls.
TOKEN = get_access_token()
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"}

In [None]:
def chunked(items: List[Any], size: int) -> Iterable[List[Any]]:
    """Yield consecutive slices of *items*, each at most *size* long.

    The requested size is clamped to the range ``1..MAX_BLOCK_SIZE`` before
    slicing, so a zero or negative size yields one-element slices. The final
    slice may be shorter than the others.
    """
    step = max(1, min(size, MAX_BLOCK_SIZE))
    offsets = range(0, len(items), step)
    yield from (items[lo : lo + step] for lo in offsets)


def list_measurements() -> List[dict]:
    """Fetch measurement metadata via ``GET {BASE_URL}/v1/measurements``.

    Sends the shared ``HEADERS`` with a 30-second timeout and returns the
    decoded JSON body. An HTTP error status is raised by
    ``raise_for_status()``.
    """
    endpoint = f"{BASE_URL}/v1/measurements"
    reply = requests.get(endpoint, headers=HEADERS, timeout=30)
    reply.raise_for_status()
    return reply.json()


def _utc_iso_z(moment: datetime) -> str:
    """Format *moment* as an ISO-8601 string ending in a single ``Z``.

    Naive datetimes are taken to be UTC already and are formatted unchanged.
    Timezone-aware datetimes are shifted to UTC and stripped of tzinfo first;
    otherwise ``isoformat()`` would emit an offset and the result would read
    ``...+00:00Z``.
    """
    offset = moment.utcoffset()
    if offset is not None:
        moment = moment.replace(tzinfo=None) - offset
    return moment.isoformat() + "Z"


def query_values(database_id: int, indexes: List[str], start_time: datetime, end_time: datetime, agg_function: str, window_period: str = WINDOW_PERIOD) -> List[dict]:
    """Query values for a set of measurement indexes via ``POST {BASE_URL}/v1/Query``.

    Args:
        database_id: Sent as ``databaseId``.
        indexes: Measurement indexes, sent as ``measurementIndexes``.
        start_time: Window start. Naive values are treated as UTC; aware
            values are converted to UTC.
        end_time: Window end, handled the same way as ``start_time``.
        agg_function: Sent as ``aggFunction``.
        window_period: Sent as ``windowPeriod``; defaults to ``WINDOW_PERIOD``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    payload = {
        "databaseId": database_id,
        "measurementIndexes": indexes,
        "startTime": _utc_iso_z(start_time),
        "endTime": _utc_iso_z(end_time),
        "aggFunction": agg_function,
        "windowPeriod": window_period,
    }
    response = requests.post(f"{BASE_URL}/v1/Query", headers=HEADERS, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()


# Progress marker printed once the helper definitions in this cell have run.
print("Configuration ready; fetching metadata...")

In [None]:
# List signals and query in safe blocks
measurements = list_measurements()
print(f"Total measurements: {len(measurements)}")

# Bucket signals by (database, default aggregation) because each query call
# takes exactly one database id and one aggregation function.
grouped = defaultdict(list)
for row in measurements:
    key = (row.get("databaseId"), row.get("defaultAgg"))
    grouped[key].append(row)

# datetime.utcnow() is deprecated since Python 3.12. This is the documented
# equivalent; tzinfo is dropped so the value stays a naive UTC timestamp.
end_time = datetime.now(timezone.utc).replace(tzinfo=None)
start_time = end_time - timedelta(minutes=LOOKBACK_MINUTES)
block_size = DEFAULT_BLOCK_SIZE

for (database_id, agg_function), rows in grouped.items():
    # Fall back to "last" when the metadata carries no default aggregation.
    agg_to_use = agg_function or "last"
    print(f"\nDatabase {database_id} | Aggregation {agg_to_use} | Signals {len(rows)}")
    for batch in chunked(rows, block_size):
        indexes = [str(item["index"]) for item in batch]
        data = query_values(database_id, indexes, start_time, end_time, agg_to_use)
        print(f"- Block size {len(indexes)} -> received {len(data)} data points")
        if data:
            # Show a single point per block to keep console output small.
            sample = data[0]
            print(f"  sample: index={sample.get('index')} value={sample.get('value')} at {sample.get('timestamp')}")

## Good practices
- Keep `block_size` between 5 and 10; never exceed 20 (API limit).
- Keep raw lookback windows under 30 minutes; adjust `LOOKBACK_MINUTES` carefully.
- Avoid logging secrets or full payloads; sanitize console output when sharing.
- On 429/5xx responses, back off and retry with smaller blocks or longer delays.

## Metadata only
List signals without querying values; helpful for discovering indexes and defaults.

In [None]:
# List metadata without hitting the data endpoint
measurements = list_measurements()
print(f"Total measurements: {len(measurements)}")

if not measurements:
    print("No measurements returned.")
else:
    # Keep only the descriptive columns for the preview table.
    df = pd.DataFrame(measurements)[[
        "index",
        "name",
        "databaseId",
        "defaultAgg",
        "dataType",
        "unitSymbol",
        "firstDataPoint",
        "minValue",
        "maxValue",
    ]]
    print("Preview of metadata (first 20 rows):")
    print(df.head(20))


## Query selected indexes
Provide specific measurement indexes manually, keeping windows small (<=30 minutes).

In [None]:
# Replace with the indexes you want to inspect (strings)
manual_indexes = ["1", "2"]  # e.g., ["101", "205"]

# Use metadata to infer database and default aggregation
if 'measurements' not in globals() or not measurements:
    measurements = list_measurements()

# The first metadata row supplies the defaults; an empty dict stands in when
# no metadata was returned.
first_row = measurements[0] if measurements else {}
database_id = first_row.get("databaseId")
agg_function = first_row.get("defaultAgg") or "last"

# Validate before announcing the query. Compare against None rather than
# truthiness so that a database id of 0 is not rejected.
if database_id is None:
    raise ValueError("Define database_id manually or ensure measurements are loaded.")
print(f"Querying Database {database_id} with Agg {agg_function}")

# datetime.utcnow() is deprecated since Python 3.12. This is the documented
# equivalent; tzinfo is dropped so the value stays a naive UTC timestamp.
end_time = datetime.now(timezone.utc).replace(tzinfo=None)
start_time = end_time - timedelta(minutes=LOOKBACK_MINUTES)

data = query_values(database_id, manual_indexes, start_time, end_time, agg_function)
print(f"Requested indexes {manual_indexes} -> received {len(data)} data points")
for item in data:
    print(f"idx={item.get('index')} value={item.get('value')} ts={item.get('timestamp')}")
