-
Notifications
You must be signed in to change notification settings - Fork 1
/
result.py
78 lines (60 loc) · 2.34 KB
/
result.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import redis
from prometheus_client import Counter, Histogram
from dotenv import load_dotenv
from pipeline import ProcessorSettings, Processor, Command, CommandActions, Definition
from apihub.utils import Result, Status, RedisSettings, DEFINITION
from apihub import __worker__, __version__
load_dotenv()
class ResultWriter(Processor):
"""ResultWriter collects results from API service worker, and
store these results in Redis
"""
api_counter = Counter(
"api_requests_total",
"API requests",
["api", "user", "status"],
)
request_duration = Histogram(
"api_process_time_seconds",
"Processing time (seconds)",
labelnames=["api"],
)
def __init__(self) -> None:
settings = ProcessorSettings(
name=__worker__ + " ResultWriter",
version=__version__,
description="write results to redis",
debug=True,
monitoring=True,
)
super().__init__(settings, input_class=dict, output_class=None)
def setup(self) -> None:
settings = RedisSettings()
self.redis = redis.Redis.from_url(settings.redis)
def process_command(self, command: Command) -> None:
self.logger.info("Processing COMMAND")
if command.action == CommandActions.Define:
definition = Definition.parse_obj(command.content)
self.logger.info(definition)
self.redis.hset(DEFINITION, definition.source.topic, definition.json())
self.logger.info(
f"{definition.source.topic} definition:\n{definition.json()}"
)
def process(self, message_content, message_id):
self.logger.info("Processing MESSAGE")
result = Result.parse_obj(message_content)
if result.status == Status.PROCESSED:
result.result = {
k: message_content.get(k) for k in self.message.logs[-1].updated
}
self.api_counter.labels(api=result.api, user=result.user, status=result.status)
if self.redis.get(message_id) is not None:
self.logger.warning("Found result with key %s, overwriting...", message_id)
self.redis.set(message_id, result.json(), ex=86400)
return None
def main():
writer = ResultWriter()
writer.parse_args()
writer.start()
if __name__ == "__main__":
main()