From 77703f0573a00c7f53aa61b728ade10a2dfbfa79 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Wed, 25 Jun 2025 18:31:05 +0300 Subject: [PATCH 01/19] fix version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 0b2cc2b..c5318ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "superstream-clients" -version = "0.1.0" +version = "0.1.1" description = "Superstream optimisation library for Kafka producers" authors = [{name = "Superstream Labs", email = "support@superstream.ai"}] license = "Apache-2.0" From d36060122eff2314452676e29bab7006975c5cdc Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 11:20:03 +0300 Subject: [PATCH 02/19] fixes --- .gitignore | 3 ++- superclient/agent/__init__.py | 9 ++++++--- superclient/agent/interceptor.py | 19 +++++++++---------- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index d3834f4..1bd2fa5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ __pycache__/ *$py.class superclient.egg-info build/ -dist/ \ No newline at end of file +dist/ +superstream_clients.egg-info/ \ No newline at end of file diff --git a/superclient/agent/__init__.py b/superclient/agent/__init__.py index c5aec99..0146c52 100644 --- a/superclient/agent/__init__.py +++ b/superclient/agent/__init__.py @@ -19,9 +19,6 @@ set_debug_enabled(True) logger = get_logger("agent") -logger.info("Superstream Agent initialized with environment variables: {}", _ENV_VARS) -if is_disabled(): - logger.warn("Superstream functionality disabled via SUPERSTREAM_DISABLED") # Preserve reference to built-in import function _original_import = builtins.__import__ @@ -93,6 +90,12 @@ def initialize(): 2. Schedules patching of any pre-imported modules 3. Starts the heartbeat thread """ + + # Log initialization message + logger.info("Superstream Agent initialized with environment variables: {}", _ENV_VARS) + if is_disabled(): + logger.warn("Superstream functionality disabled via SUPERSTREAM_DISABLED") + if is_disabled(): return diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index 62faf17..896130d 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -62,7 +62,7 @@ def init_patch(self, *args, **kwargs): error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) # Skip optimization but keep stats reporting opt_cfg = {} - elif not metadata.active: + elif not metadata.get("active", True): error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
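# A minimal illustration (values assumed, not part of the patch) of why the
# dict-style lookup above is needed: the Superstream metadata document is
# decoded with json.loads, so it is a plain dict and attribute access such as
# metadata.active raises AttributeError.
import json
metadata = json.loads('{"active": false, "report_interval_ms": 60000}')
assert metadata.get("active", True) is False         # explicit flag is honored
assert json.loads("{}").get("active", True) is True  # absent flag defaults to active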
logger.error(error_msg) # Skip optimization but keep stats reporting @@ -73,13 +73,12 @@ def init_patch(self, *args, **kwargs): # Apply optimized configuration for k, v in opt_cfg.items(): - snake = k.replace(".", "_") - if kwargs.get(snake) != v: - logger.debug("Overriding configuration: {} -> {}", snake, v) - kwargs[snake] = v + if kwargs.get(k) != v: + logger.debug("Overriding configuration: {} -> {}", k, v) + kwargs[k] = v # Set up reporting interval - report_interval = metadata.report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS # Create and register producer tracker tr = ProducerTracker( @@ -181,7 +180,7 @@ def init_patch(self, *args, **kwargs): logger.error(error_msg) # Skip optimization but keep stats reporting opt_cfg = {} - elif not metadata.active: + elif not metadata.get("active", True): error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." logger.error(error_msg) # Skip optimization but keep stats reporting @@ -193,7 +192,7 @@ def init_patch(self, *args, **kwargs): if kwargs.get(k) != v: logger.debug("Overriding configuration: {} -> {}", k, v) kwargs[k] = v - report_interval = metadata.report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS tr = ProducerTracker( lib="aiokafka", producer=self, @@ -280,7 +279,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs): logger.error(error_msg) # Skip optimization but keep stats reporting opt_cfg = {} - elif not metadata.active: + elif not metadata.get("active", True): error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
logger.error(error_msg) # Skip optimization but keep stats reporting @@ -292,7 +291,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs): if conf.get(k) != v: logger.debug("Overriding configuration: {} -> {}", k, v) conf[k] = v - report_interval = metadata.report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS tr = ProducerTracker( lib="confluent", producer=self, From 15023676ec0df6840dcc8ea717119a6fa3005659 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 11:48:12 +0300 Subject: [PATCH 03/19] fixes --- superclient/agent/interceptor.py | 24 ++++++++++++++++++------ superclient/agent/metadata.py | 17 +++++++++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index 896130d..d67feb8 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -73,8 +73,12 @@ def init_patch(self, *args, **kwargs): # Apply optimized configuration for k, v in opt_cfg.items(): - if kwargs.get(k) != v: - logger.debug("Overriding configuration: {} -> {}", k, v) + current_val = kwargs.get(k) + if current_val != v: + if k in kwargs: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) kwargs[k] = v # Set up reporting interval @@ -189,8 +193,12 @@ def init_patch(self, *args, **kwargs): # Get optimized configuration if Superstream is active opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka") for k, v in opt_cfg.items(): - if kwargs.get(k) != v: - logger.debug("Overriding configuration: {} -> {}", k, v) + current_val = kwargs.get(k) + if current_val != v: + if k in kwargs: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) kwargs[k] = v report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS tr = ProducerTracker( @@ -288,8 +296,12 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs): # Get optimized configuration if Superstream is active opt_cfg = optimal_cfg(metadata, topics_env, conf, "confluent") for k, v in opt_cfg.items(): - if conf.get(k) != v: - logger.debug("Overriding configuration: {} -> {}", k, v) + current_val = conf.get(k) + if current_val != v: + if k in conf: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) conf[k] = v report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS tr = ProducerTracker( diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index 6bc8b47..c1095d4 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -147,11 +147,20 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic cfg.setdefault(k, v) if latency: cfg.pop("linger.ms", None) - for p in ("batch.size", "linger.ms"): - if p in orig and p in cfg: + + # Translate Java-style keys to library-specific keys for comparison + java_keys_to_check = ["batch.size", "linger.ms"] + lib_keys_to_check = [] + for java_key in java_keys_to_check: + translated = translate_java_to_lib({java_key: ""}, lib_name) + lib_key = list(translated.keys())[0] if translated else java_key + 
lib_keys_to_check.append(lib_key) + + for java_key, lib_key in zip(java_keys_to_check, lib_keys_to_check): + if lib_key in orig and java_key in cfg: try: - if int(orig[p]) > int(cfg[p]): - cfg[p] = orig[p] + if int(orig[lib_key]) > int(cfg[java_key]): + cfg[java_key] = orig[lib_key] except Exception: pass From 5d56c1b29311bc26f4aa68425252a08565af0c88 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 17:55:49 +0300 Subject: [PATCH 04/19] fixes --- superclient/agent/interceptor.py | 18 ++++++++-------- superclient/agent/metadata.py | 25 ++++++++++++++++++----- superclient/agent/tracker.py | 3 --- superclient/core/reporter.py | 18 +++++++++++----- superclient/util/config.py | 35 ++++++++++++++++++++++++++------ 5 files changed, 72 insertions(+), 27 deletions(-) diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index d67feb8..8df23c6 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -69,7 +69,9 @@ def init_patch(self, *args, **kwargs): opt_cfg = {} else: # Get optimized configuration if Superstream is active - opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python") + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python") + if warning_msg: + error_msg = warning_msg # Apply optimized configuration for k, v in opt_cfg.items(): @@ -99,9 +101,6 @@ def init_patch(self, *args, **kwargs): ) Heartbeat.register_tracker(tr) - # Initialize the producer with original configuration - orig_init(self, *args, **kwargs) - # Patch send and close methods if not already patched if not hasattr(self, "_superstream_patch"): original_send = self.send @@ -125,7 +124,7 @@ def close_patch(*a, **kw): self.close = close_patch self._superstream_patch = True - # Initialize again with optimized configuration + # Initialize with optimized configuration orig_init(self, *args, **kwargs) # Send client registration message @@ -191,7 +190,9 @@ def init_patch(self, *args, **kwargs): opt_cfg = {} else: # Get optimized configuration if Superstream is active - opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka") + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka") + if warning_msg: + error_msg = warning_msg for k, v in opt_cfg.items(): current_val = kwargs.get(k) if current_val != v: @@ -214,7 +215,6 @@ def init_patch(self, *args, **kwargs): topics_env=topics_env, ) Heartbeat.register_tracker(tr) - orig_init(self, *args, **kwargs) if not hasattr(self, "_superstream_patch"): original_send = self.send @@ -294,7 +294,9 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs): opt_cfg = {} else: # Get optimized configuration if Superstream is active - opt_cfg = optimal_cfg(metadata, topics_env, conf, "confluent") + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, conf, "confluent") + if warning_msg: + error_msg = warning_msg for k, v in opt_cfg.items(): current_val = conf.get(k) if current_val != v: diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index c1095d4..90c4b29 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -126,25 +126,40 @@ def fetch_metadata( logger.error("[ERR-203] Failed to fetch metadata: {}", exc) return None -def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> Dict[str, Any]: - """Compute optimal configuration based on metadata and topics.""" +def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: 
Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]: + """Compute optimal configuration based on metadata and topics. + + Returns: + tuple: (configuration_dict, warning_message) + """ latency = os.getenv("SUPERSTREAM_LATENCY_SENSITIVE", "false").lower() == "true" cfg: Dict[str, Any] + warning_msg = "" + if not metadata or not metadata.get("topics_configuration"): logger.debug("No metadata or topics_configuration found; applying default configuration: %s", _DEFAULTS) - logger.warning("The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console") + if not topics: + warning_msg = "No SUPERSTREAM_TOPICS_LIST environment variable set. Please set it to enable topic-specific optimizations." + else: + warning_msg = "The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console" + logger.warning(warning_msg) cfg = dict(_DEFAULTS) else: matches = [tc for tc in metadata["topics_configuration"] if tc["topic_name"] in topics] if not matches: logger.debug("No matching topics found in metadata; applying default configuration: %s", _DEFAULTS) - logger.warning("The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console") + if not topics: + warning_msg = "No SUPERSTREAM_TOPICS_LIST environment variable set. Please set it to enable topic-specific optimizations." + else: + warning_msg = "The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console" + logger.warning(warning_msg) cfg = dict(_DEFAULTS) else: best = max(matches, key=lambda tc: tc["potential_reduction_percentage"] * tc["daily_writes_bytes"]) cfg = dict(best.get("optimized_configuration", {})) for k, v in _DEFAULTS.items(): cfg.setdefault(k, v) + if latency: cfg.pop("linger.ms", None) @@ -165,4 +180,4 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic pass # Translate Java-style keys to library-specific keys - return translate_java_to_lib(cfg, lib_name) \ No newline at end of file + return translate_java_to_lib(cfg, lib_name), warning_msg \ No newline at end of file diff --git a/superclient/agent/tracker.py b/superclient/agent/tracker.py index a94ab0f..498ee2e 100644 --- a/superclient/agent/tracker.py +++ b/superclient/agent/tracker.py @@ -52,7 +52,6 @@ def determine_topic(self) -> str: """Get the most impactful topic for this producer based on metadata analysis.""" if not self.metadata or not self.metadata.get("topics_configuration"): # Fallback to first topic if no metadata available - logger.warning("No metadata available for producer {}, falling back to first used topic", self.client_id) return sorted(self.topics)[0] if self.topics else "" # Find matching topic configurations from metadata based on environment topics only @@ -63,8 +62,6 @@ def determine_topic(self) -> str: if not matches: # Fallback to first environment topic if no matches - logger.warning("No matching topics found in metadata for producer {} (env topics: {}), falling back to first environment topic", - self.client_id, self.topics_env) return sorted(self.topics_env)[0] if self.topics_env else "" # Use the same logic as optimal_cfg: find the topic with highest impact diff --git 
a/superclient/core/reporter.py b/superclient/core/reporter.py index 1aab492..5632469 100644 --- a/superclient/core/reporter.py +++ b/superclient/core/reporter.py @@ -78,18 +78,26 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt return try: + # Handle aiokafka (async library) if lib_name == "aiokafka": asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload)) return - prod = builder(bootstrap, base_cfg) - if hasattr(prod, "produce"): - prod.produce("superstream.clients", payload) - prod.flush() - else: + # Handle kafka-python (sync library) + if lib_name == "kafka-python": + prod = builder(bootstrap, base_cfg) prod.send("superstream.clients", payload) prod.flush() prod.close() + return + + # Handle confluent-kafka (sync library with different API) + if lib_name == "confluent": + prod = builder(bootstrap, base_cfg) + prod.produce("superstream.clients", payload) + prod.flush() + return + except Exception: logger.debug("Failed to send clients message via {}", lib_name) diff --git a/superclient/util/config.py b/superclient/util/config.py index 723f325..fa5bcc5 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -30,7 +30,27 @@ def get_topics_list() -> List[str]: def mask_sensitive(k: str, v: Any) -> Any: """Mask sensitive configuration values.""" - return "[MASKED]" if "password" in k.lower() or "sasl.jaas.config" in k.lower() else v + sensitive_patterns = [ + "password", "sasl.jaas.config", "basic.auth.user.info", + "ssl.key", "ssl.cert", "ssl.truststore", "ssl.keystore", + "sasl.kerberos.keytab", "sasl.kerberos.principal" + ] + return "[MASKED]" if any(pattern in k.lower() for pattern in sensitive_patterns) else v + +def _serialize_config_value(v: Any) -> Any: + """Convert configuration values to JSON-serializable format.""" + if callable(v): + # Convert functions to string representation + return f"<function {getattr(v, '__name__', 'anonymous')}>" + elif hasattr(v, '__dict__'): + # Handle objects that might have __dict__ but aren't functions + try: + # Try to serialize as dict, fallback to string representation + return v.__dict__ + except Exception: + return f"<non-serializable: {type(v).__name__}>" + else: + return v def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any]): """Copy essential client configuration properties from source to destination. 
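# A short usage sketch of the broadened mask_sensitive above (config values
# are illustrative, and the mask_sensitive defined in this hunk is assumed to
# be in scope): any key matching a sensitive pattern is replaced with
# "[MASKED]" before configurations are logged or reported.
cfg = {
    "bootstrap.servers": "localhost:9092",
    "sasl.password": "hunter2",
    "ssl.keystore.location": "/etc/kafka/client.jks",
}
masked = {k: mask_sensitive(k, v) for k, v in cfg.items()}
# masked == {"bootstrap.servers": "localhost:9092",
#            "sasl.password": "[MASKED]",
#            "ssl.keystore.location": "[MASKED]"}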
@@ -137,7 +157,6 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "sasl.kerberos.domain.name": "sasl_kerberos_domain_name", "sasl.oauth.token.provider": "sasl_oauth_token_provider", "socks5.proxy": "socks5_proxy", - "compression.codec": "compression_type", # Maps to compression_type }, "aiokafka": { # Basic configuration @@ -159,7 +178,6 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "enable.idempotence": "enable_idempotence", "security.protocol": "security_protocol", "sasl.mechanism": "sasl_mechanism", - "compression.codec": "compression_type", # Maps to compression_type }, "confluent": { # Confluent uses Java-style names directly, so most mappings are 1:1 @@ -256,7 +274,7 @@ def translate_lib_to_java(cfg: Dict[str, Any], lib_name: str) -> Dict[str, Any]: "enable_idempotence": False, "delivery_timeout_ms": 120000, "acks": 1, - "compression_type": None, + "compression_type": "none", "retries": 0, "batch_size": 16384, "linger_ms": 0, @@ -303,7 +321,7 @@ def translate_lib_to_java(cfg: Dict[str, Any], lib_name: str) -> Dict[str, Any]: # Producer specific "acks": 1, - "compression_type": None, + "compression_type": "none", "max_batch_size": 16384, # aiokafka uses max_batch_size "linger_ms": 0, "partitioner": None, @@ -422,5 +440,10 @@ def get_original_config(orig_cfg: Dict[str, Any], lib_name: str) -> Dict[str, An for k, v in defaults_java.items(): if k not in user_keys_java: merged[k] = v + + # Serialize any function objects to make them JSON-serializable + serialized: Dict[str, Any] = {} + for k, v in merged.items(): + serialized[k] = _serialize_config_value(v) - return merged \ No newline at end of file + return serialized \ No newline at end of file From cf8452a968e28caab31bc88d8b3c9b6189bb2a7a Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 20:13:15 +0300 Subject: [PATCH 05/19] fixes --- examples/requirements.txt | 2 +- superclient/agent/metadata.py | 14 +++++++++----- superclient/core/manager.py | 6 +++--- superclient/core/reporter.py | 6 +++--- superclient/util/config.py | 27 +++++++++++++++++++++------ 5 files changed, 37 insertions(+), 18 deletions(-) diff --git a/examples/requirements.txt b/examples/requirements.txt index d583425..e612393 100644 --- a/examples/requirements.txt +++ b/examples/requirements.txt @@ -1,4 +1,4 @@ -kafka-python==2.0.2 +kafka-python==2.2.14 confluent-kafka==2.3.0 aiokafka==0.10.0 aws-msk-iam-sasl-signer-python==1.0.2 \ No newline at end of file diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index 90c4b29..d664691 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -24,7 +24,7 @@ def _create_consumer_kafka_python(bootstrap: str, base_cfg: Dict[str, Any]): "enable_auto_commit": False, "auto_offset_reset": "earliest", } - copy_client_configuration_properties(base_cfg, consumer_cfg) + copy_client_configuration_properties(base_cfg, consumer_cfg, "kafka-python") return kafka.KafkaConsumer(**consumer_cfg) def _create_consumer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): @@ -36,7 +36,7 @@ def _create_consumer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): "enable_auto_commit": False, "auto_offset_reset": "earliest", } - copy_client_configuration_properties(base_cfg, consumer_cfg) + copy_client_configuration_properties(base_cfg, consumer_cfg, "aiokafka") return AIOKafkaConsumer(**consumer_cfg) def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): @@ -48,7 +48,7 @@ def 
_create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "enable.auto.commit": False, "auto.offset.reset": "earliest", } - copy_client_configuration_properties(base_cfg, consumer_cfg) + copy_client_configuration_properties(base_cfg, consumer_cfg, "confluent") return Consumer(consumer_cfg) _CONSUMER_BUILDERS = { @@ -108,14 +108,18 @@ def fetch_metadata( tp = _kafka.TopicPartition(topic, 0) consumer.assign([tp]) - consumer.seek_to_end(tp) - end = consumer.position(tp) + + # Get the end offset safely + end_offsets = consumer.end_offsets([tp]) + end = end_offsets.get(tp, 0) + if end == 0: logger.error( "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." ) consumer.close() return None + consumer.seek(tp, end - 1) recs = consumer.poll(timeout_ms=5000) consumer.close() diff --git a/superclient/core/manager.py b/superclient/core/manager.py index 80c8b2c..d9f3cbf 100644 --- a/superclient/core/manager.py +++ b/superclient/core/manager.py @@ -60,7 +60,7 @@ def fetch_metadata_kafka_python(bootstrap: str, cfg: Dict[str, Any]) -> Optional "enable_auto_commit": False, "group_id": "superstream-metadata-fetcher", } - copy_client_configuration_properties(cfg, kafka_cfg) + copy_client_configuration_properties(cfg, kafka_cfg, "kafka-python") # Create consumer and fetch message consumer = KafkaConsumer(topic, **kafka_cfg) @@ -102,7 +102,7 @@ def fetch_metadata_confluent(bootstrap: str, cfg: Dict[str, Any]) -> Optional[Me "enable.auto.commit": False, "group.id": "superstream-metadata-fetcher", } - copy_client_configuration_properties(cfg, kafka_cfg) + copy_client_configuration_properties(cfg, kafka_cfg, "confluent") # Create consumer and fetch message consumer = Consumer(kafka_cfg) @@ -139,7 +139,7 @@ async def fetch_metadata_aiokafka(bootstrap: str, cfg: Dict[str, Any]) -> Option "enable_auto_commit": False, "group_id": "superstream-metadata-fetcher", } - copy_client_configuration_properties(cfg, kafka_cfg) + copy_client_configuration_properties(cfg, kafka_cfg, "aiokafka") # Create consumer and fetch message consumer = AIOKafkaConsumer(topic, **kafka_cfg) diff --git a/superclient/core/reporter.py b/superclient/core/reporter.py index 5632469..884b3b6 100644 --- a/superclient/core/reporter.py +++ b/superclient/core/reporter.py @@ -29,7 +29,7 @@ def _create_producer_kafka_python(bootstrap: str, base_cfg: Dict[str, Any]): "batch.size": 16_384, "linger.ms": 1000, } - copy_client_configuration_properties(base_cfg, cfg) + copy_client_configuration_properties(base_cfg, cfg, "kafka-python") kafka_cfg = {k.replace(".", "_"): v for k, v in cfg.items()} return kafka.KafkaProducer(**kafka_cfg) @@ -44,7 +44,7 @@ def _create_producer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "batch.size": 16384, "linger.ms": 1000, } - copy_client_configuration_properties(base_cfg, cfg) + copy_client_configuration_properties(base_cfg, cfg, "confluent") return _CProducer(cfg) @@ -58,7 +58,7 @@ async def _create_producer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): "batch_size": 16_384, "linger_ms": 1000, } - copy_client_configuration_properties(base_cfg, cfg) + copy_client_configuration_properties(base_cfg, cfg, "aiokafka") return AIOKafkaProducer(**cfg) diff --git a/superclient/util/config.py b/superclient/util/config.py index fa5bcc5..50fccc5 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -52,13 +52,18 @@ def _serialize_config_value(v: Any) -> Any: else: return v -def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any]): +def 
copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any], lib_name: str = "confluent"): """Copy essential client configuration properties from source to destination. This ensures internal Kafka clients have the same security, network, and connection configurations as the user's Kafka clients. Only copies properties that are explicitly set in the source configuration. + + Args: + src: Source configuration (in library-specific syntax) + dst: Destination configuration (in library-specific syntax) + lib_name: Library name for key translation """ - # List of all possible auth/network related configs + # List of all possible auth/network related configs in Java-style syntax possible_keys = [ # Security protocol "security.protocol", @@ -108,10 +113,20 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "retries" ] - # Only copy properties that are explicitly set in the source configuration - for k in possible_keys: - if k in src and k not in dst: - dst[k] = src[k] + # For each Java-style key, find if it exists in the source config + # by checking both the Java-style key and its library-specific equivalent + for java_key in possible_keys: + # Check if the Java-style key exists in source + if java_key in src: + if java_key not in dst: + dst[java_key] = src[java_key] + continue + + # Check if the library-specific equivalent exists in source + if lib_name in _JAVA_TO_LIB_MAPPING: + lib_key = _JAVA_TO_LIB_MAPPING[lib_name].get(java_key, java_key) + if lib_key in src and lib_key not in dst: + dst[lib_key] = src[lib_key] # --------------------------------------------------------------------------- # Field name mapping between Java-style and library-specific representations From b05cd8ada97e1e0070549639a445b9e209e48623 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 21:09:59 +0300 Subject: [PATCH 06/19] fixes --- superclient/util/config.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/superclient/util/config.py b/superclient/util/config.py index 50fccc5..12dbe8a 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -80,6 +80,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "ssl.ca.location", "ssl.ca.pem", "ssl.ca.certificate.stores", "ssl.crl.location", "ssl.providers", "ssl.context", + "ssl.cafile", "ssl.certfile", "ssl.keyfile", # SASL properties "sasl.mechanism", "sasl.mechanisms", "sasl.jaas.config", @@ -92,6 +93,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "sasl.login.refresh.min.period.seconds", "sasl.login.refresh.buffer.seconds", "sasl.login.retry.backoff.ms", "sasl.login.retry.backoff.max.ms", "sasl.username", "sasl.password", + "sasl.plain.username", "sasl.plain.password", "sasl.oauthbearer.config", "sasl.oauthbearer.client.id", "sasl.oauthbearer.client.secret", "sasl.oauthbearer.scope", "sasl.oauthbearer.extensions", "sasl.oauthbearer.token.endpoint.url", @@ -167,11 +169,16 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "metrics.num.samples": "metrics_num_samples", "metrics.sample.window.ms": "metrics_sample_window_ms", "sasl.mechanism": "sasl_mechanism", + "sasl.plain.username": "sasl_plain_username", + "sasl.plain.password": "sasl_plain_password", "sasl.kerberos.name": "sasl_kerberos_name", "sasl.kerberos.service.name": "sasl_kerberos_service_name", "sasl.kerberos.domain.name": "sasl_kerberos_domain_name", "sasl.oauth.token.provider": "sasl_oauth_token_provider", 
"socks5.proxy": "socks5_proxy", + "ssl.cafile": "ssl_cafile", + "ssl.certfile": "ssl_certfile", + "ssl.keyfile": "ssl_keyfile", }, "aiokafka": { # Basic configuration From d8060037742ac6046c95956599b26576d43cf20f Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 21:29:07 +0300 Subject: [PATCH 07/19] fixes --- superclient/agent/interceptor.py | 14 ++++++++++++++ superclient/agent/metadata.py | 26 +++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index 8df23c6..b3b2c25 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -37,6 +37,10 @@ def init_patch(self, *args, **kwargs): # Store original configuration orig_cfg = dict(kwargs) + # Normalize compression type: convert None to "none" string + if "compression_type" in orig_cfg and orig_cfg["compression_type"] is None: + orig_cfg["compression_type"] = "none" + # Get bootstrap servers from args or kwargs bootstrap = orig_cfg.get("bootstrap_servers") or (args[0] if args else None) if not bootstrap: @@ -164,6 +168,11 @@ def init_patch(self, *args, **kwargs): if is_disabled(): return orig_init(self, *args, **kwargs) orig_cfg = dict(kwargs) + + # Normalize compression type: convert None to "none" string + if "compression_type" in orig_cfg and orig_cfg["compression_type"] is None: + orig_cfg["compression_type"] = "none" + bootstrap = orig_cfg.get("bootstrap_servers") if not bootstrap and args: bootstrap = args[0] @@ -270,6 +279,11 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs): if is_disabled(): return orig_init(self, conf, *args, **kwargs) conf = dict(conf) + + # Normalize compression type: convert None to "none" string + if "compression.type" in conf and conf["compression.type"] is None: + conf["compression.type"] = "none" + bootstrap = conf.get("bootstrap.servers") if not bootstrap: return orig_init(self, conf, *args, **kwargs) diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index d664691..5ff268e 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -165,7 +165,23 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic cfg.setdefault(k, v) if latency: - cfg.pop("linger.ms", None) + # For latency-sensitive applications, don't apply linger.ms optimization + # Keep the original value if it exists, otherwise use a default + if "linger.ms" in cfg: + # Get the original linger.ms value if it exists + orig_linger_key = None + if lib_name == "kafka-python" or lib_name == "aiokafka": + orig_linger_key = "linger_ms" + elif lib_name == "confluent": + orig_linger_key = "linger.ms" + + if orig_linger_key and orig_linger_key in orig: + # Use the original value instead of the optimized one + cfg["linger.ms"] = orig[orig_linger_key] + logger.debug("Using original linger.ms value ({}) for latency-sensitive application", orig[orig_linger_key]) + else: + # Remove the optimized value but it will be added back as default later + cfg.pop("linger.ms") # Translate Java-style keys to library-specific keys for comparison java_keys_to_check = ["batch.size", "linger.ms"] @@ -179,9 +195,17 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic if lib_key in orig and java_key in cfg: try: if int(orig[lib_key]) > int(cfg[java_key]): + logger.debug("Keeping original {} value ({}) as it's larger than optimized ({})", java_key, orig[lib_key], cfg[java_key]) cfg[java_key] = orig[lib_key] except 
Exception: pass + # Ensure all core optimization parameters are present in the final config + # But don't add linger.ms back if it was removed for latency sensitivity + for k, v in _DEFAULTS.items(): + if k not in cfg: + if not (latency and k == "linger.ms"): + cfg[k] = v + # Translate Java-style keys to library-specific keys return translate_java_to_lib(cfg, lib_name), warning_msg \ No newline at end of file From 209b36d9e54d3812a446fddb2e4146d106f05491 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 22:21:12 +0300 Subject: [PATCH 08/19] fixes --- superclient/agent/interceptor.py | 8 +++---- superclient/agent/metadata.py | 40 ++++++++++++++++++++++++++++++-- superclient/util/config.py | 31 ++++++++++++++++++++++++- 3 files changed, 72 insertions(+), 7 deletions(-) diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index b3b2c25..c355f8d 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -5,7 +5,7 @@ from ..util.logger import get_logger from ..util.config import get_topics_list, is_disabled -from .metadata import fetch_metadata, optimal_cfg, _DEFAULTS +from .metadata import fetch_metadata_sync, optimal_cfg, _DEFAULTS from ..core.reporter import send_clients_msg from ..core.manager import normalize_bootstrap from .tracker import ProducerTracker, Heartbeat @@ -57,7 +57,7 @@ def init_patch(self, *args, **kwargs): try: # Get topics and metadata for optimization topics_env = get_topics_list() - metadata = fetch_metadata(bootstrap, orig_cfg, "kafka-python") + metadata = fetch_metadata_sync(bootstrap, orig_cfg, "kafka-python") # Check if Superstream is active for this cluster error_msg = "" @@ -185,7 +185,7 @@ def init_patch(self, *args, **kwargs): try: topics_env = get_topics_list() - metadata = fetch_metadata(bootstrap, orig_cfg, "aiokafka") + metadata = fetch_metadata_sync(bootstrap, orig_cfg, "aiokafka") error_msg = "" if metadata is None: error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) @@ -294,7 +294,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs): try: topics_env = get_topics_list() - metadata = fetch_metadata(bootstrap, conf, "confluent") + metadata = fetch_metadata_sync(bootstrap, conf, "confluent") error_msg = "" if metadata is None: error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index 5ff268e..d1d49f1 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -57,7 +57,7 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "confluent": _create_consumer_confluent, } -def fetch_metadata( +async def fetch_metadata( bootstrap: str, cfg: Dict[str, Any], lib_name: Literal["kafka-python", "aiokafka", "confluent"], @@ -103,7 +103,7 @@ def fetch_metadata( if msg and msg.value(): return json.loads(msg.value().decode()) - else: # kafka-python / aiokafka share similar API + elif lib_name == "kafka-python": import kafka as _kafka # type: ignore tp = _kafka.TopicPartition(topic, 0) @@ -126,10 +126,46 @@ def fetch_metadata( for batch in recs.values(): for rec in batch: return json.loads(rec.value.decode()) + + elif lib_name == "aiokafka": + # aiokafka uses its own TopicPartition and async API + from aiokafka import TopicPartition # type: ignore + + tp = TopicPartition(topic, 0) + 
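# Note on the aiokafka consumer API this branch relies on: assign() and
# seek() are plain synchronous methods, while end_offsets() and getmany()
# return coroutines and must be awaited, as in the lines that follow.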
consumer.assign([tp]) + + # Get the end offset safely using aiokafka's API + end_offsets = await consumer.end_offsets([tp]) + end = end_offsets.get(tp, 0) + + if end == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." + ) + consumer.close() + return None + + consumer.seek(tp, end - 1) + recs = await consumer.getmany(timeout_ms=5000) + consumer.close() + for batch in recs.values(): + for rec in batch: + return json.loads(rec.value.decode()) except Exception as exc: logger.error("[ERR-203] Failed to fetch metadata: {}", exc) return None +def fetch_metadata_sync( + bootstrap: str, + cfg: Dict[str, Any], + lib_name: Literal["kafka-python", "aiokafka", "confluent"], +) -> Optional[Dict[str, Any]]: + """Synchronous wrapper for fetch_metadata.""" + import asyncio + + # Run the async function synchronously for all libraries + return asyncio.run(fetch_metadata(bootstrap, cfg, lib_name)) + def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]: """Compute optimal configuration based on metadata and topics. diff --git a/superclient/util/config.py b/superclient/util/config.py index 12dbe8a..d0aeb4c 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -31,9 +31,31 @@ def get_topics_list() -> List[str]: def mask_sensitive(k: str, v: Any) -> Any: """Mask sensitive configuration values.""" sensitive_patterns = [ + # Existing patterns "password", "sasl.jaas.config", "basic.auth.user.info", "ssl.key", "ssl.cert", "ssl.truststore", "ssl.keystore", - "sasl.kerberos.keytab", "sasl.kerberos.principal" + "sasl.kerberos.keytab", "sasl.kerberos.principal", + + # Additional SSL properties + "ssl.cafile", "ssl.certfile", "ssl.keyfile", + "ssl.certificate.location", "ssl.certificate.pem", + "ssl.ca.location", "ssl.ca.pem", + "ssl.ca.certificate.stores", "ssl.crl.location", + "ssl.providers", "ssl.context", + + # Additional SASL properties + "sasl.username", "sasl.password", + "sasl.plain.username", "sasl.plain.password", + "sasl.oauthbearer.config", "sasl.oauthbearer.client.secret", + "sasl.oauthbearer.extensions", + + # OAuth callback configurations + "sasl.oauth.token.provider", "oauth_cb", "sasl_oauth_token_provider", + + # Library-specific variations + "sasl_plain_username", "sasl_plain_password", + "ssl_cafile", "ssl_certfile", "ssl_keyfile", + "sasl_oauth_token_provider" ] return "[MASKED]" if any(pattern in k.lower() for pattern in sensitive_patterns) else v @@ -129,6 +151,13 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any lib_key = _JAVA_TO_LIB_MAPPING[lib_name].get(java_key, java_key) if lib_key in src and lib_key not in dst: dst[lib_key] = src[lib_key] + + # Debug log to show config comparison + # Mask sensitive data before logging + src_masked = {k: mask_sensitive(k, v) for k, v in src.items()} + dst_masked = {k: mask_sensitive(k, v) for k, v in dst.items()} + + logger.debug("copy_client_configuration_properties - Source config: {}, Destination config: {}", src_masked, dst_masked) # --------------------------------------------------------------------------- # Field name mapping between Java-style and library-specific representations From c057d9407c9aa18e40421530a90bdb8df449b9b7 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 23:07:00 +0300 Subject: [PATCH 09/19] fixes --- superclient/agent/metadata.py | 213 ++++++++++++++++++++++------------ superclient/core/reporter.py | 24 +++- 
superclient/util/config.py | 1 + 3 files changed, 161 insertions(+), 77 deletions(-) diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index d1d49f1..784a84f 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -57,21 +57,63 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "confluent": _create_consumer_confluent, } -async def fetch_metadata( +def fetch_metadata_kafka_python( bootstrap: str, cfg: Dict[str, Any], - lib_name: Literal["kafka-python", "aiokafka", "confluent"], ) -> Optional[Dict[str, Any]]: - """Fetch the latest optimization metadata from the Superstream internal topic. + """Fetch metadata using kafka-python library.""" + + builder = _CONSUMER_BUILDERS.get("kafka-python") + if builder is None: + logger.error("[ERR-204] Unsupported Kafka library: kafka-python") + return None - The consumer is created using the *same* Kafka library that the application - itself employs (`lib_name`). This guarantees compatibility with user - dependencies and avoids version-related conflicts. - """ + topic = "superstream.metadata_v1" + try: + consumer = builder(bootstrap, cfg) + + if not consumer.partitions_for_topic(topic): + logger.error( + "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics." + ) + consumer.close() + return None + + import kafka as _kafka # type: ignore - builder = _CONSUMER_BUILDERS.get(lib_name) + tp = _kafka.TopicPartition(topic, 0) + consumer.assign([tp]) + + # Get the end offset safely + end_offsets = consumer.end_offsets([tp]) + end = end_offsets.get(tp, 0) + + if end == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." + ) + consumer.close() + return None + + consumer.seek(tp, end - 1) + recs = consumer.poll(timeout_ms=5000) + consumer.close() + for batch in recs.values(): + for rec in batch: + return json.loads(rec.value.decode()) + except Exception as exc: + logger.error("[ERR-203] Failed to fetch metadata: {}", exc) + return None + +def fetch_metadata_confluent( + bootstrap: str, + cfg: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Fetch metadata using confluent-kafka library.""" + + builder = _CONSUMER_BUILDERS.get("confluent") if builder is None: - logger.error("[ERR-204] Unsupported Kafka library: {}", lib_name) + logger.error("[ERR-204] Unsupported Kafka library: confluent") return None topic = "superstream.metadata_v1" @@ -85,74 +127,84 @@ async def fetch_metadata( consumer.close() return None - if lib_name == "confluent": - from confluent_kafka import TopicPartition # type: ignore - - tp = TopicPartition(topic, 0) - consumer.assign([tp]) - low, high = consumer.get_watermark_offsets(tp, timeout=5.0) - if high == 0: - logger.error( - "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." - ) - consumer.close() - return None - consumer.seek(TopicPartition(topic, 0, high - 1)) - msg = consumer.poll(timeout=5.0) + from confluent_kafka import TopicPartition # type: ignore + + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + low, high = consumer.get_watermark_offsets(tp, timeout=5.0) + if high == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." 
+ ) consumer.close() - if msg and msg.value(): - return json.loads(msg.value().decode()) + return None + consumer.seek(TopicPartition(topic, 0, high - 1)) + msg = consumer.poll(timeout=5.0) + consumer.close() + if msg and msg.value(): + return json.loads(msg.value().decode()) + except Exception as exc: + logger.error("[ERR-203] Failed to fetch metadata: {}", exc) + return None - elif lib_name == "kafka-python": - import kafka as _kafka # type: ignore +async def fetch_metadata_aiokafka( + bootstrap: str, + cfg: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Fetch metadata using aiokafka library.""" + + builder = _CONSUMER_BUILDERS.get("aiokafka") + if builder is None: + logger.error("[ERR-204] Unsupported Kafka library: aiokafka") + return None - tp = _kafka.TopicPartition(topic, 0) - consumer.assign([tp]) - - # Get the end offset safely - end_offsets = consumer.end_offsets([tp]) - end = end_offsets.get(tp, 0) - - if end == 0: - logger.error( - "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." - ) - consumer.close() - return None - - consumer.seek(tp, end - 1) - recs = consumer.poll(timeout_ms=5000) - consumer.close() - for batch in recs.values(): - for rec in batch: - return json.loads(rec.value.decode()) + topic = "superstream.metadata_v1" + consumer = None + try: + consumer = builder(bootstrap, cfg) + + # Start the consumer first + await consumer.start() + + # For aiokafka, partitions_for_topic returns a set, not a coroutine + partitions = consumer.partitions_for_topic(topic) + if not partitions: + logger.error( + "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics." + ) + await consumer.stop() + return None - elif lib_name == "aiokafka": - # aiokafka uses its own TopicPartition and async API - from aiokafka import TopicPartition # type: ignore + # aiokafka uses its own TopicPartition and async API + from aiokafka import TopicPartition # type: ignore - tp = TopicPartition(topic, 0) - consumer.assign([tp]) - - # Get the end offset safely using aiokafka's API - end_offsets = await consumer.end_offsets([tp]) - end = end_offsets.get(tp, 0) + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + + # Get the end offset safely using aiokafka's API + end_offsets = await consumer.end_offsets([tp]) + end = end_offsets.get(tp, 0) + + if end == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." + ) + await consumer.stop() + return None - if end == 0: - logger.error( - "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." 
- ) consumer.close() return None - - consumer.seek(tp, end - 1) - recs = await consumer.getmany(timeout_ms=5000) - consumer.close() - for batch in recs.values(): - for rec in batch: - return json.loads(rec.value.decode()) + consumer.seek(tp, end - 1) + recs = await consumer.getmany(timeout_ms=5000) + await consumer.stop() + for batch in recs.values(): + for rec in batch: + return json.loads(rec.value.decode()) except Exception as exc: logger.error("[ERR-203] Failed to fetch metadata: {}", exc) + if consumer: + try: + await consumer.stop() + except Exception: + pass return None def fetch_metadata_sync( @@ -163,8 +215,27 @@ def fetch_metadata_sync( """Synchronous wrapper for fetch_metadata.""" import asyncio - # Run the async function synchronously for all libraries - return asyncio.run(fetch_metadata(bootstrap, cfg, lib_name)) + if lib_name == "kafka-python": + return fetch_metadata_kafka_python(bootstrap, cfg) + elif lib_name == "confluent": + return fetch_metadata_confluent(bootstrap, cfg) + elif lib_name == "aiokafka": + # For aiokafka, we need to handle the async case + try: + # Try to get the current event loop + loop = asyncio.get_running_loop() + # If we get here, there's already a running event loop + # We need to create a new event loop in a separate thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, fetch_metadata_aiokafka(bootstrap, cfg)) + return future.result() + except RuntimeError: + # No event loop is running, we can use asyncio.run() + return asyncio.run(fetch_metadata_aiokafka(bootstrap, cfg)) + else: + logger.error("[ERR-204] Unsupported Kafka library: {}", lib_name) + return None def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]: """Compute optimal configuration based on metadata and topics. 
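# The fetch_metadata_sync dispatch above runs a coroutine from synchronous
# code whether or not an event loop is already active. A self-contained
# sketch of that pattern (coro() is a stand-in for fetch_metadata_aiokafka):
import asyncio
import concurrent.futures

async def coro() -> str:
    await asyncio.sleep(0)
    return "done"

def run_coro_sync() -> str:
    try:
        asyncio.get_running_loop()  # raises RuntimeError if no loop runs on this thread
    except RuntimeError:
        return asyncio.run(coro())  # no running loop here: asyncio.run is safe
    # A loop is already running, so asyncio.run() would raise; hand the
    # coroutine to a fresh event loop on a worker thread instead.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return executor.submit(asyncio.run, coro()).result()

print(run_coro_sync())  # -> done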
diff --git a/superclient/core/reporter.py b/superclient/core/reporter.py index 884b3b6..892b133 100644 --- a/superclient/core/reporter.py +++ b/superclient/core/reporter.py @@ -55,7 +55,7 @@ async def _create_producer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): "bootstrap_servers": bootstrap, "client_id": _SUPERLIB_PREFIX + "client-reporter", "compression_type": "zstd", - "batch_size": 16_384, + "max_batch_size": 16_384, "linger_ms": 1000, } copy_client_configuration_properties(base_cfg, cfg, "aiokafka") @@ -80,7 +80,19 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt try: # Handle aiokafka (async library) if lib_name == "aiokafka": - asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload)) + # Handle the case where an event loop is already running + try: + # Try to get the current event loop + loop = asyncio.get_running_loop() + # If we get here, there's already a running event loop + # We need to create a new event loop in a separate thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, internal_send_clients_async(bootstrap, base_cfg, payload)) + future.result() + except RuntimeError: + # No event loop is running, we can use asyncio.run() + asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload)) return # Handle kafka-python (sync library) @@ -98,8 +110,8 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt prod.flush() return - except Exception: - logger.debug("Failed to send clients message via {}", lib_name) + except Exception as e: + logger.error("Failed to send clients message via {}: {}", lib_name, e) async def internal_send_clients_async(bootstrap: str, base_cfg: Dict[str, Any], payload: bytes) -> None: @@ -111,8 +123,8 @@ async def internal_send_clients_async(bootstrap: str, base_cfg: Dict[str, Any], await prod.send_and_wait("superstream.clients", payload) finally: await prod.stop() - except Exception: - logger.debug("Failed to send clients message via aiokafka") + except Exception as e: + logger.error("Failed to send clients message via aiokafka: {}", e) def send_clients_msg(tracker: Any, error: str = "") -> None: diff --git a/superclient/util/config.py b/superclient/util/config.py index d0aeb4c..b9fb145 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -229,6 +229,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "enable.idempotence": "enable_idempotence", "security.protocol": "security_protocol", "sasl.mechanism": "sasl_mechanism", + "ssl.context": "ssl_context", }, "confluent": { # Confluent uses Java-style names directly, so most mappings are 1:1 From 384aa29c6917b83fb8733ea0e685ec26d553b5ee Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 23:22:58 +0300 Subject: [PATCH 10/19] aiokafka related fixes --- superclient/util/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/superclient/util/config.py b/superclient/util/config.py index b9fb145..36947c3 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -230,6 +230,8 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "security.protocol": "security_protocol", "sasl.mechanism": "sasl_mechanism", "ssl.context": "ssl_context", + "sasl.plain.username": "sasl_plain_username", + "sasl.plain.password": "sasl_plain_password", }, "confluent": { # Confluent uses Java-style names directly, so most mappings are 
1:1 From cf4a9988fef5f9a847d5d83373b7ad8c2793149e Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 29 Jun 2025 23:52:55 +0300 Subject: [PATCH 11/19] confluent kafka related fixes --- superclient/agent/__init__.py | 7 +- superclient/agent/interceptor.py | 231 ++++++++++++++++++------------- superclient/agent/metadata.py | 59 +++++++- 3 files changed, 192 insertions(+), 105 deletions(-) diff --git a/superclient/agent/__init__.py b/superclient/agent/__init__.py index 0146c52..75b86fb 100644 --- a/superclient/agent/__init__.py +++ b/superclient/agent/__init__.py @@ -44,7 +44,12 @@ def _patch_module(module_name: str) -> None: # Check if Producer exists before patching confluent_module = sys.modules["confluent_kafka"] if hasattr(confluent_module, "Producer"): - patch_confluent(confluent_module) + # Additional check to ensure we can safely patch + try: + patch_confluent(confluent_module) + except Exception as patch_exc: + logger.error("[ERR-003] Failed to patch confluent_kafka Producer: {}", patch_exc) + # Don't re-raise, just log the error except Exception as exc: logger.error("[ERR-001] Failed to patch {}: {}", module_name, exc) diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index c355f8d..6c2e832 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -272,106 +272,139 @@ def patch_confluent(mod): if _PATCHED.get("confluent"): return _PATCHED["confluent"] = True - Producer = mod.Producer - orig_init = Producer.__init__ - - def init_patch(self, conf: Dict[str, Any], *args, **kwargs): - if is_disabled(): - return orig_init(self, conf, *args, **kwargs) - conf = dict(conf) - - # Normalize compression type: convert None to "none" string - if "compression.type" in conf and conf["compression.type"] is None: - conf["compression.type"] = "none" + + # Check if Producer exists and is not already patched + if not hasattr(mod, "Producer"): + logger.warn("confluent_kafka module does not have Producer class") + return - bootstrap = conf.get("bootstrap.servers") - if not bootstrap: - return orig_init(self, conf, *args, **kwargs) - bootstrap = normalize_bootstrap(bootstrap) - client_id = conf.get("client.id", "") - if client_id.startswith(_SUPERLIB_PREFIX): - return orig_init(self, conf, *args, **kwargs) - - try: - topics_env = get_topics_list() - metadata = fetch_metadata_sync(bootstrap, conf, "confluent") - error_msg = "" - if metadata is None: - error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) - logger.error(error_msg) - # Skip optimization but keep stats reporting - opt_cfg = {} - elif not metadata.get("active", True): - error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
- logger.error(error_msg) - # Skip optimization but keep stats reporting - opt_cfg = {} - else: - # Get optimized configuration if Superstream is active - opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, conf, "confluent") - if warning_msg: - error_msg = warning_msg - for k, v in opt_cfg.items(): - current_val = conf.get(k) - if current_val != v: - if k in conf: - logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) - else: - logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) - conf[k] = v - report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS - tr = ProducerTracker( - lib="confluent", - producer=self, - bootstrap=bootstrap, - client_id=client_id, - orig_cfg=conf, - opt_cfg=opt_cfg, - report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS), - error=error_msg, # Store error message in tracker - metadata=metadata, - topics_env=topics_env, - ) - Heartbeat.register_tracker(tr) - orig_init(self, conf, *args, **kwargs) - if not hasattr(self, "_superstream_patch"): - original_produce = self.produce - - def produce_patch(topic, *a, **kw): - tr.record_topic(topic) - return original_produce(topic, *a, **kw) - - self.produce = produce_patch - orig_close = self.close - - def close_patch(*a, **kw): - if not hasattr(self, "_superstream_closed"): - self._superstream_closed = True - tr.close() - Heartbeat.unregister_tracker(tr.uuid) - return orig_close(*a, **kw) - - self.close = close_patch - self._superstream_patch = True - send_clients_msg(tr, error_msg) + Producer = mod.Producer + + # Check if already patched + if hasattr(mod, '_OriginalProducer'): + logger.debug("confluent_kafka Producer already patched") + return + + # Store the original Producer class + mod._OriginalProducer = Producer + + # Create a wrapper class that provides the same interface as Producer + class SuperstreamProducer: + def __init__(self, conf: Dict[str, Any], *args, **kwargs): + if is_disabled(): + self._producer = Producer(conf, *args, **kwargs) + return + + conf = dict(conf) - # Log success message based on whether defaults were used - if not opt_cfg: # No optimization applied - pass # Skip success message as there was an error - elif all(opt_cfg.get(k) == v for k, v in _DEFAULTS.items()) and len(opt_cfg) == len(_DEFAULTS): # Default optimization - if client_id: - logger.info("Successfully optimized producer with default optimization parameters for {}", client_id) - else: - logger.info("Successfully optimized producer with default optimization parameters") - else: # Custom optimization - if client_id: - logger.info("Successfully optimized producer configuration for {}", client_id) + # Normalize compression type: convert None to "none" string + if "compression.type" in conf and conf["compression.type"] is None: + conf["compression.type"] = "none" + + bootstrap = conf.get("bootstrap.servers") + if not bootstrap: + self._producer = Producer(conf, *args, **kwargs) + return + bootstrap = normalize_bootstrap(bootstrap) + client_id = conf.get("client.id", "") + if client_id.startswith(_SUPERLIB_PREFIX): + self._producer = Producer(conf, *args, **kwargs) + return + + try: + topics_env = get_topics_list() + metadata = fetch_metadata_sync(bootstrap, conf, "confluent") + error_msg = "" + if metadata is None: + error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) + logger.error(error_msg) + # Skip optimization but keep stats reporting + opt_cfg = 
{} + elif not metadata.get("active", True): + error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." + logger.error(error_msg) + # Skip optimization but keep stats reporting + opt_cfg = {} else: - logger.info("Successfully optimized producer configuration") - - except Exception as e: - # If any error occurs in our logic, log it and create the producer normally - logger.error("[ERR-303] Failed to optimize producer with client id {}: {}", client_id, str(e)) - return orig_init(self, conf, *args, **kwargs) + # Get optimized configuration if Superstream is active + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, conf, "confluent") + if warning_msg: + error_msg = warning_msg + + # Apply optimizations to the configuration + for k, v in opt_cfg.items(): + current_val = conf.get(k) + if current_val != v: + if k in conf: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) + conf[k] = v + + # Create the producer with optimized configuration + self._producer = Producer(conf, *args, **kwargs) + + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS + self._tracker = ProducerTracker( + lib="confluent", + producer=self._producer, + bootstrap=bootstrap, + client_id=client_id, + orig_cfg=conf, + opt_cfg=opt_cfg, + report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS), + error=error_msg, # Store error message in tracker + metadata=metadata, + topics_env=topics_env, + ) + Heartbeat.register_tracker(self._tracker) + + send_clients_msg(self._tracker, error_msg) + + # Log success message based on whether defaults were used + if not opt_cfg: # No optimization applied + pass # Skip success message as there was an error + elif all(opt_cfg.get(k) == v for k, v in _DEFAULTS.items()) and len(opt_cfg) == len(_DEFAULTS): # Default optimization + if client_id: + logger.info("Successfully optimized producer with default optimization parameters for {}", client_id) + else: + logger.info("Successfully optimized producer with default optimization parameters") + else: # Custom optimization + if client_id: + logger.info("Successfully optimized producer configuration for {}", client_id) + else: + logger.info("Successfully optimized producer configuration") - Producer.__init__ = init_patch \ No newline at end of file + except Exception as e: + # If any error occurs in our logic, log it and create the producer normally + logger.error("[ERR-303] Failed to optimize producer with client id {}: {}", client_id, str(e)) + self._producer = Producer(conf, *args, **kwargs) + + def produce(self, topic, *args, **kwargs): + """Wrapper for produce method that tracks topics.""" + if hasattr(self, '_tracker'): + self._tracker.record_topic(topic) + return self._producer.produce(topic, *args, **kwargs) + + def poll(self, *args, **kwargs): + """Wrapper for poll method.""" + return self._producer.poll(*args, **kwargs) + + def flush(self, *args, **kwargs): + """Wrapper for flush method.""" + return self._producer.flush(*args, **kwargs) + + def close(self, *args, **kwargs): + """Wrapper for close method that handles cleanup.""" + if hasattr(self, '_tracker') and not hasattr(self, '_superstream_closed'): + self._superstream_closed = True + self._tracker.close() + Heartbeat.unregister_tracker(self._tracker.uuid) + return self._producer.close(*args, **kwargs) + + def __getattr__(self, name): + 
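# __getattr__ is consulted only for attributes that normal lookup does not
# find, so the wrapper's own methods (produce, poll, flush, close) take
# precedence and any other explicit attribute access falls through to the
# wrapped Producer. Implicitly invoked dunder protocols (e.g. len(producer))
# bypass instance __getattr__, since Python looks special methods up on the type.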
"""Delegate all other attributes to the underlying producer.""" + return getattr(self._producer, name) + + # Replace the Producer class in the module + mod.Producer = SuperstreamProducer \ No newline at end of file diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index 784a84f..4aa0210 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -3,6 +3,7 @@ import json import os from typing import Any, Dict, Optional, Literal +import time from ..util.logger import get_logger from ..util.config import copy_client_configuration_properties, translate_java_to_lib @@ -47,6 +48,11 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "client.id": "superstreamlib-metadata-consumer", "enable.auto.commit": False, "auto.offset.reset": "earliest", + "group.id": "superstreamlib-metadata-consumer", + "session.timeout.ms": 10000, + "heartbeat.interval.ms": 3000, + "max.poll.interval.ms": 30000, + "enable.partition.eof": True, } copy_client_configuration_properties(base_cfg, consumer_cfg, "confluent") return Consumer(consumer_cfg) @@ -117,34 +123,77 @@ def fetch_metadata_confluent( return None topic = "superstream.metadata_v1" + consumer = None try: consumer = builder(bootstrap, cfg) - if not consumer.partitions_for_topic(topic): + from confluent_kafka import TopicPartition # type: ignore + + # First, try to get metadata to check if topic exists + try: + # Get topic metadata to check if topic exists + metadata = consumer.list_topics(topic=topic, timeout=5.0) + if topic not in metadata.topics: + logger.error( + "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics." + ) + consumer.close() + return None + except Exception: logger.error( "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics." ) consumer.close() return None - from confluent_kafka import TopicPartition # type: ignore - + # Assign to partition 0 tp = TopicPartition(topic, 0) consumer.assign([tp]) - low, high = consumer.get_watermark_offsets(tp, timeout=5.0) + + # Poll to ensure assignment is complete and add a small delay + consumer.poll(timeout=1.0) + time.sleep(0.1) # Small delay to ensure consumer is ready + + # Get watermark offsets to find the last message + try: + low, high = consumer.get_watermark_offsets(tp, timeout=5.0) + except Exception as e: + logger.error("[ERR-203] Failed to get watermark offsets: {}", e) + consumer.close() + return None + if high == 0: logger.error( "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." 
From 435da724ad6809acc3cec1d94dd20f882ebcfe47 Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Mon, 30 Jun 2025 12:39:08 +0300
Subject: [PATCH 12/19] fixes

---
 superclient/agent/interceptor.py | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py
index 6c2e832..12edaab 100644
--- a/superclient/agent/interceptor.py
+++ b/superclient/agent/interceptor.py
@@ -89,7 +89,6 @@ def init_patch(self, *args, **kwargs):
 
         # Set up reporting interval
         report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS
-
         # Create and register producer tracker
         tr = ProducerTracker(
             lib="kafka-python",
@@ -123,6 +122,7 @@ def close_patch(*a, **kw):
                 self._superstream_closed = True
                 tr.close()
                 Heartbeat.unregister_tracker(tr.uuid)
+                logger.debug("Superstream tracking stopped for kafka-python producer with client_id: {}", client_id)
             return orig_close(*a, **kw)
 
         self.close = close_patch
@@ -239,6 +239,7 @@ async def stop_patch(*a, **kw):
                 self._superstream_closed = True
                 tr.close()
                 Heartbeat.unregister_tracker(tr.uuid)
+                logger.debug("Superstream tracking stopped for aiokafka producer with client_id: {}", client_id)
             await original_stop(*a, **kw)
 
         self.stop = stop_patch
@@ -386,21 +387,19 @@ def produce(self, topic, *args, **kwargs):
             self._tracker.record_topic(topic)
         return self._producer.produce(topic, *args, **kwargs)
 
-    def poll(self, *args, **kwargs):
-        """Wrapper for poll method."""
-        return self._producer.poll(*args, **kwargs)
-
-    def flush(self, *args, **kwargs):
-        """Wrapper for flush method."""
-        return self._producer.flush(*args, **kwargs)
-
-    def close(self, *args, **kwargs):
-        """Wrapper for close method that handles cleanup."""
+    def __del__(self):
+        """Destructor to automatically clean up when producer is garbage collected."""
         if hasattr(self, '_tracker') and not hasattr(self, '_superstream_closed'):
-            self._superstream_closed = True
-            self._tracker.close()
-            Heartbeat.unregister_tracker(self._tracker.uuid)
-        return self._producer.close(*args, **kwargs)
+            try:
+                self._superstream_closed = True
+                self._tracker.close()
+                Heartbeat.unregister_tracker(self._tracker.uuid)
+                logger.debug("Superstream tracking stopped for confluent-kafka producer with client_id: {}",
+                             getattr(self._tracker, 'client_id', 'unknown'))
+            except Exception as e:
+                logger.error("Error during automatic cleanup: {}", e)
+        else:
+            logger.debug("Producer already cleaned up or no tracker found")
 
     def __getattr__(self, name):
         """Delegate all other attributes to the underlying producer."""
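PATCH 12 trades the explicit `close()` wrapper for a `__del__` destructor. Worth noting: `__del__` fires only when the wrapper is garbage-collected, and objects still alive at interpreter shutdown may be finalized in an arbitrary order, if at all. An alternative (not what the patch does) is `weakref.finalize`, which is also invoked at interpreter exit; a minimal sketch with illustrative names:

```python
# Alternative sketch: weakref.finalize gives the same automatic cleanup
# without relying on __del__ semantics (finalizers registered this way
# are also run at interpreter exit).
import weakref

class Tracker:
    def close(self):
        print("tracker closed")

class WrappedProducer:
    def __init__(self):
        self._tracker = Tracker()
        # The callback must not hold a reference to self, or the object
        # would never become garbage; pass the tracker in directly.
        self._finalizer = weakref.finalize(self, Tracker.close, self._tracker)

p = WrappedProducer()
del p  # finalizer fires here (or at exit, whichever comes first)
```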
From dea787c4017dd08ccd44a5850b03bb0dc779c502 Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Mon, 30 Jun 2025 15:16:04 +0300
Subject: [PATCH 13/19] fixes

---
 superclient/agent/metadata.py | 20 ++++++++++++--------
 superclient/util/config.py    |  1 +
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py
index 4aa0210..eff4c81 100644
--- a/superclient/agent/metadata.py
+++ b/superclient/agent/metadata.py
@@ -129,9 +129,16 @@ def fetch_metadata_confluent(
 
         from confluent_kafka import TopicPartition  # type: ignore
 
-        # First, try to get metadata to check if topic exists
+        # Warm up the connection before attempting list_topics
+        logger.debug("Warming up connection with initial poll...")
+        try:
+            consumer.poll(timeout=2.0)
+            logger.debug("Connection warm-up completed")
+        except Exception as e:
+            logger.debug("Connection warm-up failed, continuing anyway: {}", e)
+
+        # Try to get topic metadata, but don't fail if it doesn't work
         try:
-            # Get topic metadata to check if topic exists
             metadata = consumer.list_topics(topic=topic, timeout=5.0)
             if topic not in metadata.topics:
                 logger.error(
@@ -139,12 +146,9 @@ def fetch_metadata_confluent(
                 )
                 consumer.close()
                 return None
-        except Exception:
-            logger.error(
-                "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics."
-            )
-            consumer.close()
-            return None
+        except Exception as e:
+            logger.debug("Could not list topics, proceeding with assignment: {}", e)
+            # Continue with topic assignment even if list_topics fails
 
         # Assign to partition 0
         tp = TopicPartition(topic, 0)
diff --git a/superclient/util/config.py b/superclient/util/config.py
index 36947c3..e2c05dc 100644
--- a/superclient/util/config.py
+++ b/superclient/util/config.py
@@ -268,6 +268,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
         "message.send.max.retries": "message.send.max.retries",
         "batch.num.messages": "batch.num.messages",
         "compression.codec": "compression.codec",
+        "oauth_cb": "oauth_cb",
     }
 }
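PATCH 13 turns the topic-existence check into a best-effort probe: a throwaway `poll()` warms up the broker connection, and a `list_topics` failure now logs at debug level and falls through to partition assignment instead of aborting. The shape of that tolerate-and-continue check, sketched in isolation (`consumer` is assumed to be an already-constructed `confluent_kafka.Consumer`):

```python
def topic_probably_exists(consumer, topic: str) -> bool:
    """Advisory check only: a metadata failure is not treated as fatal."""
    try:
        md = consumer.list_topics(topic=topic, timeout=5.0)
        return topic in md.topics
    except Exception:
        # Metadata calls can fail transiently while the connection is
        # still warming up, so answer "maybe" instead of aborting.
        return True
```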
From 851dae0050a00452311f8ebcef65f614843faffe Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Thu, 3 Jul 2025 21:07:58 +0300
Subject: [PATCH 14/19] fixes

---
 superclient/agent/interceptor.py |  5 ++++-
 superclient/util/config.py       | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py
index 12edaab..b796a4d 100644
--- a/superclient/agent/interceptor.py
+++ b/superclient/agent/interceptor.py
@@ -332,6 +332,9 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
                 if warning_msg:
                     error_msg = warning_msg
 
+                # Store original configuration before applying optimizations
+                orig_cfg = dict(conf)
+
                 # Apply optimizations to the configuration
                 for k, v in opt_cfg.items():
                     current_val = conf.get(k)
@@ -351,7 +354,7 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
                     producer=self._producer,
                     bootstrap=bootstrap,
                     client_id=client_id,
-                    orig_cfg=conf,
+                    orig_cfg=orig_cfg,
                     opt_cfg=opt_cfg,
                     report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
                     error=error_msg,  # Store error message in tracker
diff --git a/superclient/util/config.py b/superclient/util/config.py
index e2c05dc..a6c4ad9 100644
--- a/superclient/util/config.py
+++ b/superclient/util/config.py
@@ -100,6 +100,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
     "enable.ssl.certificate.verification",
     "ssl.certificate.location", "ssl.certificate.pem",
     "ssl.ca.location", "ssl.ca.pem",
+    "ssl.key.location",
     "ssl.ca.certificate.stores", "ssl.crl.location",
     "ssl.providers", "ssl.context",
     "ssl.cafile", "ssl.certfile", "ssl.keyfile",
@@ -269,6 +270,16 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
         "batch.num.messages": "batch.num.messages",
         "compression.codec": "compression.codec",
         "oauth_cb": "oauth_cb",
+
+        # SSL properties (confluent uses Java-style names directly)
+        "ssl.ca.location": "ssl.ca.location",
+        "ssl.certificate.location": "ssl.certificate.location",
+        "ssl.key.location": "ssl.key.location",
+        "ssl.keystore.location": "ssl.keystore.location",
+        "ssl.keystore.password": "ssl.keystore.password",
+        "ssl.key.password": "ssl.key.password",
+        "ssl.endpoint.identification.algorithm": "ssl.endpoint.identification.algorithm",
+        "enable.ssl.certificate.verification": "enable.ssl.certificate.verification",
     }
 }
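The `orig_cfg = dict(conf)` snapshot in PATCH 14 fixes a genuine aliasing bug: the tracker previously stored the same dict object that the loop below it mutates, so the "original" configuration it reported was actually the optimized one. A two-line illustration (the values are made up):

```python
# Why the snapshot matters: dicts are passed by reference, so storing
# `conf` directly would record the post-optimization values.
conf = {"linger.ms": 0}
orig_cfg = dict(conf)        # shallow snapshot taken *before* mutation
conf["linger.ms"] = 5000     # optimization applied in place

print(orig_cfg["linger.ms"])  # 0    -- what the user configured
print(conf["linger.ms"])      # 5000 -- what the producer actually runs with
```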
From 7271f3e02465adc4d678c4fe4153416491fdebb5 Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Fri, 4 Jul 2025 14:20:53 +0300
Subject: [PATCH 15/19] version

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index c5318ab..1124066 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "superstream-clients"
-version = "0.1.1"
+version = "0.1.2"
 description = "Superstream optimisation library for Kafka producers"
 authors = [{name = "Superstream Labs", email = "support@superstream.ai"}]
 license = "Apache-2.0"

From 86935c7053f2c279b4ddf9c0211291798adb9c08 Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Fri, 4 Jul 2025 14:23:39 +0300
Subject: [PATCH 16/19] version

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 1124066..bb17cec 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "superstream-clients"
-version = "0.1.2"
+version = "0.1.3"
 description = "Superstream optimisation library for Kafka producers"
 authors = [{name = "Superstream Labs", email = "support@superstream.ai"}]
 license = "Apache-2.0"

From 4031d4ec6647b3ff4be7c8d8337c94f4a8ce58d9 Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Fri, 4 Jul 2025 14:46:17 +0300
Subject: [PATCH 17/19] fixes

---
 .gitignore              | 1 -
 DEVELOPMENT.md          | 2 +-
 README.md               | 4 ++--
 pyproject.toml          | 2 +-
 superclient/__init__.py | 2 +-
 5 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1bd2fa5..8116c50 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,7 +2,6 @@ venv/
 __pycache__/
 *.py[cod]
 *$py.class
-superclient.egg-info
 build/
 dist/
 superstream_clients.egg-info/
\ No newline at end of file
diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 4aa53c4..be9ea33 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -42,7 +42,7 @@ This installs the package in "editable" mode and enables automatic loading, whic
 ## Uninstallation
 
 ```bash
-pip uninstall superclient && find venv/lib/python*/site-packages -name "superclient-init.pth" -delete && rm -rf build/ dist/ superclient.egg-info/ && find . -name "*.pyc" -delete && find . -name "__pycache__" -type d -exec rm -rf {} +
+pip uninstall superstream-clients && find venv/lib/python*/site-packages -name "superclient-init.pth" -delete && rm -rf build/ dist/ superstream_clients.egg-info/ && find . -name "*.pyc" -delete && find . -name "__pycache__" -type d -exec rm -rf {} +
 ```
 
 This single command:
diff --git a/README.md b/README.md
index bdeb03a..fa132b5 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@ The Superstream library needs to modify your producer's configuration to apply o
 ## Installation
 
 ```bash
-pip install superclient && python -m superclient install_pth
+pip install superstream-clients && python -m superclient install_pth
 ```
 
 That's it! Superclient will now automatically load and optimize all Kafka producers in your Python environment.
@@ -114,7 +114,7 @@ When using Superstream Clients with containerized applications, include the pack
 FROM python:3.8-slim
 
 # Install superclient
-RUN pip install superclient
+RUN pip install superstream-clients
 RUN python -m superclient install_pth
 
 # Your application code
diff --git a/pyproject.toml b/pyproject.toml
index bb17cec..87203aa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "superstream-clients"
-version = "0.1.3"
+version = "0.1.4"
 description = "Superstream optimisation library for Kafka producers"
 authors = [{name = "Superstream Labs", email = "support@superstream.ai"}]
 license = "Apache-2.0"
diff --git a/superclient/__init__.py b/superclient/__init__.py
index 755aa99..4362346 100644
--- a/superclient/__init__.py
+++ b/superclient/__init__.py
@@ -1,7 +1,7 @@
 from importlib.metadata import PackageNotFoundError, version as _pkg_version
 
 try:
-    __version__ = _pkg_version("superclient")
+    __version__ = _pkg_version("superstream-clients")
 except PackageNotFoundError:
     # Fallback for when the package isn't installed (e.g. running from source without editable install)
     __version__ = "0.0.0"

From 9f363d15e955c96db960220e4fbe585bf2fc3412 Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Fri, 4 Jul 2025 15:39:46 +0300
Subject: [PATCH 18/19] add empty metrics fields

---
 superclient/agent/tracker.py  | 1 +
 superclient/core/reporter.py  | 4 ++++
 superclient/model/messages.py | 6 +++++-
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/superclient/agent/tracker.py b/superclient/agent/tracker.py
index 498ee2e..367701c 100644
--- a/superclient/agent/tracker.py
+++ b/superclient/agent/tracker.py
@@ -39,6 +39,7 @@ def __init__(
         self.error = error
         self.metadata = metadata
         self.topics_env = topics_env or []
+        self.start_time_ms = int(time.time() * 1000)  # Unix timestamp in milliseconds
 
     def record_topic(self, topic: str):
         """Record a topic that this producer writes to."""
diff --git a/superclient/core/reporter.py b/superclient/core/reporter.py
index 892b133..13a1261 100644
--- a/superclient/core/reporter.py
+++ b/superclient/core/reporter.py
@@ -150,6 +150,10 @@ def send_clients_msg(tracker: Any, error: str = "") -> None:
         most_impactful_topic=tracker.determine_topic(),
         language=f"Python ({tracker.library})",
         error=error,
+        producer_metrics={},
+        topic_metrics={},
+        node_metrics={},
+        app_info_metrics={"start-time-ms": str(tracker.start_time_ms)},
     )
     payload = json.dumps(msg.__dict__).encode()
     internal_send_clients(tracker.bootstrap, tracker.orig_cfg, payload, tracker.library)
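The messages.py hunk below uses `field(default_factory=dict)` rather than a literal `{}` because dataclasses reject mutable defaults, which would otherwise be shared across instances; this assumes `field` is imported from `dataclasses` in the part of the file the hunk does not show. A quick demonstration:

```python
# Why default_factory is needed in the messages.py hunk below: a bare
# mutable default is rejected by the dataclass machinery.
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class ClientMessage:
    producer_metrics: Dict[str, Any] = field(default_factory=dict)
    # producer_metrics: Dict[str, Any] = {}  would raise:
    # ValueError: mutable default <class 'dict'> for field ... is not allowed

a, b = ClientMessage(), ClientMessage()
a.producer_metrics["k"] = 1
print(b.producer_metrics)  # {} -- each instance gets its own dict
```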
diff --git a/superclient/model/messages.py b/superclient/model/messages.py
index 7c254eb..9483ad3 100644
--- a/superclient/model/messages.py
+++ b/superclient/model/messages.py
@@ -35,4 +35,8 @@ class ClientMessage:
     superstream_client_uid: str = ""
     most_impactful_topic: str = ""
     language: str = ""
-    error: str = ""
\ No newline at end of file
+    error: str = ""
+    producer_metrics: Dict[str, Any] = field(default_factory=dict)
+    topic_metrics: Dict[str, Any] = field(default_factory=dict)
+    node_metrics: Dict[str, Any] = field(default_factory=dict)
+    app_info_metrics: Dict[str, Any] = field(default_factory=dict)
\ No newline at end of file

From b6eb656f01cf69d4a1bcf825cfadf5f3b4b93d6e Mon Sep 17 00:00:00 2001
From: idanasulinStrech
Date: Fri, 4 Jul 2025 15:40:11 +0300
Subject: [PATCH 19/19] version

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 87203aa..24b23db 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "superstream-clients"
-version = "0.1.4"
+version = "0.1.5"
 description = "Superstream optimisation library for Kafka producers"
 authors = [{name = "Superstream Labs", email = "support@superstream.ai"}]
 license = "Apache-2.0"
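One last observation on PATCH 18: `send_clients_msg` serializes the message with `json.dumps(msg.__dict__)`, which is fine while `ClientMessage` stays flat and JSON-friendly; `dataclasses.asdict` would be the drop-in choice if nested dataclasses were ever added. For the flat case the two agree:

```python
# Sketch only: a trimmed-down ClientMessage with two of its fields.
import json
from dataclasses import dataclass, field, asdict
from typing import Any, Dict

@dataclass
class ClientMessage:
    language: str = ""
    app_info_metrics: Dict[str, Any] = field(default_factory=dict)

msg = ClientMessage(language="Python (confluent)",
                    app_info_metrics={"start-time-ms": "1751630411000"})
assert json.dumps(msg.__dict__) == json.dumps(asdict(msg))
```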