-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathutils.py
107 lines (81 loc) · 3.24 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import sys
# fix for https://github.com/dpkp/kafka-python/issues/2412
# kafka-python's vendored six breaks on Python >= 3.12; alias the real
# six.moves into the vendored path before kafka is imported below.
if sys.version_info >= (3, 12, 0):
import six
sys.modules["kafka.vendor.six.moves"] = six.moves
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Union
import logging
import os
from pathlib import Path
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError
# Default level for loggers created via init_logger(); read once at import time.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
"""
Kafka/Celery configuration loaded via pydantic-settings.

Can hardcode settings here, read from env file, or pass as env vars
https://docs.pydantic.dev/latest/concepts/pydantic_settings/#field-value-priority
"""
# Single "host:port" string or a list of them for the Kafka cluster.
kafka_bootstrap_servers: Union[str, List[str]] = "localhost:9093"
kafka_retention_ms: int = 18000000 # 300 minutes
kafka_input_consumer_timeout_ms: int = 2500 # 2.5 seconds
kafka_output_consumer_timeout_ms: int = 1500 # 1.5 seconds
task_time_limit_sec: int = 60 * 60 * 6 # 6 hours
# https://docs.celeryq.dev/en/v5.4.0/userguide/configuration.html#worker-max-memory-per-child
celery_worker_max_memory_per_child_kb: int = 1024000 # 1GB
model_config = SettingsConfigDict(
# have to use an absolute path here so celery workers can find it
env_file=(Path(__file__).parent / ".env"),
)
def get_input_topic_name(job_id: str):
topic_name = f"adala-input-{job_id}"
return topic_name
def get_output_topic_name(job_id: str):
topic_name = f"adala-output-{job_id}"
return topic_name
def ensure_topic(topic_name: str):
settings = Settings()
bootstrap_servers = settings.kafka_bootstrap_servers
retention_ms = settings.kafka_retention_ms
admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id="topic_creator",
api_version=(2, 5, 0),
)
topic = NewTopic(
name=topic_name,
num_partitions=1,
replication_factor=1,
topic_configs={"retention.ms": str(retention_ms)},
)
try:
admin_client.create_topics(new_topics=[topic])
except TopicAlreadyExistsError:
# we shouldn't hit this case when KAFKA_CFG_AUTO_CREATE_TOPICS=false unless there is a legitimate name collision, so should raise here after testing
pass
def delete_topic(topic_name: str):
settings = Settings()
bootstrap_servers = settings.kafka_bootstrap_servers
admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id="topic_deleter",
api_version=(2, 5, 0),
)
try:
admin_client.delete_topics(topics=[topic_name])
except UnknownTopicOrPartitionError:
logger.error(f"Topic {topic_name} does not exist and cannot be deleted.")
def init_logger(name, level=LOG_LEVEL):
"""Set up a logger that respects the LOG_LEVEL env var
Args:
name (str): the name of the logger, typically the __name__ of the module
level (Union[str,int]): the logging level to use
(either a string like "INFO" or an int coming from the logging
module, like logging.INFO)
Returns:
logging.Logger
"""
logger = logging.getLogger(name)
logger.setLevel(level)
return logger