From ee2d5c31e0f8cb7b818978c4f3498f4670a18450 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 9 Sep 2025 11:56:22 +0100 Subject: [PATCH 1/2] fix: kafka connection config --- .../crowdgit/services/queue/queue_service.py | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py b/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py index 348419b206..d4a24210f2 100644 --- a/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py +++ b/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py @@ -1,12 +1,13 @@ from crowdgit.services.base.base_service import BaseService from aiokafka import AIOKafkaProducer import asyncio +import json +import ssl from typing import List, Dict from crowdgit.settings import ( CROWD_KAFKA_TOPIC, CROWD_KAFKA_BROKERS, - CROWD_KAFKA_PASSWORD, - CROWD_KAFKA_USER, + CROWD_KAFKA_EXTRA, ) from crowdgit.errors import QueueConnectionError, QueueMessageProduceError @@ -17,16 +18,33 @@ class QueueService(BaseService): def __init__(self): super().__init__() self.kafka_topic = CROWD_KAFKA_TOPIC + self.kafka_producer = AIOKafkaProducer(**self._build_kafka_config()) + self._connected = False + + def _build_kafka_config(self): + """Build Kafka configuration with SSL support (same as test script)""" config = { "bootstrap_servers": CROWD_KAFKA_BROKERS, "client_id": self._CLIENT_ID, - "sasl_mechanism": "PLAIN", - "sasl_plain_username": CROWD_KAFKA_USER, - "sasl_plain_password": CROWD_KAFKA_PASSWORD, "acks": "all", } - self.kafka_producer = AIOKafkaProducer(**config) - self._connected = False + + # Parse extra configuration from kafkajs config + extra_config = json.loads(CROWD_KAFKA_EXTRA) + + if extra_config.get("ssl"): + config["security_protocol"] = "SASL_SSL" + ssl_context = ssl.create_default_context() + config["ssl_context"] = ssl_context + else: + config["security_protocol"] = "SASL_PLAINTEXT" + + sasl_config = extra_config["sasl"] + config["sasl_mechanism"] = sasl_config["mechanism"].upper() + config["sasl_plain_username"] = sasl_config["username"] + config["sasl_plain_password"] = sasl_config["password"] + + return config async def ensure_connected(self): """Ensure connection is established and healthy""" From 5f3d48b902ccfcffa78cc69fb11878329cbaa97d Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 9 Sep 2025 12:02:46 +0100 Subject: [PATCH 2/2] chore: remove useless comment --- .../git_integration/src/crowdgit/services/queue/queue_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py b/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py index d4a24210f2..5d237fff17 100644 --- a/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py +++ b/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py @@ -22,7 +22,6 @@ def __init__(self): self._connected = False def _build_kafka_config(self): - """Build Kafka configuration with SSL support (same as test script)""" config = { "bootstrap_servers": CROWD_KAFKA_BROKERS, "client_id": self._CLIENT_ID,