-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.py
52 lines (35 loc) · 1.17 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import uuid
from datetime import datetime
from typing import Dict, Any
from enum import Enum
from pydantic import Field, BaseModel
import redis
from pipeline import Settings, Pipeline
DEFINITION = "api:definition"
class Status(str, Enum):
ACCEPTED = "ACCEPTED"
PROCESSED = "PROCESSED"
class Result(BaseModel):
user: str
api: str
status: Status
submission_time: str = datetime.utcnow().isoformat()
result: Dict[str, Any] = dict()
class RedisSettings(Settings):
redis: str = Field("redis://localhost:6379/1", title="redis url")
class State:
def __init__(self, logger):
self.pipeline = Pipeline(logger=logger)
settings = RedisSettings()
settings.parse_args(args=[])
self.redis = redis.Redis.from_url(settings.redis)
def write(self, name, message):
# add topic to pipeline if it is not already done
if name not in self.pipeline.destinations:
self.pipeline.add_destination_topic(name)
# write message to pipeline
self.pipeline.destination_of(name).write(message)
def make_key():
return str(uuid.uuid1())
def make_topic(service_name: str):
return service_name