diff --git a/.gitignore b/.gitignore index d3834f4..8116c50 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,6 @@ venv/ __pycache__/ *.py[cod] *$py.class -superclient.egg-info build/ -dist/ \ No newline at end of file +dist/ +superstream_clients.egg-info/ \ No newline at end of file diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 4aa53c4..be9ea33 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -42,7 +42,7 @@ This installs the package in "editable" mode and enables automatic loading, whic ## Uninstallation ```bash -pip uninstall superclient && find venv/lib/python*/site-packages -name "superclient-init.pth" -delete && rm -rf build/ dist/ superclient.egg-info/ && find . -name "*.pyc" -delete && find . -name "__pycache__" -type d -exec rm -rf {} + +pip uninstall superstream-clients && find venv/lib/python*/site-packages -name "superclient-init.pth" -delete && rm -rf build/ dist/ superstream_clients.egg-info/ && find . -name "*.pyc" -delete && find . -name "__pycache__" -type d -exec rm -rf {} + ``` This single command: diff --git a/README.md b/README.md index bdeb03a..fa132b5 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ The Superstream library needs to modify your producer's configuration to apply o ## Installation ```bash -pip install superclient && python -m superclient install_pth +pip install superstream-clients && python -m superclient install_pth ``` That's it! Superclient will now automatically load and optimize all Kafka producers in your Python environment. @@ -114,7 +114,7 @@ When using Superstream Clients with containerized applications, include the pack FROM python:3.8-slim # Install superclient -RUN pip install superclient +RUN pip install superstream-clients RUN python -m superclient install_pth # Your application code diff --git a/examples/requirements.txt b/examples/requirements.txt index d583425..e612393 100644 --- a/examples/requirements.txt +++ b/examples/requirements.txt @@ -1,4 +1,4 @@ -kafka-python==2.0.2 +kafka-python==2.2.14 confluent-kafka==2.3.0 aiokafka==0.10.0 aws-msk-iam-sasl-signer-python==1.0.2 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0b2cc2b..24b23db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "superstream-clients" -version = "0.1.0" +version = "0.1.5" description = "Superstream optimisation library for Kafka producers" authors = [{name = "Superstream Labs", email = "support@superstream.ai"}] license = "Apache-2.0" diff --git a/superclient/__init__.py b/superclient/__init__.py index 755aa99..4362346 100644 --- a/superclient/__init__.py +++ b/superclient/__init__.py @@ -1,7 +1,7 @@ from importlib.metadata import PackageNotFoundError, version as _pkg_version try: - __version__ = _pkg_version("superclient") + __version__ = _pkg_version("superstream-clients") except PackageNotFoundError: # Fallback for when the package isn't installed (e.g. 
running from source without editable install) __version__ = "0.0.0" diff --git a/superclient/agent/__init__.py b/superclient/agent/__init__.py index c5aec99..75b86fb 100644 --- a/superclient/agent/__init__.py +++ b/superclient/agent/__init__.py @@ -19,9 +19,6 @@ set_debug_enabled(True) logger = get_logger("agent") -logger.info("Superstream Agent initialized with environment variables: {}", _ENV_VARS) -if is_disabled(): - logger.warn("Superstream functionality disabled via SUPERSTREAM_DISABLED") # Preserve reference to built-in import function _original_import = builtins.__import__ @@ -47,7 +44,12 @@ def _patch_module(module_name: str) -> None: # Check if Producer exists before patching confluent_module = sys.modules["confluent_kafka"] if hasattr(confluent_module, "Producer"): - patch_confluent(confluent_module) + # Additional check to ensure we can safely patch + try: + patch_confluent(confluent_module) + except Exception as patch_exc: + logger.error("[ERR-003] Failed to patch confluent_kafka Producer: {}", patch_exc) + # Don't re-raise, just log the error except Exception as exc: logger.error("[ERR-001] Failed to patch {}: {}", module_name, exc) @@ -93,6 +95,12 @@ def initialize(): 2. Schedules patching of any pre-imported modules 3. Starts the heartbeat thread """ + + # Log initialization message + logger.info("Superstream Agent initialized with environment variables: {}", _ENV_VARS) + if is_disabled(): + logger.warn("Superstream functionality disabled via SUPERSTREAM_DISABLED") + if is_disabled(): return diff --git a/superclient/agent/interceptor.py b/superclient/agent/interceptor.py index 62faf17..b796a4d 100644 --- a/superclient/agent/interceptor.py +++ b/superclient/agent/interceptor.py @@ -5,7 +5,7 @@ from ..util.logger import get_logger from ..util.config import get_topics_list, is_disabled -from .metadata import fetch_metadata, optimal_cfg, _DEFAULTS +from .metadata import fetch_metadata_sync, optimal_cfg, _DEFAULTS from ..core.reporter import send_clients_msg from ..core.manager import normalize_bootstrap from .tracker import ProducerTracker, Heartbeat @@ -37,6 +37,10 @@ def init_patch(self, *args, **kwargs): # Store original configuration orig_cfg = dict(kwargs) + # Normalize compression type: convert None to "none" string + if "compression_type" in orig_cfg and orig_cfg["compression_type"] is None: + orig_cfg["compression_type"] = "none" + # Get bootstrap servers from args or kwargs bootstrap = orig_cfg.get("bootstrap_servers") or (args[0] if args else None) if not bootstrap: @@ -53,7 +57,7 @@ def init_patch(self, *args, **kwargs): try: # Get topics and metadata for optimization topics_env = get_topics_list() - metadata = fetch_metadata(bootstrap, orig_cfg, "kafka-python") + metadata = fetch_metadata_sync(bootstrap, orig_cfg, "kafka-python") # Check if Superstream is active for this cluster error_msg = "" @@ -62,25 +66,29 @@ def init_patch(self, *args, **kwargs): error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) # Skip optimization but keep stats reporting opt_cfg = {} - elif not metadata.active: + elif not metadata.get("active", True): error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
logger.error(error_msg) # Skip optimization but keep stats reporting opt_cfg = {} else: # Get optimized configuration if Superstream is active - opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python") + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python") + if warning_msg: + error_msg = warning_msg # Apply optimized configuration for k, v in opt_cfg.items(): - snake = k.replace(".", "_") - if kwargs.get(snake) != v: - logger.debug("Overriding configuration: {} -> {}", snake, v) - kwargs[snake] = v + current_val = kwargs.get(k) + if current_val != v: + if k in kwargs: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) + kwargs[k] = v # Set up reporting interval - report_interval = metadata.report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS - + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS # Create and register producer tracker tr = ProducerTracker( lib="kafka-python", @@ -96,9 +104,6 @@ def init_patch(self, *args, **kwargs): ) Heartbeat.register_tracker(tr) - # Initialize the producer with original configuration - orig_init(self, *args, **kwargs) - # Patch send and close methods if not already patched if not hasattr(self, "_superstream_patch"): original_send = self.send @@ -117,12 +122,13 @@ def close_patch(*a, **kw): self._superstream_closed = True tr.close() Heartbeat.unregister_tracker(tr.uuid) + logger.debug("Superstream tracking stopped for kafka-python producer with client_id: {}", client_id) return orig_close(*a, **kw) self.close = close_patch self._superstream_patch = True - # Initialize again with optimized configuration + # Initialize with optimized configuration orig_init(self, *args, **kwargs) # Send client registration message @@ -162,6 +168,11 @@ def init_patch(self, *args, **kwargs): if is_disabled(): return orig_init(self, *args, **kwargs) orig_cfg = dict(kwargs) + + # Normalize compression type: convert None to "none" string + if "compression_type" in orig_cfg and orig_cfg["compression_type"] is None: + orig_cfg["compression_type"] = "none" + bootstrap = orig_cfg.get("bootstrap_servers") if not bootstrap and args: bootstrap = args[0] @@ -174,26 +185,32 @@ def init_patch(self, *args, **kwargs): try: topics_env = get_topics_list() - metadata = fetch_metadata(bootstrap, orig_cfg, "aiokafka") + metadata = fetch_metadata_sync(bootstrap, orig_cfg, "aiokafka") error_msg = "" if metadata is None: error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) logger.error(error_msg) # Skip optimization but keep stats reporting opt_cfg = {} - elif not metadata.active: + elif not metadata.get("active", True): error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
logger.error(error_msg) # Skip optimization but keep stats reporting opt_cfg = {} else: # Get optimized configuration if Superstream is active - opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka") + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka") + if warning_msg: + error_msg = warning_msg for k, v in opt_cfg.items(): - if kwargs.get(k) != v: - logger.debug("Overriding configuration: {} -> {}", k, v) + current_val = kwargs.get(k) + if current_val != v: + if k in kwargs: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) kwargs[k] = v - report_interval = metadata.report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS tr = ProducerTracker( lib="aiokafka", producer=self, @@ -207,7 +224,6 @@ def init_patch(self, *args, **kwargs): topics_env=topics_env, ) Heartbeat.register_tracker(tr) - orig_init(self, *args, **kwargs) if not hasattr(self, "_superstream_patch"): original_send = self.send @@ -223,6 +239,7 @@ async def stop_patch(*a, **kw): self._superstream_closed = True tr.close() Heartbeat.unregister_tracker(tr.uuid) + logger.debug("Superstream tracking stopped for aiokafka producer with client_id: {}", client_id) await original_stop(*a, **kw) self.stop = stop_patch @@ -256,95 +273,140 @@ def patch_confluent(mod): if _PATCHED.get("confluent"): return _PATCHED["confluent"] = True + + # Check if Producer exists and is not already patched + if not hasattr(mod, "Producer"): + logger.warn("confluent_kafka module does not have Producer class") + return + Producer = mod.Producer - orig_init = Producer.__init__ - - def init_patch(self, conf: Dict[str, Any], *args, **kwargs): - if is_disabled(): - return orig_init(self, conf, *args, **kwargs) - conf = dict(conf) - bootstrap = conf.get("bootstrap.servers") - if not bootstrap: - return orig_init(self, conf, *args, **kwargs) - bootstrap = normalize_bootstrap(bootstrap) - client_id = conf.get("client.id", "") - if client_id.startswith(_SUPERLIB_PREFIX): - return orig_init(self, conf, *args, **kwargs) - - try: - topics_env = get_topics_list() - metadata = fetch_metadata(bootstrap, conf, "confluent") - error_msg = "" - if metadata is None: - error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) - logger.error(error_msg) - # Skip optimization but keep stats reporting - opt_cfg = {} - elif not metadata.active: - error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
- logger.error(error_msg) - # Skip optimization but keep stats reporting - opt_cfg = {} - else: - # Get optimized configuration if Superstream is active - opt_cfg = optimal_cfg(metadata, topics_env, conf, "confluent") - for k, v in opt_cfg.items(): - if conf.get(k) != v: - logger.debug("Overriding configuration: {} -> {}", k, v) - conf[k] = v - report_interval = metadata.report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS - tr = ProducerTracker( - lib="confluent", - producer=self, - bootstrap=bootstrap, - client_id=client_id, - orig_cfg=conf, - opt_cfg=opt_cfg, - report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS), - error=error_msg, # Store error message in tracker - metadata=metadata, - topics_env=topics_env, - ) - Heartbeat.register_tracker(tr) - orig_init(self, conf, *args, **kwargs) - if not hasattr(self, "_superstream_patch"): - original_produce = self.produce - - def produce_patch(topic, *a, **kw): - tr.record_topic(topic) - return original_produce(topic, *a, **kw) - - self.produce = produce_patch - orig_close = self.close - - def close_patch(*a, **kw): - if not hasattr(self, "_superstream_closed"): - self._superstream_closed = True - tr.close() - Heartbeat.unregister_tracker(tr.uuid) - return orig_close(*a, **kw) - - self.close = close_patch - self._superstream_patch = True - send_clients_msg(tr, error_msg) + + # Check if already patched + if hasattr(mod, '_OriginalProducer'): + logger.debug("confluent_kafka Producer already patched") + return + + # Store the original Producer class + mod._OriginalProducer = Producer + + # Create a wrapper class that provides the same interface as Producer + class SuperstreamProducer: + def __init__(self, conf: Dict[str, Any], *args, **kwargs): + if is_disabled(): + self._producer = Producer(conf, *args, **kwargs) + return + + conf = dict(conf) - # Log success message based on whether defaults were used - if not opt_cfg: # No optimization applied - pass # Skip success message as there was an error - elif all(opt_cfg.get(k) == v for k, v in _DEFAULTS.items()) and len(opt_cfg) == len(_DEFAULTS): # Default optimization - if client_id: - logger.info("Successfully optimized producer with default optimization parameters for {}", client_id) - else: - logger.info("Successfully optimized producer with default optimization parameters") - else: # Custom optimization - if client_id: - logger.info("Successfully optimized producer configuration for {}", client_id) + # Normalize compression type: convert None to "none" string + if "compression.type" in conf and conf["compression.type"] is None: + conf["compression.type"] = "none" + + bootstrap = conf.get("bootstrap.servers") + if not bootstrap: + self._producer = Producer(conf, *args, **kwargs) + return + bootstrap = normalize_bootstrap(bootstrap) + client_id = conf.get("client.id", "") + if client_id.startswith(_SUPERLIB_PREFIX): + self._producer = Producer(conf, *args, **kwargs) + return + + try: + topics_env = get_topics_list() + metadata = fetch_metadata_sync(bootstrap, conf, "confluent") + error_msg = "" + if metadata is None: + error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id) + logger.error(error_msg) + # Skip optimization but keep stats reporting + opt_cfg = {} + elif not metadata.get("active", True): + error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it." 
+ logger.error(error_msg) + # Skip optimization but keep stats reporting + opt_cfg = {} else: - logger.info("Successfully optimized producer configuration") - - except Exception as e: - # If any error occurs in our logic, log it and create the producer normally - logger.error("[ERR-303] Failed to optimize producer with client id {}: {}", client_id, str(e)) - return orig_init(self, conf, *args, **kwargs) - - Producer.__init__ = init_patch \ No newline at end of file + # Get optimized configuration if Superstream is active + opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, conf, "confluent") + if warning_msg: + error_msg = warning_msg + + # Store original configuration before applying optimizations + orig_cfg = dict(conf) + + # Apply optimizations to the configuration + for k, v in opt_cfg.items(): + current_val = conf.get(k) + if current_val != v: + if k in conf: + logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v) + else: + logger.debug("Overriding configuration: {} ((not set) -> {})", k, v) + conf[k] = v + + # Create the producer with optimized configuration + self._producer = Producer(conf, *args, **kwargs) + + report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS + self._tracker = ProducerTracker( + lib="confluent", + producer=self._producer, + bootstrap=bootstrap, + client_id=client_id, + orig_cfg=orig_cfg, + opt_cfg=opt_cfg, + report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS), + error=error_msg, # Store error message in tracker + metadata=metadata, + topics_env=topics_env, + ) + Heartbeat.register_tracker(self._tracker) + + send_clients_msg(self._tracker, error_msg) + + # Log success message based on whether defaults were used + if not opt_cfg: # No optimization applied + pass # Skip success message as there was an error + elif all(opt_cfg.get(k) == v for k, v in _DEFAULTS.items()) and len(opt_cfg) == len(_DEFAULTS): # Default optimization + if client_id: + logger.info("Successfully optimized producer with default optimization parameters for {}", client_id) + else: + logger.info("Successfully optimized producer with default optimization parameters") + else: # Custom optimization + if client_id: + logger.info("Successfully optimized producer configuration for {}", client_id) + else: + logger.info("Successfully optimized producer configuration") + + except Exception as e: + # If any error occurs in our logic, log it and create the producer normally + logger.error("[ERR-303] Failed to optimize producer with client id {}: {}", client_id, str(e)) + self._producer = Producer(conf, *args, **kwargs) + + def produce(self, topic, *args, **kwargs): + """Wrapper for produce method that tracks topics.""" + if hasattr(self, '_tracker'): + self._tracker.record_topic(topic) + return self._producer.produce(topic, *args, **kwargs) + + def __del__(self): + """Destructor to automatically clean up when producer is garbage collected.""" + if hasattr(self, '_tracker') and not hasattr(self, '_superstream_closed'): + try: + self._superstream_closed = True + self._tracker.close() + Heartbeat.unregister_tracker(self._tracker.uuid) + logger.debug("Superstream tracking stopped for confluent-kafka producer with client_id: {}", + getattr(self._tracker, 'client_id', 'unknown')) + except Exception as e: + logger.error("Error during automatic cleanup: {}", e) + else: + logger.debug("Producer already cleaned up or no tracker found") + + def __getattr__(self, name): + """Delegate all other attributes to the underlying 
producer.""" + return getattr(self._producer, name) + + # Replace the Producer class in the module + mod.Producer = SuperstreamProducer \ No newline at end of file diff --git a/superclient/agent/metadata.py b/superclient/agent/metadata.py index 6bc8b47..eff4c81 100644 --- a/superclient/agent/metadata.py +++ b/superclient/agent/metadata.py @@ -3,6 +3,7 @@ import json import os from typing import Any, Dict, Optional, Literal +import time from ..util.logger import get_logger from ..util.config import copy_client_configuration_properties, translate_java_to_lib @@ -24,7 +25,7 @@ def _create_consumer_kafka_python(bootstrap: str, base_cfg: Dict[str, Any]): "enable_auto_commit": False, "auto_offset_reset": "earliest", } - copy_client_configuration_properties(base_cfg, consumer_cfg) + copy_client_configuration_properties(base_cfg, consumer_cfg, "kafka-python") return kafka.KafkaConsumer(**consumer_cfg) def _create_consumer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): @@ -36,7 +37,7 @@ def _create_consumer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): "enable_auto_commit": False, "auto_offset_reset": "earliest", } - copy_client_configuration_properties(base_cfg, consumer_cfg) + copy_client_configuration_properties(base_cfg, consumer_cfg, "aiokafka") return AIOKafkaConsumer(**consumer_cfg) def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): @@ -47,8 +48,13 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "client.id": "superstreamlib-metadata-consumer", "enable.auto.commit": False, "auto.offset.reset": "earliest", + "group.id": "superstreamlib-metadata-consumer", + "session.timeout.ms": 10000, + "heartbeat.interval.ms": 3000, + "max.poll.interval.ms": 30000, + "enable.partition.eof": True, } - copy_client_configuration_properties(base_cfg, consumer_cfg) + copy_client_configuration_properties(base_cfg, consumer_cfg, "confluent") return Consumer(consumer_cfg) _CONSUMER_BUILDERS = { @@ -57,21 +63,15 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "confluent": _create_consumer_confluent, } -def fetch_metadata( +def fetch_metadata_kafka_python( bootstrap: str, cfg: Dict[str, Any], - lib_name: Literal["kafka-python", "aiokafka", "confluent"], ) -> Optional[Dict[str, Any]]: - """Fetch the latest optimization metadata from the Superstream internal topic. - - The consumer is created using the *same* Kafka library that the application - itself employs (`lib_name`). This guarantees compatibility with user - dependencies and avoids version-related conflicts. - """ - - builder = _CONSUMER_BUILDERS.get(lib_name) + """Fetch metadata using kafka-python library.""" + + builder = _CONSUMER_BUILDERS.get("kafka-python") if builder is None: - logger.error("[ERR-204] Unsupported Kafka library: {}", lib_name) + logger.error("[ERR-204] Unsupported Kafka library: kafka-python") return None topic = "superstream.metadata_v1" @@ -85,75 +85,287 @@ def fetch_metadata( consumer.close() return None - if lib_name == "confluent": - from confluent_kafka import TopicPartition # type: ignore + import kafka as _kafka # type: ignore - tp = TopicPartition(topic, 0) - consumer.assign([tp]) - low, high = consumer.get_watermark_offsets(tp, timeout=5.0) - if high == 0: - logger.error( - "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." 
- ) - consumer.close() - return None - consumer.seek(TopicPartition(topic, 0, high - 1)) - msg = consumer.poll(timeout=5.0) + tp = _kafka.TopicPartition(topic, 0) + consumer.assign([tp]) + + # Get the end offset safely + end_offsets = consumer.end_offsets([tp]) + end = end_offsets.get(tp, 0) + + if end == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." + ) consumer.close() - if msg and msg.value(): - return json.loads(msg.value().decode()) + return None + + consumer.seek(tp, end - 1) + recs = consumer.poll(timeout_ms=5000) + consumer.close() + for batch in recs.values(): + for rec in batch: + return json.loads(rec.value.decode()) + except Exception as exc: + logger.error("[ERR-203] Failed to fetch metadata: {}", exc) + return None + +def fetch_metadata_confluent( + bootstrap: str, + cfg: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Fetch metadata using confluent-kafka library.""" + + builder = _CONSUMER_BUILDERS.get("confluent") + if builder is None: + logger.error("[ERR-204] Unsupported Kafka library: confluent") + return None - else: # kafka-python / aiokafka share similar API - import kafka as _kafka # type: ignore + topic = "superstream.metadata_v1" + consumer = None + try: + consumer = builder(bootstrap, cfg) - tp = _kafka.TopicPartition(topic, 0) - consumer.assign([tp]) - consumer.seek_to_end(tp) - end = consumer.position(tp) - if end == 0: + from confluent_kafka import TopicPartition # type: ignore + + # Warm up the connection before attempting list_topics + logger.debug("Warming up connection with initial poll...") + try: + consumer.poll(timeout=2.0) + logger.debug("Connection warm-up completed") + except Exception as e: + logger.debug("Connection warm-up failed, continuing anyway: {}", e) + + # Try to get topic metadata, but don't fail if it doesn't work + try: + metadata = consumer.list_topics(topic=topic, timeout=5.0) + if topic not in metadata.topics: logger.error( - "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." + "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics." ) consumer.close() return None - consumer.seek(tp, end - 1) - recs = consumer.poll(timeout_ms=5000) + except Exception as e: + logger.debug("Could not list topics, proceeding with assignment: {}", e) + # Continue with topic assignment even if list_topics fails + + # Assign to partition 0 + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + + # Poll to ensure assignment is complete and add a small delay + consumer.poll(timeout=1.0) + time.sleep(0.1) # Small delay to ensure consumer is ready + + # Get watermark offsets to find the last message + try: + low, high = consumer.get_watermark_offsets(tp, timeout=5.0) + except Exception as e: + logger.error("[ERR-203] Failed to get watermark offsets: {}", e) + consumer.close() + return None + + if high == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." 
+ ) + consumer.close() + return None + + # Seek to the last message (high - 1) + try: + consumer.seek(TopicPartition(topic, 0, high - 1)) + except Exception as e: + logger.error("[ERR-203] Failed to seek to last message: {}", e) consumer.close() - for batch in recs.values(): - for rec in batch: - return json.loads(rec.value.decode()) + return None + + # Poll for the message + msg = consumer.poll(timeout=5.0) + consumer.close() + + if msg and msg.value(): + return json.loads(msg.value().decode()) + else: + logger.error("[ERR-202] No message found at last offset") + return None + + except Exception as exc: + logger.error("[ERR-203] Failed to fetch metadata: {}", exc) + if consumer: + try: + consumer.close() + except: + pass + return None + +async def fetch_metadata_aiokafka( + bootstrap: str, + cfg: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Fetch metadata using aiokafka library.""" + + builder = _CONSUMER_BUILDERS.get("aiokafka") + if builder is None: + logger.error("[ERR-204] Unsupported Kafka library: aiokafka") + return None + + topic = "superstream.metadata_v1" + consumer = None + try: + consumer = builder(bootstrap, cfg) + + # Start the consumer first + await consumer.start() + + # For aiokafka, partitions_for_topic returns a set, not a coroutine + partitions = consumer.partitions_for_topic(topic) + if not partitions: + logger.error( + "[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics." + ) + await consumer.stop() + return None + + # aiokafka uses its own TopicPartition and async API + from aiokafka import TopicPartition # type: ignore + + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + + # Get the end offset safely using aiokafka's API + end_offsets = await consumer.end_offsets([tp]) + end = end_offsets.get(tp, 0) + + if end == 0: + logger.error( + "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty." 
+ ) + await consumer.stop() + return None + + consumer.seek(tp, end - 1) + recs = await consumer.getmany(timeout_ms=5000) + await consumer.stop() + for batch in recs.values(): + for rec in batch: + return json.loads(rec.value.decode()) except Exception as exc: logger.error("[ERR-203] Failed to fetch metadata: {}", exc) + if consumer: + try: + await consumer.stop() + except: + pass return None -def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> Dict[str, Any]: - """Compute optimal configuration based on metadata and topics.""" +def fetch_metadata_sync( + bootstrap: str, + cfg: Dict[str, Any], + lib_name: Literal["kafka-python", "aiokafka", "confluent"], +) -> Optional[Dict[str, Any]]: + """Synchronous wrapper for fetch_metadata.""" + import asyncio + + if lib_name == "kafka-python": + return fetch_metadata_kafka_python(bootstrap, cfg) + elif lib_name == "confluent": + return fetch_metadata_confluent(bootstrap, cfg) + elif lib_name == "aiokafka": + # For aiokafka, we need to handle the async case + try: + # Try to get the current event loop + loop = asyncio.get_running_loop() + # If we get here, there's already a running event loop + # We need to create a new event loop in a separate thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, fetch_metadata_aiokafka(bootstrap, cfg)) + return future.result() + except RuntimeError: + # No event loop is running, we can use asyncio.run() + return asyncio.run(fetch_metadata_aiokafka(bootstrap, cfg)) + else: + logger.error("[ERR-204] Unsupported Kafka library: {}", lib_name) + return None + +def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]: + """Compute optimal configuration based on metadata and topics. + + Returns: + tuple: (configuration_dict, warning_message) + """ latency = os.getenv("SUPERSTREAM_LATENCY_SENSITIVE", "false").lower() == "true" cfg: Dict[str, Any] + warning_msg = "" + if not metadata or not metadata.get("topics_configuration"): logger.debug("No metadata or topics_configuration found; applying default configuration: %s", _DEFAULTS) - logger.warning("The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console") + if not topics: + warning_msg = "No SUPERSTREAM_TOPICS_LIST environment variable set. Please set it to enable topic-specific optimizations." + else: + warning_msg = "The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console" + logger.warning(warning_msg) cfg = dict(_DEFAULTS) else: matches = [tc for tc in metadata["topics_configuration"] if tc["topic_name"] in topics] if not matches: logger.debug("No matching topics found in metadata; applying default configuration: %s", _DEFAULTS) - logger.warning("The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console") + if not topics: + warning_msg = "No SUPERSTREAM_TOPICS_LIST environment variable set. Please set it to enable topic-specific optimizations." + else: + warning_msg = "The topics you're publishing to haven't been analyzed yet. 
For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console" + logger.warning(warning_msg) cfg = dict(_DEFAULTS) else: best = max(matches, key=lambda tc: tc["potential_reduction_percentage"] * tc["daily_writes_bytes"]) cfg = dict(best.get("optimized_configuration", {})) for k, v in _DEFAULTS.items(): cfg.setdefault(k, v) + if latency: - cfg.pop("linger.ms", None) - for p in ("batch.size", "linger.ms"): - if p in orig and p in cfg: + # For latency-sensitive applications, don't apply linger.ms optimization + # Keep the original value if it exists, otherwise use a default + if "linger.ms" in cfg: + # Get the original linger.ms value if it exists + orig_linger_key = None + if lib_name == "kafka-python" or lib_name == "aiokafka": + orig_linger_key = "linger_ms" + elif lib_name == "confluent": + orig_linger_key = "linger.ms" + + if orig_linger_key and orig_linger_key in orig: + # Use the original value instead of the optimized one + cfg["linger.ms"] = orig[orig_linger_key] + logger.debug("Using original linger.ms value ({}) for latency-sensitive application", orig[orig_linger_key]) + else: + # Remove the optimized value but it will be added back as default later + cfg.pop("linger.ms") + + # Translate Java-style keys to library-specific keys for comparison + java_keys_to_check = ["batch.size", "linger.ms"] + lib_keys_to_check = [] + for java_key in java_keys_to_check: + translated = translate_java_to_lib({java_key: ""}, lib_name) + lib_key = list(translated.keys())[0] if translated else java_key + lib_keys_to_check.append(lib_key) + + for java_key, lib_key in zip(java_keys_to_check, lib_keys_to_check): + if lib_key in orig and java_key in cfg: try: - if int(orig[p]) > int(cfg[p]): - cfg[p] = orig[p] + if int(orig[lib_key]) > int(cfg[java_key]): + logger.debug("Keeping original {} value ({}) as it's larger than optimized ({})", java_key, orig[lib_key], cfg[java_key]) + cfg[java_key] = orig[lib_key] except Exception: pass + # Ensure all core optimization parameters are present in the final config + # But don't add linger.ms back if it was removed for latency sensitivity + for k, v in _DEFAULTS.items(): + if k not in cfg: + if not (latency and k == "linger.ms"): + cfg[k] = v + # Translate Java-style keys to library-specific keys - return translate_java_to_lib(cfg, lib_name) \ No newline at end of file + return translate_java_to_lib(cfg, lib_name), warning_msg \ No newline at end of file diff --git a/superclient/agent/tracker.py b/superclient/agent/tracker.py index a94ab0f..367701c 100644 --- a/superclient/agent/tracker.py +++ b/superclient/agent/tracker.py @@ -39,6 +39,7 @@ def __init__( self.error = error self.metadata = metadata self.topics_env = topics_env or [] + self.start_time_ms = int(time.time() * 1000) # Unix timestamp in milliseconds def record_topic(self, topic: str): """Record a topic that this producer writes to.""" @@ -52,7 +53,6 @@ def determine_topic(self) -> str: """Get the most impactful topic for this producer based on metadata analysis.""" if not self.metadata or not self.metadata.get("topics_configuration"): # Fallback to first topic if no metadata available - logger.warning("No metadata available for producer {}, falling back to first used topic", self.client_id) return sorted(self.topics)[0] if self.topics else "" # Find matching topic configurations from metadata based on environment topics only @@ -63,8 +63,6 @@ def determine_topic(self) -> str: if not matches: # Fallback to first environment topic if no matches - 
logger.warning("No matching topics found in metadata for producer {} (env topics: {}), falling back to first environment topic", - self.client_id, self.topics_env) return sorted(self.topics_env)[0] if self.topics_env else "" # Use the same logic as optimal_cfg: find the topic with highest impact diff --git a/superclient/core/manager.py b/superclient/core/manager.py index 80c8b2c..d9f3cbf 100644 --- a/superclient/core/manager.py +++ b/superclient/core/manager.py @@ -60,7 +60,7 @@ def fetch_metadata_kafka_python(bootstrap: str, cfg: Dict[str, Any]) -> Optional "enable_auto_commit": False, "group_id": "superstream-metadata-fetcher", } - copy_client_configuration_properties(cfg, kafka_cfg) + copy_client_configuration_properties(cfg, kafka_cfg, "kafka-python") # Create consumer and fetch message consumer = KafkaConsumer(topic, **kafka_cfg) @@ -102,7 +102,7 @@ def fetch_metadata_confluent(bootstrap: str, cfg: Dict[str, Any]) -> Optional[Me "enable.auto.commit": False, "group.id": "superstream-metadata-fetcher", } - copy_client_configuration_properties(cfg, kafka_cfg) + copy_client_configuration_properties(cfg, kafka_cfg, "confluent") # Create consumer and fetch message consumer = Consumer(kafka_cfg) @@ -139,7 +139,7 @@ async def fetch_metadata_aiokafka(bootstrap: str, cfg: Dict[str, Any]) -> Option "enable_auto_commit": False, "group_id": "superstream-metadata-fetcher", } - copy_client_configuration_properties(cfg, kafka_cfg) + copy_client_configuration_properties(cfg, kafka_cfg, "aiokafka") # Create consumer and fetch message consumer = AIOKafkaConsumer(topic, **kafka_cfg) diff --git a/superclient/core/reporter.py b/superclient/core/reporter.py index 1aab492..13a1261 100644 --- a/superclient/core/reporter.py +++ b/superclient/core/reporter.py @@ -29,7 +29,7 @@ def _create_producer_kafka_python(bootstrap: str, base_cfg: Dict[str, Any]): "batch.size": 16_384, "linger.ms": 1000, } - copy_client_configuration_properties(base_cfg, cfg) + copy_client_configuration_properties(base_cfg, cfg, "kafka-python") kafka_cfg = {k.replace(".", "_"): v for k, v in cfg.items()} return kafka.KafkaProducer(**kafka_cfg) @@ -44,7 +44,7 @@ def _create_producer_confluent(bootstrap: str, base_cfg: Dict[str, Any]): "batch.size": 16384, "linger.ms": 1000, } - copy_client_configuration_properties(base_cfg, cfg) + copy_client_configuration_properties(base_cfg, cfg, "confluent") return _CProducer(cfg) @@ -55,10 +55,10 @@ async def _create_producer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]): "bootstrap_servers": bootstrap, "client_id": _SUPERLIB_PREFIX + "client-reporter", "compression_type": "zstd", - "batch_size": 16_384, + "max_batch_size": 16_384, "linger_ms": 1000, } - copy_client_configuration_properties(base_cfg, cfg) + copy_client_configuration_properties(base_cfg, cfg, "aiokafka") return AIOKafkaProducer(**cfg) @@ -78,20 +78,40 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt return try: + # Handle aiokafka (async library) if lib_name == "aiokafka": - asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload)) + # Handle the case where an event loop is already running + try: + # Try to get the current event loop + loop = asyncio.get_running_loop() + # If we get here, there's already a running event loop + # We need to create a new event loop in a separate thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, internal_send_clients_async(bootstrap, base_cfg, payload)) + future.result() 
+ except RuntimeError: + # No event loop is running, we can use asyncio.run() + asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload)) return - prod = builder(bootstrap, base_cfg) - if hasattr(prod, "produce"): - prod.produce("superstream.clients", payload) - prod.flush() - else: + # Handle kafka-python (sync library) + if lib_name == "kafka-python": + prod = builder(bootstrap, base_cfg) prod.send("superstream.clients", payload) prod.flush() prod.close() - except Exception: - logger.debug("Failed to send clients message via {}", lib_name) + return + + # Handle confluent-kafka (sync library with different API) + if lib_name == "confluent": + prod = builder(bootstrap, base_cfg) + prod.produce("superstream.clients", payload) + prod.flush() + return + + except Exception as e: + logger.error("Failed to send clients message via {}: {}", lib_name, e) async def internal_send_clients_async(bootstrap: str, base_cfg: Dict[str, Any], payload: bytes) -> None: @@ -103,8 +123,8 @@ async def internal_send_clients_async(bootstrap: str, base_cfg: Dict[str, Any], await prod.send_and_wait("superstream.clients", payload) finally: await prod.stop() - except Exception: - logger.debug("Failed to send clients message via aiokafka") + except Exception as e: + logger.error("Failed to send clients message via aiokafka: {}", e) def send_clients_msg(tracker: Any, error: str = "") -> None: @@ -130,6 +150,10 @@ def send_clients_msg(tracker: Any, error: str = "") -> None: most_impactful_topic=tracker.determine_topic(), language=f"Python ({tracker.library})", error=error, + producer_metrics={}, + topic_metrics={}, + node_metrics={}, + app_info_metrics={"start-time-ms": str(tracker.start_time_ms)}, ) payload = json.dumps(msg.__dict__).encode() internal_send_clients(tracker.bootstrap, tracker.orig_cfg, payload, tracker.library) diff --git a/superclient/model/messages.py b/superclient/model/messages.py index 7c254eb..9483ad3 100644 --- a/superclient/model/messages.py +++ b/superclient/model/messages.py @@ -35,4 +35,8 @@ class ClientMessage: superstream_client_uid: str = "" most_impactful_topic: str = "" language: str = "" - error: str = "" \ No newline at end of file + error: str = "" + producer_metrics: Dict[str, Any] = field(default_factory=dict) + topic_metrics: Dict[str, Any] = field(default_factory=dict) + node_metrics: Dict[str, Any] = field(default_factory=dict) + app_info_metrics: Dict[str, Any] = field(default_factory=dict) \ No newline at end of file diff --git a/superclient/util/config.py b/superclient/util/config.py index 723f325..a6c4ad9 100644 --- a/superclient/util/config.py +++ b/superclient/util/config.py @@ -30,15 +30,62 @@ def get_topics_list() -> List[str]: def mask_sensitive(k: str, v: Any) -> Any: """Mask sensitive configuration values.""" - return "[MASKED]" if "password" in k.lower() or "sasl.jaas.config" in k.lower() else v + sensitive_patterns = [ + # Existing patterns + "password", "sasl.jaas.config", "basic.auth.user.info", + "ssl.key", "ssl.cert", "ssl.truststore", "ssl.keystore", + "sasl.kerberos.keytab", "sasl.kerberos.principal", + + # Additional SSL properties + "ssl.cafile", "ssl.certfile", "ssl.keyfile", + "ssl.certificate.location", "ssl.certificate.pem", + "ssl.ca.location", "ssl.ca.pem", + "ssl.ca.certificate.stores", "ssl.crl.location", + "ssl.providers", "ssl.context", + + # Additional SASL properties + "sasl.username", "sasl.password", + "sasl.plain.username", "sasl.plain.password", + "sasl.oauthbearer.config", "sasl.oauthbearer.client.secret", + 
"sasl.oauthbearer.extensions", + + # OAuth callback configurations + "sasl.oauth.token.provider", "oauth_cb", "sasl_oauth_token_provider", + + # Library-specific variations + "sasl_plain_username", "sasl_plain_password", + "ssl_cafile", "ssl_certfile", "ssl_keyfile", + "sasl_oauth_token_provider" + ] + return "[MASKED]" if any(pattern in k.lower() for pattern in sensitive_patterns) else v + +def _serialize_config_value(v: Any) -> Any: + """Convert configuration values to JSON-serializable format.""" + if callable(v): + # Convert functions to string representation + return f"" + elif hasattr(v, '__dict__'): + # Handle objects that might have __dict__ but aren't functions + try: + # Try to serialize as dict, fallback to string representation + return v.__dict__ + except: + return f"" + else: + return v -def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any]): +def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any], lib_name: str = "confluent"): """Copy essential client configuration properties from source to destination. This ensures internal Kafka clients have the same security, network, and connection configurations as the user's Kafka clients. Only copies properties that are explicitly set in the source configuration. + + Args: + src: Source configuration (in library-specific syntax) + dst: Destination configuration (in library-specific syntax) + lib_name: Library name for key translation """ - # List of all possible auth/network related configs + # List of all possible auth/network related configs in Java-style syntax possible_keys = [ # Security protocol "security.protocol", @@ -53,8 +100,10 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "enable.ssl.certificate.verification", "ssl.certificate.location", "ssl.certificate.pem", "ssl.ca.location", "ssl.ca.pem", + "ssl.key.location", "ssl.ca.certificate.stores", "ssl.crl.location", "ssl.providers", "ssl.context", + "ssl.cafile", "ssl.certfile", "ssl.keyfile", # SASL properties "sasl.mechanism", "sasl.mechanisms", "sasl.jaas.config", @@ -67,6 +116,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "sasl.login.refresh.min.period.seconds", "sasl.login.refresh.buffer.seconds", "sasl.login.retry.backoff.ms", "sasl.login.retry.backoff.max.ms", "sasl.username", "sasl.password", + "sasl.plain.username", "sasl.plain.password", "sasl.oauthbearer.config", "sasl.oauthbearer.client.id", "sasl.oauthbearer.client.secret", "sasl.oauthbearer.scope", "sasl.oauthbearer.extensions", "sasl.oauthbearer.token.endpoint.url", @@ -88,10 +138,27 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "retries" ] - # Only copy properties that are explicitly set in the source configuration - for k in possible_keys: - if k in src and k not in dst: - dst[k] = src[k] + # For each Java-style key, find if it exists in the source config + # by checking both the Java-style key and its library-specific equivalent + for java_key in possible_keys: + # Check if the Java-style key exists in source + if java_key in src: + if java_key not in dst: + dst[java_key] = src[java_key] + continue + + # Check if the library-specific equivalent exists in source + if lib_name in _JAVA_TO_LIB_MAPPING: + lib_key = _JAVA_TO_LIB_MAPPING[lib_name].get(java_key, java_key) + if lib_key in src and lib_key not in dst: + dst[lib_key] = src[lib_key] + + # Debug log to show config comparison + # Mask sensitive data before logging + src_masked = {k: 
mask_sensitive(k, v) for k, v in src.items()} + dst_masked = {k: mask_sensitive(k, v) for k, v in dst.items()} + + logger.debug("copy_client_configuration_properties - Source config: {}, Destination config: {}", src_masked, dst_masked) # --------------------------------------------------------------------------- # Field name mapping between Java-style and library-specific representations @@ -132,12 +199,16 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "metrics.num.samples": "metrics_num_samples", "metrics.sample.window.ms": "metrics_sample_window_ms", "sasl.mechanism": "sasl_mechanism", + "sasl.plain.username": "sasl_plain_username", + "sasl.plain.password": "sasl_plain_password", "sasl.kerberos.name": "sasl_kerberos_name", "sasl.kerberos.service.name": "sasl_kerberos_service_name", "sasl.kerberos.domain.name": "sasl_kerberos_domain_name", "sasl.oauth.token.provider": "sasl_oauth_token_provider", "socks5.proxy": "socks5_proxy", - "compression.codec": "compression_type", # Maps to compression_type + "ssl.cafile": "ssl_cafile", + "ssl.certfile": "ssl_certfile", + "ssl.keyfile": "ssl_keyfile", }, "aiokafka": { # Basic configuration @@ -159,7 +230,9 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "enable.idempotence": "enable_idempotence", "security.protocol": "security_protocol", "sasl.mechanism": "sasl_mechanism", - "compression.codec": "compression_type", # Maps to compression_type + "ssl.context": "ssl_context", + "sasl.plain.username": "sasl_plain_username", + "sasl.plain.password": "sasl_plain_password", }, "confluent": { # Confluent uses Java-style names directly, so most mappings are 1:1 @@ -196,6 +269,17 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any "message.send.max.retries": "message.send.max.retries", "batch.num.messages": "batch.num.messages", "compression.codec": "compression.codec", + "oauth_cb": "oauth_cb", + + # SSL properties (confluent uses Java-style names directly) + "ssl.ca.location": "ssl.ca.location", + "ssl.certificate.location": "ssl.certificate.location", + "ssl.key.location": "ssl.key.location", + "ssl.keystore.location": "ssl.keystore.location", + "ssl.keystore.password": "ssl.keystore.password", + "ssl.key.password": "ssl.key.password", + "ssl.endpoint.identification.algorithm": "ssl.endpoint.identification.algorithm", + "enable.ssl.certificate.verification": "enable.ssl.certificate.verification", } } @@ -256,7 +340,7 @@ def translate_lib_to_java(cfg: Dict[str, Any], lib_name: str) -> Dict[str, Any]: "enable_idempotence": False, "delivery_timeout_ms": 120000, "acks": 1, - "compression_type": None, + "compression_type": "none", "retries": 0, "batch_size": 16384, "linger_ms": 0, @@ -303,7 +387,7 @@ def translate_lib_to_java(cfg: Dict[str, Any], lib_name: str) -> Dict[str, Any]: # Producer specific "acks": 1, - "compression_type": None, + "compression_type": "none", "max_batch_size": 16384, # aiokafka uses max_batch_size "linger_ms": 0, "partitioner": None, @@ -422,5 +506,10 @@ def get_original_config(orig_cfg: Dict[str, Any], lib_name: str) -> Dict[str, An for k, v in defaults_java.items(): if k not in user_keys_java: merged[k] = v + + # Serialize any function objects to make them JSON-serializable + serialized: Dict[str, Any] = {} + for k, v in merged.items(): + serialized[k] = _serialize_config_value(v) - return merged \ No newline at end of file + return serialized \ No newline at end of file
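Reviewer note: the diff applies the same run-async-from-sync pattern in two places (`fetch_metadata_sync` for aiokafka and the aiokafka branch of `internal_send_clients`): call `asyncio.run()` when the calling thread has no event loop, otherwise run the coroutine on a fresh loop in a worker thread. A minimal standalone sketch of that pattern follows; `fake_fetch` and `run_blocking` are illustrative names, not part of the library.

```python
import asyncio
import concurrent.futures


async def fake_fetch() -> dict:
    """Stand-in coroutine for something like fetch_metadata_aiokafka."""
    await asyncio.sleep(0)
    return {"active": True}


def run_blocking(coro):
    """Run a coroutine to completion from synchronous code.

    Handles both the plain-sync case and the case where the caller is
    already inside a running event loop (where asyncio.run() would raise).
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run() is safe to call directly.
        return asyncio.run(coro)
    # A loop is already running: execute the coroutine on a new loop in a
    # worker thread and block until it completes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


if __name__ == "__main__":
    print(run_blocking(fake_fetch()))  # no event loop running yet

    async def caller():
        # Simulates being called from async application code.
        return run_blocking(fake_fetch())

    print(asyncio.run(caller()))
```

The trade-off is that `.result()` blocks the already-running loop until the fetch finishes, which matches the synchronous `Producer.__init__` context where the configuration must be known before the producer is constructed.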
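A second point worth calling out: `patch_confluent` now swaps `confluent_kafka.Producer` for a wrapper class instead of monkey-patching `Producer.__init__`, so attribute delegation via `__getattr__` is what keeps the rest of the producer API working. Below is a stripped-down sketch of that shape, using a stub inner class; `_StubProducer` and `_WrappedProducer` are illustrative, not the library's real classes.

```python
from typing import Any, Dict, Set


class _StubProducer:
    """Stand-in for confluent_kafka.Producer, for illustration only."""

    def produce(self, topic: str, value: bytes = b"") -> None:
        print(f"produce -> {topic}")

    def flush(self, timeout: float = -1.0) -> int:
        print("flush")
        return 0


class _WrappedProducer:
    """Same shape as SuperstreamProducer: intercept produce(), delegate the rest."""

    def __init__(self, conf: Dict[str, Any]):
        self._topics: Set[str] = set()
        self._producer = _StubProducer()

    def produce(self, topic: str, *args: Any, **kwargs: Any) -> None:
        # Interception point: record the topic before forwarding.
        self._topics.add(topic)
        return self._producer.produce(topic, *args, **kwargs)

    def __getattr__(self, name: str) -> Any:
        # Invoked only when normal lookup fails, so produce() above wins and
        # everything else (flush, poll, ...) falls through to the inner object.
        return getattr(self._producer, name)


p = _WrappedProducer({"bootstrap.servers": "localhost:9092"})
p.produce("orders", b"payload")
p.flush()           # delegated via __getattr__
print(p._topics)    # {'orders'}
```

Two behavioural differences to keep in mind when reviewing: the wrapper is not an instance of the original C-level class (kept as `confluent_kafka._OriginalProducer` by this patch), and dunder-based protocols such as `len(producer)` are looked up on the type and bypass `__getattr__`, so they are not delegated by this pattern.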