Open-source Python client library for the ACS Monitor REST API — the network and infrastructure monitoring platform from Anglia Computer Solutions.
A thin, dependency-light wrapper around `requests`. It supports Python 3.9+, is type-hinted, paginates through generators, and works equally well in scripts, Ansible playbooks, data pipelines, and notebooks.
Licensing: this Python client is open source under the MIT licence. The ACS Monitor application it talks to is licensed commercial software with a free 100-device tier — see acsmon.com for tiers and pricing.
- Requirements
- Installation
- Quick start
- Configuration
- Authentication
- Method reference
- Returned data shapes
- Pagination & generators
- Filtering, sorting, searching
- Working with monitors
- Working with alerts
- Error handling
- Custom HTTP options
- Token storage best practice
- Recipes
- Troubleshooting
| Requirement | Minimum | Notes |
|---|---|---|
| Python | 3.9 | Uses `from __future__ import annotations` for typing |
| requests | 2.28+ | Single runtime dependency |
| Network | Outbound to `ACSMON_BASE_URL` | Whatever host runs your ACS Monitor install |

    cd api-clients/python
    pip install -r requirements.txt

To use it inside an existing project, copy the `acsmon_client/` package into your source tree, or install it from a Git path:

    pip install acsmon-client

(To pin a tag for repeatable builds, append `@v1.0.26` etc.)

    export ACSMON_BASE_URL=https://monitoring.example.com
    export ACSMON_EMAIL=api@example.com
    export ACSMON_PASSWORD='your-strong-password'
    python examples/list_devices.py
    python examples/poll_device.py 42

Set `ACSMON_TOKEN` to a long-lived token to skip the login round trip.

    import os

    from acsmon_client import AcsMonitorClient

    api = AcsMonitorClient(
        base_url="https://monitoring.example.com",
        token=os.environ.get("ACSMON_TOKEN"),
        timeout_seconds=30.0,
        verify_tls=True,
    )

| Constructor arg | Type | Default | Purpose |
|---|---|---|---|
| `base_url` | `str` | required | Root URL of your ACS Monitor install. The client appends `/api/v1`. Trailing slashes are stripped. |
| `token` | `Optional[str]` | `None` | Bearer token. Skip if you intend to call `login()`. |
| `timeout_seconds` | `float` | `30.0` | Per-request timeout passed to `requests`. |
| `verify_tls` | `bool` | `True` | Set to `False` only in lab environments with self-signed certs. |

The client owns a single requests.Session for connection pooling.
Multiple workers/threads should each have their own client instance.
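
One way to follow the one-client-per-thread advice is a small holder built on `threading.local`. The sketch below is only an illustration: `PerThreadClient` and `FakeClient` are hypothetical names, and `FakeClient` stands in for `AcsMonitorClient` so the snippet runs without the library installed.

```python
import threading


class FakeClient:
    """Stand-in for AcsMonitorClient so this sketch runs on its own."""


class PerThreadClient:
    """Lazily builds one client per thread using threading.local."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def get(self):
        client = getattr(self._local, "client", None)
        if client is None:
            client = self._factory()
            self._local.client = client
        return client


# Real code would pass something like: lambda: AcsMonitorClient(base_url, token)
clients = PerThreadClient(FakeClient)

seen = []


def worker():
    first = clients.get()
    second = clients.get()  # same thread, so the same instance comes back
    seen.append((first, first is second))


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each worker calls `clients.get()` wherever it needs the API, so no session is ever shared across threads.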
Two ways to obtain a bearer token:

1. Trade email + password for one:

       api = AcsMonitorClient("https://monitoring.example.com")
       token = api.login("api@example.com", "secret")
       # `api` will now send Authorization: Bearer <token> on every call.
       # `token` is also returned so you can persist it externally.

2. Reuse a token you've already obtained (preferred for daemons, cron jobs, Lambda):

       api = AcsMonitorClient(
           base_url=os.environ["ACSMON_BASE_URL"],
           token=os.environ["ACSMON_TOKEN"],
       )
       # No login() call needed: the token is sent immediately.

Revoke server-side:

    api.logout()  # POST /auth/logout, then clears the in-memory token

For long-running integrations, create a dedicated API user in the ACS Monitor UI (Settings → Users) with the minimum role needed (viewer for read-only, operator for ack/resolve, admin for full write access). Tokens live until logout or user deletion.

Methods that return a list of rows are generators — for row in api.devices(): ... walks every page automatically. Single-resource
methods return a dict.

| Method | HTTP | Returns |
|---|---|---|
| `login(email, password)` | `POST /auth/login` | `str` (token) |
| `logout()` | `POST /auth/logout` | `None` |

| Method | HTTP | Returns |
|---|---|---|
| `devices(**query)` | `GET /devices` | generator of `dict` |
| `device(device_id)` | `GET /devices/{id}` | `dict` |
| `poll_device(device_id)` | `POST /devices/{id}/poll` | `dict` |
| `device_metrics(device_id, **query)` | `GET /devices/{id}/metrics` | `dict` |
| `device_alerts(device_id, **query)` | `GET /devices/{id}/alerts` | `dict` |

| Method | HTTP | Returns |
|---|---|---|
| `monitors(**query)` | `GET /monitors` | generator of `dict` |
| `monitor(monitor_id)` | `GET /monitors/{id}` | `dict` |
| `create_monitor(payload)` | `POST /monitors` | `dict` |
| `check_monitor(monitor_id)` | `POST /monitors/{id}/check` | `dict` |
| `monitor_results(monitor_id, **query)` | `GET /monitors/{id}/results` | `dict` |
| `monitor_uptime(monitor_id, **query)` | `GET /monitors/{id}/uptime` | `dict` |

| Method | HTTP | Returns |
|---|---|---|
| `alert_events(**query)` | `GET /alert-events` | generator of `dict` |
| `acknowledge_alert_event(event_id, note=None)` | `POST /alert-events/{id}/acknowledge` | `dict` |
| `resolve_alert_event(event_id, note=None)` | `POST /alert-events/{id}/resolve` | `dict` |

| Method | HTTP | Returns |
|---|---|---|
| `system_health()` | `GET /system/health` | `dict` |

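Every list response follows the same envelope: a `data` array of rows, `links` and `meta` pagination objects, and a top-level `next_page_url`. The Python generator strips this envelope and yields individual rows from `data`, then follows `next_page_url` until it is exhausted.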
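
To make the envelope handling concrete, here is a minimal sketch of how such a generator could work. It is not the library's actual implementation: `iter_rows` and the in-memory `PAGES` dict are hypothetical, and `fetch` stands in for whatever performs the HTTP GET and decodes the JSON.

```python
from typing import Callable, Dict, Iterator, Optional


def iter_rows(fetch: Callable[[str], Dict], first_url: str) -> Iterator[Dict]:
    """Yield rows from each page's "data", following "next_page_url"."""
    url: Optional[str] = first_url
    while url:
        page = fetch(url)
        yield from page.get("data", [])
        url = page.get("next_page_url")  # None or missing ends the loop


# Two in-memory pages standing in for real HTTP responses.
PAGES = {
    "/devices?page=1": {
        "data": [{"id": 1}, {"id": 2}],
        "next_page_url": "/devices?page=2",
    },
    "/devices?page=2": {"data": [{"id": 3}], "next_page_url": None},
}

rows = list(iter_rows(PAGES.__getitem__, "/devices?page=1"))
print([r["id"] for r in rows])  # prints [1, 2, 3]
```

Because rows are yielded lazily, the second page is only requested once the caller has consumed the first.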
A typical Device dict:

    {
        "id": 42,
        "name": "core-switch-01",
        "ip_address": "10.0.0.1",
        "snmp_version": "2c",
        "device_type": "cisco_ios",
        "status": "up",  # up | down | unknown
        "last_polled_at": "2026-04-22T10:14:33Z",
        "poll_interval_seconds": 60,
        "is_active": True,
        "device_group": {"id": 3, "name": "Core network", "color": "#3b82f6"},
    }

A typical Monitor dict:

    {
        "id": 117,
        "name": "Public website",
        "type": "https",
        "host": "example.com",
        "port": 443,
        "status": "up",  # up | down | degraded | unknown
        "response_time_ms": 142,
        "ssl_expiry_at": "2026-09-14T00:00:00Z",
        "uptime_percent_7d": 99.97,
        "uptime_percent_30d": 99.81,
        "consecutive_failures": 0,
        "last_checked_at": "2026-04-22T10:15:01Z",
        "config": {"method": "GET", "path": "/", "expected_status": 200, "ssl_verify": True},
    }

A typical AlertEvent dict:

    {
        "id": 9821,
        "severity": "critical",  # info | warning | critical
        "status": "open",  # open | acknowledged | resolved
        "triggered_at": "2026-04-22T09:55:12Z",
        "acknowledged_at": None,
        "resolved_at": None,
        "current_value": "down",
        "monitor": {"id": 117, "name": "Public website", "type": "https"},
        "device": None,
    }

The two-line loop:

    for device in api.devices():
        print(device["id"], device["name"])

…is equivalent to:

    import requests

    # base_url and token are assumed to be defined by the caller.
    headers = {"Authorization": f"Bearer {token}"}
    page = 1
    while True:
        res = requests.get(
            f"{base_url}/api/v1/devices?page={page}", headers=headers, timeout=30
        ).json()
        for d in res["data"]:
            print(d["id"], d["name"])
        if not res.get("next_page_url"):
            break
        page += 1

You can pass any query string to the generator: `page`, `per_page`, `search`, `sort`, and `filter[...]` are all forwarded. Because Python identifiers can't contain `[`, pass bracketed filters via `**`:

    for device in api.devices(**{"filter[status]": "down", "per_page": 100}):
        print(device["name"], "is DOWN")

    for monitor in api.monitors(sort="-response_time_ms", per_page=100):
        if monitor["response_time_ms"] > 1000:
            print(f"{monitor['name']}: {monitor['response_time_ms']}ms")

To collect into a list instead of streaming:

    down_devices = list(api.devices(**{"filter[status]": "down"}))

To process in batches (memory-friendly with millions of events):

    from itertools import islice

    it = api.alert_events(**{"filter[status]": "open"})
    while batch := list(islice(it, 500)):
        bulk_insert(batch)  # bulk_insert is your own sink function

| Query param | Example | Effect |
|---|---|---|
| `search` | `search="core"` | Free-text search across name/host fields |
| `filter[<field>]` | `**{"filter[status]": "down"}` | Exact-match filter (chainable) |
| `sort` | `sort="-last_polled_at"` | Sort column. Prefix `-` for descending. |
| `per_page` | `per_page=100` | Page size (cap depends on endpoint) |
| `page` | `page=3` | Start page (use the generator unless paging manually) |

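
If you build bracketed filters often, a tiny helper can keep call sites readable. The function below is a hypothetical convenience, not part of the client: it only turns a plain dict of field names into `filter[<field>]` keys and merges them with ordinary parameters.

```python
def build_query(filters=None, **params):
    """Merge plain params with filter[<field>] keys built from `filters`."""
    query = dict(params)
    for field, value in (filters or {}).items():
        query[f"filter[{field}]"] = value
    return query


query = build_query(
    {"status": "open", "severity": "critical"},
    sort="-triggered_at",
    per_page=50,
)
print(query)
```

The result can be splatted straight into a list method, for example `api.alert_events(**query)`.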
Combine freely:

    for ev in api.alert_events(**{
        "filter[severity]": "critical",
        "filter[status]": "open",
        "sort": "-triggered_at",
        "per_page": 50,
    }):
        handle(ev)

Create a monitor:

    monitor = api.create_monitor({
        "name": "Login API health",
        "type": "https",
        "host": "api.example.com",
        "port": 443,
        "check_interval_seconds": 60,
        "timeout_ms": 5000,
        "config": {
            "method": "GET",
            "path": "/health",
            "expected_status": 200,
            "expected_body_regex": '"ok":true',
            "ssl_verify": True,
            "alert_ssl_expiry_days": 14,
        },
    })
    print(f"Created monitor {monitor['id']}")

Supported `type` values: ping, tcp, http, https, ssh, smtp, ftp, pop3, imap, dns, mysql, redis, snmp. Each type has its own `config` schema; see the main project README.
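
A cheap client-side guard can catch a mistyped `type` before the request is sent. This is only a sketch: `check_monitor_type` is a hypothetical helper, the set simply mirrors the list above, and the server remains the authority on what it accepts.

```python
SUPPORTED_TYPES = frozenset({
    "ping", "tcp", "http", "https", "ssh", "smtp", "ftp",
    "pop3", "imap", "dns", "mysql", "redis", "snmp",
})


def check_monitor_type(payload):
    """Raise ValueError if payload["type"] is not a documented monitor type."""
    monitor_type = payload.get("type")
    if monitor_type not in SUPPORTED_TYPES:
        raise ValueError(f"unsupported monitor type: {monitor_type!r}")
    return payload


payload = check_monitor_type({"name": "Login API health", "type": "https"})
```

Since the helper returns the payload unchanged, it can wrap the argument inline: `api.create_monitor(check_monitor_type(payload))`.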
Force an immediate re-check:

    api.check_monitor(monitor["id"])  # returns {"queued": True, "job_id": "..."}

The check runs asynchronously on the probe service. Poll `api.monitor(id)["status"]` or `api.monitor_results(id)` to see the outcome.
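
Polling for the outcome can be wrapped in a small helper. The sketch below assumes nothing about the client beyond a callable that returns the current status string; `wait_for_status` is a hypothetical name, and the clock and sleep functions are injectable so the demo finishes instantly.

```python
import time


def wait_for_status(fetch_status, target="up", timeout=300.0, interval=10.0,
                    clock=time.monotonic, sleep=time.sleep):
    """Poll fetch_status() until it returns target or timeout seconds pass."""
    deadline = clock() + timeout
    while True:
        if fetch_status() == target:
            return True
        if clock() + interval > deadline:
            return False  # the next poll would land past the deadline
        sleep(interval)


# Demo with canned statuses and a no-op sleep.
statuses = iter(["unknown", "down", "up"])
ok = wait_for_status(lambda: next(statuses), sleep=lambda seconds: None)
print(ok)  # prints True
```

With a real client, the callable might be `lambda: api.monitor(monitor_id)["status"]`.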
Read the last N results:

    recent = api.monitor_results(monitor["id"], per_page=20)
    for r in recent["data"]:
        print(r["checked_at"], r["status"], f"{r['response_time_ms']} ms")

Uptime stats for SLA reporting:

    uptime = api.monitor_uptime(monitor["id"], window="30d")
    # {"uptime_percent": 99.91, "total_checks": 43200, "failed_checks": 39, ...}

Acknowledge and resolve alert events:

    import socket

    # Stream every open critical alert and ack it
    for ev in api.alert_events(**{
        "filter[status]": "open",
        "filter[severity]": "critical",
    }):
        api.acknowledge_alert_event(ev["id"], note=f"Auto-ack from {socket.gethostname()}")

    # Mark resolved when downstream resolves it
    api.resolve_alert_event(ev["id"], note="Closed by INC-9421")

`note` is optional on both ack and resolve. Pass a short string and it shows up in the audit log for that event.
Any non-2xx response raises AcsMonApiError with the HTTP status and
parsed body attached. Network errors (DNS, TLS, timeout) raise with
status = 0.

    from acsmon_client import AcsMonApiError

    # poll_or_skip is an illustrative wrapper so that `return` is valid
    def poll_or_skip(device_id):
        try:
            return api.poll_device(device_id)
        except AcsMonApiError as exc:
            print(f"HTTP {exc.status}: {exc.body}")
            if exc.status == 404:
                return None  # gone, skip
            if exc.status == 403:
                raise PermissionError("Token lacks devices.edit permission")
            if exc.status == 0:
                print("Network problem, will retry")
            raise

    poll_or_skip(99999)

| Status | Meaning | Recovery |
|---|---|---|
| 401 | Token missing/expired | Call `login()` again or refresh the secret |
| 403 | Authenticated but lacks the permission slug for that route | Grant the role via the UI |
| 404 | Resource doesn't exist (or wrong ID) | Check the ID, no retry |
| 422 | Validation error (see `exc.body["errors"]`) | Fix the payload |
| 429 | Rate-limited | Back off (the server returns `Retry-After`) |
| 500+ | Server bug or upstream outage | Retry with exponential backoff |
| 0 | Network/DNS/TLS/timeout | Retry with exponential backoff |

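
The recovery column can be condensed into one decision function. This is a sketch under assumptions: `retry_delay` is a hypothetical helper, and how the `Retry-After` value reaches your code (for instance via the exception) is not documented here, so it is passed in explicitly.

```python
def retry_delay(status, attempt, retry_after=None, base_delay=0.5, max_delay=60.0):
    """Return seconds to wait before the next attempt, or None to give up."""
    if status == 429 and retry_after is not None:
        try:
            return min(float(retry_after), max_delay)
        except ValueError:
            pass  # header held an HTTP date or junk, so fall back to backoff
    if status == 0 or status == 429 or status >= 500:
        return min(base_delay * (2 ** attempt), max_delay)
    return None  # 401/403/404/422: retrying will not help


print(retry_delay(429, 0, retry_after="7"))  # prints 7.0
print(retry_delay(503, 2))                   # prints 2.0
print(retry_delay(404, 0))                   # prints None
```

Capping the delay with `max_delay` keeps a misbehaving server from stalling a job indefinitely.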
A simple retry helper:

    import time

    from acsmon_client import AcsMonApiError

    def with_retry(fn, *, attempts=4, base_delay=0.5):
        for i in range(attempts):
            try:
                return fn()
            except AcsMonApiError as exc:
                retriable = exc.status == 0 or exc.status == 429 or exc.status >= 500
                if not retriable or i == attempts - 1:
                    raise
                time.sleep(base_delay * (2 ** i))

    with_retry(lambda: api.poll_device(42))

The client owns a `requests.Session` exposed as `api._session` for advanced needs:

    # Add a custom CA bundle (corporate root CA)
    api._session.verify = "/etc/ssl/certs/corp-ca.pem"

    # Route via an HTTPS proxy
    api._session.proxies = {
        "http": "http://corp-proxy:3128",
        "https": "http://corp-proxy:3128",
    }

    # Add a static custom header on every request (e.g. tracing)
    api._session.headers["X-Request-Source"] = "weekly-report-job"

    # Per-call timeout override: wrap the call in a context manager
    import contextlib

    @contextlib.contextmanager
    def with_timeout(client, seconds):
        old = client.timeout
        client.timeout = seconds
        try:
            yield
        finally:
            client.timeout = old

    with with_timeout(api, 120):
        api.poll_device(42)

For a self-signed cert in a lab (don't do this in production):

    api = AcsMonitorClient("https://lab-monitoring.local", verify_tls=False)

- Server-side daemons: read `ACSMON_TOKEN` from environment. Store in your secrets manager (HashiCorp Vault, AWS Secrets Manager, Doppler, Infisical, GitHub Actions secrets).
- CLI tools: read from `~/.config/acsmon/token` with `0600` permissions, falling back to `os.environ.get("ACSMON_TOKEN")`.
- Notebooks: load from `.env` via `python-dotenv` rather than hard-coding.
- Never put the token into a public Git repo or any artifact that ships to a client device: it grants the same access as the user account it was issued to.

Sync devices into a CMDB (`my_cmdb` stands in for your own CMDB client):

    import os

    from acsmon_client import AcsMonitorClient

    api = AcsMonitorClient(os.environ["ACSMON_BASE_URL"], os.environ["ACSMON_TOKEN"])
    records = [
        {
            "external_id": f"acsmon-{d['id']}",
            "hostname": d["name"],
            "ip": d["ip_address"],
            "status": d["status"],
            "last_seen": d["last_polled_at"],
        }
        for d in api.devices()
    ]
    my_cmdb.bulk_upsert(records)
    print(f"Synced {len(records)} devices")

Forward open critical alerts to PagerDuty and acknowledge them:

    import os

    import requests
    from acsmon_client import AcsMonitorClient

    api = AcsMonitorClient(os.environ["ACSMON_BASE_URL"], os.environ["ACSMON_TOKEN"])
    for ev in api.alert_events(**{
        "filter[status]": "open",
        "filter[severity]": "critical",
    }):
        resp = requests.post("https://events.pagerduty.com/v2/enqueue", json={
            "routing_key": os.environ["PAGERDUTY_KEY"],
            "event_action": "trigger",
            "dedup_key": f"acsmon-{ev['id']}",
            "payload": {
                "summary": f"{(ev.get('monitor') or ev.get('device') or {}).get('name')}: {ev['current_value']}",
                "severity": "critical",
                "source": "ACS Monitor",
                "custom_details": ev,
            },
        }, timeout=10)
        resp.raise_for_status()  # only acknowledge once PagerDuty accepted the event
        api.acknowledge_alert_event(ev["id"], note="Forwarded to PagerDuty")

Force a check, then wait up to five minutes for the monitor to report up:

    import os
    import sys
    import time

    from acsmon_client import AcsMonitorClient

    api = AcsMonitorClient(os.environ["ACSMON_BASE_URL"], os.environ["ACSMON_TOKEN"])
    monitor_id = int(os.environ["HEALTH_MONITOR_ID"])
    api.check_monitor(monitor_id)
    deadline = time.time() + 300  # 5 minutes
    while time.time() < deadline:
        m = api.monitor(monitor_id)
        if m["status"] == "up":
            print(f"✓ {m['name']} is up ({m['response_time_ms']}ms)")
            sys.exit(0)
        time.sleep(10)
    print("Health monitor did not return to 'up' within 5 minutes", file=sys.stderr)
    sys.exit(1)

Export 7-day uptime per monitor as CSV:

    import csv
    import os
    import sys

    from acsmon_client import AcsMonitorClient

    api = AcsMonitorClient(os.environ["ACSMON_BASE_URL"], os.environ["ACSMON_TOKEN"])
    writer = csv.writer(sys.stdout)
    writer.writerow(["monitor", "type", "uptime_pct_7d", "failed_checks_7d"])
    for m in api.monitors():
        u = api.monitor_uptime(m["id"], window="7d")
        writer.writerow([m["name"], m["type"], u["uptime_percent"], u["failed_checks"]])

Drop into `inventory/acsmon.py` and mark it executable. Ansible will parse the JSON it prints:

    #!/usr/bin/env python3
    import json
    import os

    from acsmon_client import AcsMonitorClient

    api = AcsMonitorClient(os.environ["ACSMON_BASE_URL"], os.environ["ACSMON_TOKEN"])
    inventory = {"_meta": {"hostvars": {}}, "all": {"hosts": []}}
    for d in api.devices(**{"filter[is_active]": "true"}):
        name = d["name"]
        inventory["all"]["hosts"].append(name)
        inventory["_meta"]["hostvars"][name] = {
            "ansible_host": d["ip_address"],
            "device_type": d.get("device_type"),
            "device_group": (d.get("device_group") or {}).get("name"),
        }
    print(json.dumps(inventory))

`AcsMonApiError ... → HTTP 401`: Token is missing, expired, or belongs to a deleted user. Re-issue from Settings → Users.

`AcsMonApiError ... → HTTP 403`: Authentication worked but the user/role doesn't carry the permission slug for that endpoint. The endpoint table in the project's main CLAUDE.md lists each route's required permission slug.

`requests.exceptions.SSLError: ... certificate verify failed`: The target uses a self-signed or untrusted cert. Add the CA to the system trust store, or set `verify_tls=False` for lab environments only.

`requests.exceptions.ConnectTimeout`: Hit the per-request timeout. Construct the client with a higher `timeout_seconds`, or check whether the API server is overloaded.

`requests.exceptions.ConnectionError: ... NameResolutionError`: DNS can't resolve `ACSMON_BASE_URL`. Verify with `curl -v "$ACSMON_BASE_URL/api/v1/system/health"`.

Generator yields nothing: The query returned no data. Check filter values and confirm the user has `*.view` on that resource.

`ModuleNotFoundError: No module named 'acsmon_client'`: Make sure `api-clients/python/` is on `PYTHONPATH`, or pip-install the package.

© Anglia Computer Solutions Ltd. — ACS Monitor is a product of Anglia Computer Solutions. Visit acsmon.com for documentation, demos and licensing.

The list response envelope, in full:

    {
      "data": [ /* rows */ ],
      "links": {
        "first": "https://.../api/v1/devices?page=1",
        "last": "https://.../api/v1/devices?page=12",
        "prev": null,
        "next": "https://.../api/v1/devices?page=2"
      },
      "meta": {
        "current_page": 1,
        "from": 1,
        "to": 50,
        "per_page": 50,
        "total": 593
      },
      "next_page_url": "https://.../api/v1/devices?page=2"
    }