-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathcontainerhelper.py
105 lines (92 loc) · 4.11 KB
/
containerhelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import base64
import random
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Tuple
from testcontainers.core.container import DockerContainer
from testcontainers.core.network import Network
class ContainerHelper:
    """Build and start the Kafka and Schema Registry containers used by tests.

    The ``create_*`` methods only configure a ``DockerContainer``; nothing is
    started until the matching ``start_*`` method is called.
    """

    # Both images are pinned to the same tag so test runs are reproducible and
    # the two components cannot drift apart when ``latest`` moves.
    _CONFLUENT_PLATFORM_TAG = "7.6.1"
    # Inclusive range from which published host ports are drawn.
    _PORT_RANGE = (16000, 20000)
    # Number of random candidates probed before falling back to an unprobed one.
    _PORT_PROBE_ATTEMPTS = 20

    @staticmethod
    def _pick_host_port() -> int:
        """Return a random port from ``_PORT_RANGE``, preferring one that is free.

        Each candidate is probed by briefly binding a TCP socket to it on the
        local machine. The probe is best-effort: another process can still
        grab the port before the container starts, and if no candidate can be
        bound the last one drawn is returned unprobed.
        """
        import socket  # local import: only needed for the availability probe

        low, high = ContainerHelper._PORT_RANGE
        candidate = random.randint(low, high)
        for _ in range(ContainerHelper._PORT_PROBE_ATTEMPTS):
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                try:
                    probe.bind(("", candidate))
                except OSError:
                    # Port is taken (or not bindable); draw another candidate.
                    candidate = random.randint(low, high)
                    continue
            return candidate
        return candidate

    @staticmethod
    def create_kafka_container(network: Network) -> Tuple[DockerContainer, str, str]:
        """Configure (but do not start) a single-node Kafka container.

        The node is given both the ``controller`` and ``broker`` process roles
        and a random hostname. Three listeners are declared: ``PLAINTEXT`` on
        9092, ``CONTROLLER`` on 9093 and ``EXTERNAL`` on a randomly chosen port
        that is also published on the host under the same number.

        Args:
            network: Network the container is attached to.

        Returns:
            A ``(kafka container, internal broker address, external broker
            address)`` tuple. The internal address is ``<hostname>:9092``; the
            external address is ``127.0.0.1:<published port>``.
        """
        docker_image_name = (
            f"confluentinc/cp-kafka:{ContainerHelper._CONFLUENT_PLATFORM_TAG}"
        )
        docker_hostname = uuid.uuid4().hex
        kafka_port = ContainerHelper._pick_host_port()
        internal_broker_address = f"{docker_hostname}:9092"
        external_broker_address = f"127.0.0.1:{kafka_port}"
        kraft_cluster_id = base64.urlsafe_b64encode(uuid.uuid4().bytes).decode()

        kafka_container = (
            DockerContainer(image=docker_image_name, hostname=docker_hostname)
            .with_env("KAFKA_NODE_ID", "0")
            .with_env("KAFKA_PROCESS_ROLES", "controller,broker")
            .with_env(
                "KAFKA_LISTENERS",
                f"PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:{kafka_port}",
            )
            .with_env(
                "KAFKA_ADVERTISED_LISTENERS",
                f"PLAINTEXT://{internal_broker_address},EXTERNAL://{external_broker_address}",
            )
            .with_env(
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT",
            )
            .with_env("KAFKA_CONTROLLER_QUORUM_VOTERS", f"0@{docker_hostname}:9093")
            .with_env("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
            .with_env("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
            .with_env("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "10")
            .with_env("CLUSTER_ID", kraft_cluster_id)
            .with_env("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1")
            .with_env("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
            .with_env("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            .with_env("KAFKA_LOG_RETENTION_MS", "-1")
            # The EXTERNAL listener advertises the host-side port, so the port
            # inside the container and the published port must be identical.
            .with_bind_ports(kafka_port, kafka_port)
            .with_network(network)
        )
        return kafka_container, internal_broker_address, external_broker_address

    @staticmethod
    def start_kafka_container(kafka_container: DockerContainer) -> None:
        """Start the Kafka container and wait for its readiness log line.

        Raises:
            TimeoutError: If "Kafka Server started" does not appear in the
                container logs before the readiness wait gives up.
        """
        kafka_container.start()
        wait_for_container_readiness(kafka_container, "Kafka Server started")

    @staticmethod
    def create_schema_registry_container(
        network: Network,
        broker_address: str,
    ) -> Tuple[DockerContainer, str]:
        """Configure (but do not start) a Schema Registry container.

        Args:
            network: Network the container is attached to.
            broker_address: ``host:port`` of the Kafka broker; it is prefixed
                with ``PLAINTEXT://`` and used as the bootstrap server.
                Presumably this should be the internal broker address when the
                registry shares a network with Kafka - verify against callers.

        Returns:
            A ``(schema registry container, schema registry address)`` tuple,
            where the address is ``http://0.0.0.0:<published port>``.
        """
        docker_image_name = (
            f"confluentinc/cp-schema-registry:{ContainerHelper._CONFLUENT_PLATFORM_TAG}"
        )
        schema_registry_port = ContainerHelper._pick_host_port()
        # NOTE(review): the same 0.0.0.0 URL is used as the listener bind
        # address and handed back to callers as the client URL; confirm that
        # connecting to 0.0.0.0 works on every platform the tests run on.
        schema_registry_address = f"http://0.0.0.0:{schema_registry_port}"

        schema_registry_container = (
            DockerContainer(image=docker_image_name)
            .with_env(
                "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
                f"PLAINTEXT://{broker_address}",
            )
            .with_env("SCHEMA_REGISTRY_LISTENERS", schema_registry_address)
            .with_env("SCHEMA_REGISTRY_HOST_NAME", "localhost")
            .with_bind_ports(schema_registry_port, schema_registry_port)
            .with_network(network)
        )
        return schema_registry_container, schema_registry_address

    @staticmethod
    def start_schema_registry_container(
        schema_registry_container: DockerContainer,
    ) -> None:
        """Start the Schema Registry container and wait for its readiness log line.

        Raises:
            TimeoutError: If "Server started, listening for requests" does not
                appear in the container logs before the readiness wait gives up.
        """
        schema_registry_container.start()
        wait_for_container_readiness(
            schema_registry_container, "Server started, listening for requests"
        )
def wait_for_container_readiness(
    container: DockerContainer,
    text: str,
    *,
    timeout_seconds: float = 20.0,
    poll_interval_seconds: float = 0.5,
) -> None:
    """Block until ``text`` appears in the container's logs.

    The logs are fetched with ``container.get_logs()`` and every byte stream
    it yields is searched for ``text``. The logs are checked immediately and
    then re-checked every ``poll_interval_seconds`` until the deadline.

    Args:
        container: Container whose logs are polled.
        text: Substring that signals the container is ready.
        timeout_seconds: Maximum time to wait before giving up.
        poll_interval_seconds: Pause between two consecutive log checks.

    Raises:
        TimeoutError: If ``text`` has not appeared once the timeout elapsed.
    """
    # A monotonic clock keeps the deadline unaffected by wall-clock changes.
    deadline = time.monotonic() + timeout_seconds
    while True:
        for stream in container.get_logs():
            # Log output is not guaranteed to be valid UTF-8; never let a
            # stray byte abort the readiness check.
            if text in stream.decode(errors="replace"):
                return

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Failed to start container")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_seconds, remaining))