-
Notifications
You must be signed in to change notification settings - Fork 186
/
Copy pathhttp_error_handling.py
126 lines (99 loc) · 4.32 KB
/
http_error_handling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""
Illustrates getting header values from Errors that may occur on write.
To test against cloud set the following environment variables:
INFLUX_URL
INFLUX_TOKEN
INFLUX_DATABASE
INFLUX_ORG
...otherwise will run against a standard OSS endpoint.
"""
import asyncio
import os
from typing import MutableMapping
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
def get_envar(key, default):
    """Return the value of environment variable *key*, or *default* if unset.

    :param key: name of the environment variable to read.
    :param default: value returned when the variable is not defined.
    :return: the variable's value (a string) when defined, otherwise *default*.
    """
    # Only a missing key falls back to the default; unlike a bare ``except:``
    # this does not hide KeyboardInterrupt/SystemExit or programming errors
    # such as passing a non-string key.
    return os.environ.get(key, default)
class Config:
    """Connection settings for the example, read from the environment.

    Each setting comes from an ``INFLUX_*`` environment variable and falls
    back to a local default when that variable is not set.
    """

    def __init__(self):
        """Populate url, token, bucket and org via ``get_envar``."""
        self.url = get_envar("INFLUX_URL", "http://localhost:8086")
        self.token = get_envar("INFLUX_TOKEN", "my-token")
        self.bucket = get_envar("INFLUX_DATABASE", "my-bucket")
        self.org = get_envar("INFLUX_ORG", "my-org")

    def __str__(self):
        """Return a multi-line summary of the settings with the token masked."""
        summary = [
            "config:",
            f" url: {self.url}",
            # The token is never echoed; a fixed placeholder is shown instead.
            " token: ****redacted*****",
            f" bucket: {self.bucket}",
            f" org: {self.org}",
        ]
        # Every line, including the last, is newline-terminated.
        return "\n".join(summary) + "\n"
# To encapsulate functions used in batch writing
class BatchCB:
    """Callbacks passed to the batching write API to report each outcome.

    In every callback ``conf`` is a 3-tuple of strings and ``data`` is the
    payload of the batch.
    NOTE(review): ``conf`` is presumably (bucket, org, precision) - confirm
    against the influxdb_client write API documentation.
    """

    def success(self, conf: (str, str, str), data: str):
        """Print a confirmation that the batch was written."""
        message = f"Write success: {conf}, data: {data}"
        print(message)

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        """Print the failure message, then the headers carried by the error."""
        reason = exception.message
        print(f"\nBatch -> Write failed: {conf}, data: {data}, error: {reason}")
        report_headers(exception.headers)

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        """Print a notice that the write failed but can be retried."""
        message = f"Write failed but retryable: {conf}, data: {data}, error: {exception}"
        print(message)
# simple reporter that server is available
def report_ping(ping: bool):
    """Announce that the server is ready, or fail if the ping was unsuccessful.

    :param ping: result of a client ping; truthy means the server answered.
    :raises ValueError: when *ping* is falsy.
    """
    if ping:
        print("InfluxDB: ready")
        return
    raise ValueError("InfluxDB: Failed to ping server")
# report some useful expected header fields
def report_headers(headers: MutableMapping[str, str]):
    """Print a fixed selection of response headers, one per line.

    A header that is absent from *headers* is printed as ``None``.

    :param headers: mapping of header name to header value.
    """
    expected_fields = (
        "Date",
        "X-Influxdb-Build",
        "X-Influxdb-Version",  # OSS version, Cloud should be None
        "X-Platform-Error-Code",  # OSS invalid, Cloud should be None
        "Retry-After",  # Should be None
        "Trace-Id",  # OSS should be None, Cloud should return value
    )
    for field in expected_fields:
        print(f" {field}: ", headers.get(field))
# try a write using a synchronous call
def use_sync(conf: Config):
    """Attempt a write with the synchronous write API and report any ApiException.

    The record sent has no field value, so the write is presumably expected
    to be rejected; the caught error's message and headers are then printed.

    :param conf: connection settings (url, token, org, bucket).
    """
    print("Using sync")
    with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as influx:
        # Fail fast if the server is not reachable.
        report_ping(influx.ping())
        try:
            sync_writer = influx.write_api(write_options=SYNCHRONOUS)
            sync_writer.write(bucket=conf.bucket, record="cpu,location=G4 usage=")
        except ApiException as api_error:
            print("\nSync -> Caught ApiException: ", api_error.message)
            report_headers(api_error.headers)
        print("Sync write done")
# try a write using batch API
def use_batch(conf: Config):
    """Attempt a write through the batch write API, reporting via BatchCB callbacks.

    The record sent has no field value, so the write is presumably expected
    to fail and be reported through the error callback.

    :param conf: connection settings (url, token, org, bucket).
    """
    print("Using batch")
    with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client:
        # One callback object supplies the success, error and retry handlers.
        cb = BatchCB()
        # NOTE(review): SOURCE lost its indentation; the nesting of the two
        # prints below relative to the with-blocks is reconstructed - confirm.
        with client.write_api(success_callback=cb.success,
                              error_callback=cb.error,
                              retry_callback=cb.retry) as write_api:
            write_api.write(bucket=conf.bucket, record="cpu,location=G9 usage=")
            print("Batch write sent")
        print("Batch write done")
# try a write using asyncio
async def use_async(conf: Config):
    """Attempt a write with the async client and report any InfluxDBError.

    The record sent has no field value, so the write is presumably expected
    to be rejected; the caught error's message and headers are then printed.

    :param conf: connection settings (url, token, org, bucket).
    """
    print("Using async")
    async with InfluxDBClientAsync(url=conf.url, token=conf.token, org=conf.org) as influx:
        # Fail fast if the server is not reachable.
        is_alive = await influx.ping()
        report_ping(is_alive)
        try:
            async_writer = influx.write_api()
            await async_writer.write(bucket=conf.bucket, record="cpu,location=G7 usage=")
        except InfluxDBError as influx_error:
            print("\nAsync -> Caught InfluxDBError: ", influx_error.message)
            report_headers(influx_error.headers)
        print("Async write done")
if __name__ == "__main__":
    # Load the settings once and show them (the token is masked by Config.__str__).
    settings = Config()
    print(settings)

    # Exercise each write style in turn: synchronous, batch, then asyncio.
    use_sync(settings)
    print("\n Continuing...\n")

    use_batch(settings)
    print("\n Continuing...\n")

    asyncio.run(use_async(settings))