v0.3.0
Summary
This release introduces multiplexing to the KafkaEgress connector, allowing developers to define custom routing logic for outbound simulation data while maintaining 100% backward compatibility.
🚀 Key Features
- Custom `topic_router`: You can now pass a `Callable` to `KafkaEgress` to dynamically determine the destination Kafka topic on a per-message basis.
  - Multiplexing: Easily split complex data streams (e.g., routing `prediction_request` events to a dedicated ML topic while keeping standard events on the main lifecycle topic).
- Zero Breaking Changes: Existing deployments using the default `telemetry_topic` and `event_topic` routing will continue to function exactly as before without any code updates.
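As a rough illustration of what per-message routing means, the sketch below calls a router once per message and buckets the messages by the topic name it returns. It does not use Kafka or `KafkaEgress`, and it is not the connector's implementation: `default_router`, `group_by_topic`, and the topic names `telemetry-topic` and `event-topic` are hypothetical, chosen only to mimic a two-topic default.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def default_router(data: dict) -> str:
    # Hypothetical stand-in for a two-topic default: telemetry goes to one
    # topic, everything else to another. Topic names are placeholders.
    if data.get("stream_type") == "telemetry":
        return "telemetry-topic"
    return "event-topic"


def group_by_topic(
    messages: Iterable[dict],
    router: Callable[[dict], str] = default_router,
) -> Dict[str, List[dict]]:
    """Call the router once per message and bucket messages by topic name."""
    buckets: Dict[str, List[dict]] = defaultdict(list)
    for message in messages:
        buckets[router(message)].append(message)
    return dict(buckets)


messages = [
    {"stream_type": "telemetry", "value": {"rpm": 1200}},
    {"stream_type": "event", "value": {"event_type": "prediction_request"}},
]

# With no router supplied, the hypothetical default splits into two topics.
routed = group_by_topic(messages)
```

Because the router is a plain function from a message dict to a topic name, it can be exercised with sample messages like this before it is wired into a live connector.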
Example
def custom_topic_router(data: dict) -> str:
    stream_type = data.get("stream_type")
    if stream_type == "telemetry":
        return "sim-telemetry"
    value = data.get("value", {})
    if isinstance(value, dict):
        event_type = value.get("event_type")
        if event_type == "prediction_request":
            return "mill-predictions"
        elif event_type == "ground_truth":
            return "mill-groundtruth"
    return "mill-lifecycle"

# Pass it to the egress connector
egress = KafkaEgress(
    bootstrap_servers="localhost:9092",
    topic_router=custom_topic_router
)

Full Changelog: v0.2.0...v0.3.0