Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from crowdgit.services.base.base_service import BaseService
from aiokafka import AIOKafkaProducer
import asyncio
import json
import ssl
from typing import List, Dict
from crowdgit.settings import (
CROWD_KAFKA_TOPIC,
CROWD_KAFKA_BROKERS,
CROWD_KAFKA_PASSWORD,
CROWD_KAFKA_USER,
CROWD_KAFKA_EXTRA,
)
from crowdgit.errors import QueueConnectionError, QueueMessageProduceError

Expand All @@ -17,16 +18,32 @@ class QueueService(BaseService):
def __init__(self):
super().__init__()
self.kafka_topic = CROWD_KAFKA_TOPIC
self.kafka_producer = AIOKafkaProducer(**self._build_kafka_config())
self._connected = False

def _build_kafka_config(self):
config = {
"bootstrap_servers": CROWD_KAFKA_BROKERS,
"client_id": self._CLIENT_ID,
"sasl_mechanism": "PLAIN",
"sasl_plain_username": CROWD_KAFKA_USER,
"sasl_plain_password": CROWD_KAFKA_PASSWORD,
"acks": "all",
}
self.kafka_producer = AIOKafkaProducer(**config)
self._connected = False

# Parse extra configuration from kafkajs config
extra_config = json.loads(CROWD_KAFKA_EXTRA)

if extra_config.get("ssl"):
config["security_protocol"] = "SASL_SSL"
ssl_context = ssl.create_default_context()
config["ssl_context"] = ssl_context
else:
config["security_protocol"] = "SASL_PLAINTEXT"

sasl_config = extra_config["sasl"]
config["sasl_mechanism"] = sasl_config["mechanism"].upper()
config["sasl_plain_username"] = sasl_config["username"]
config["sasl_plain_password"] = sasl_config["password"]

return config

async def ensure_connected(self):
"""Ensure connection is established and healthy"""
Expand Down
Loading