SharedBroker is a high-performance Ruby library designed to simplify event-based communication (asynchronous messaging) and telemetry (observability) in Rails microservice architectures.
The library implements the Adapter Pattern to decouple your application from physical queue providers, allowing easy broker swapping and clean synchronous testing with an in-memory adapter.
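To illustrate the decoupling described above, here is a minimal, self-contained sketch of the Adapter Pattern with a synchronous in-memory adapter. The class names `SketchInMemoryAdapter` and `SketchClient` are hypothetical and exist only for this example; they are not SharedBroker's internals, and the real adapter interface may differ.

```ruby
# Hypothetical names throughout: SketchInMemoryAdapter and SketchClient are
# illustrations only, not classes provided by SharedBroker.

# An adapter only needs to respond to #publish and #subscribe.
class SketchInMemoryAdapter
  def initialize
    @handlers = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # Remember the block so later publishes on this topic can reach it.
  def subscribe(topic, &handler)
    @handlers[topic] << handler
  end

  # Deliver synchronously: every handler runs before #publish returns.
  def publish(topic, payload)
    @handlers[topic].each { |handler| handler.call(payload) }
  end
end

# The client depends on the adapter's interface, never on a concrete broker.
class SketchClient
  def initialize(adapter:)
    @adapter = adapter
  end

  def publish(topic, payload)
    raise ArgumentError, "payload must be a Hash" unless payload.is_a?(Hash)

    @adapter.publish(topic, payload)
  end

  def subscribe(topic, &handler)
    @adapter.subscribe(topic, &handler)
  end
end

received = []
client = SketchClient.new(adapter: SketchInMemoryAdapter.new)
client.subscribe("user.created") { |payload| received << payload }
client.publish("user.created", { id: 1, email: "test@example.com" })
```

Because delivery is synchronous, a test can assert on `received` immediately after `publish` returns, with no broker process or I/O stubs involved. Swapping brokers then means passing a different object that responds to the same two methods.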
- Pluggable Messaging: Adapter pattern supporting:
  - InMemory: Synchronous local simulation for fast TDD testing (no inline external I/O stubs required).
  - RabbitMQ: Robust connection using the bunny gem.
  - Kafka: High-throughput adapter using the kafka gem.
  - Redis: Lightweight Pub/Sub broker using the redis gem.
- Resilience & Fault Tolerance:
  - Automatic Retry: Automatic retry mechanism on message processing failures using exponential backoff.
  - Dead Letter Queue (DLQ): Messages that exhaust their retries are automatically moved to a DLQ (#{queue_name}.dlq or a custom topic/list depending on the adapter) containing error metadata headers.
  - Circuit Breaker: Integrated thread-safe Circuit Breaker wrapping message publication to prevent cascading failures.
- Security & Data Validation:
  - Strict Schema Validation: Integration with dry-schema to validate message structures on both publish (boundaries out) and subscribe (boundaries in).
  - Transparent Payload Encryption: Payloads are automatically encrypted at rest using AES-256-GCM via SharedBroker.encryption_key.
- Integrated OpenTelemetry: Centralized SDK configuration with auto-instrumentation for all supported libraries (ActiveRecord, Bunny, Faraday, Rails, PG, etc.).
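The retry and dead-letter behavior listed above can be sketched in plain Ruby. This is an illustration under stated assumptions, not the gem's implementation: the helper `process_with_retry`, the injectable `sleeper`, the delay formula (`backoff_base ** attempt` seconds), and the `x-error-*` header names are all hypothetical. Only the `#{queue_name}.dlq` naming and the `max_retries` / `backoff_base` option names come from this README.

```ruby
# Hypothetical helper, not part of SharedBroker. The delay formula and the
# header names below are assumptions made for illustration.
def process_with_retry(queue_name, payload, dead_letters:,
                       max_retries: 3, backoff_base: 2,
                       sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield payload
  rescue StandardError => e
    attempt += 1
    if attempt <= max_retries
      # Exponential backoff: 2s, 4s, 8s with the default arguments.
      sleeper.call(backoff_base**attempt)
      retry
    end

    # Retries exhausted: park the message with error metadata.
    dead_letters << {
      queue: "#{queue_name}.dlq",
      payload: payload,
      headers: {
        "x-error-class" => e.class.name,
        "x-error-message" => e.message,
        "x-retry-count" => max_retries
      }
    }
    nil
  end
end

# Usage: a handler that always fails, with sleeping replaced by a recorder.
dead_letters = []
delays = []
calls = 0

process_with_retry("my_consumption_queue", { id: 1 },
                   dead_letters: dead_letters,
                   sleeper: ->(seconds) { delays << seconds }) do |_payload|
  calls += 1
  raise "handler failed"
end
```

Injecting the sleeper keeps the example fast to run and makes the backoff schedule observable; with the defaults the handler runs once plus three retries before the message is dead-lettered.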
Add this line to your application's Gemfile:

gem "shared_broker", path: "gems/shared_broker" # for local gem
# or when published:
# gem "shared_broker"

And execute:

bundle install

Create an initializer in your Rails application (config/initializers/shared_broker.rb). Below is the breakdown of what is required versus what is optional.
You must configure the adapter depending on the environment, initialize the client, and configure the payload encryption key (required since AES-256-GCM is active by default):
require "shared_broker"
# A. Configure Payload Encryption Key (AES-256-GCM)
# Expects a 32-byte string. The "a" * 32 fallback is for local development only; always set a secure key in production.
SharedBroker.encryption_key = ENV.fetch("SHARED_BROKER_ENCRYPTION_KEY") { "a" * 32 }
# B. Configure the Adapter based on Environment
if Rails.env.test?
  # In-memory adapter prevents external queue dependency during unit tests
  BROKER_ADAPTER = SharedBroker::Adapters::InMemory.new
else
  # Connects to real RabbitMQ broker
  amqp_url = ENV.fetch("RABBITMQ_URL") { "amqp://guest:guest@localhost:5672" }
  BROKER_ADAPTER = SharedBroker::Adapters::RabbitMQ.new(amqp_url: amqp_url)
end
# C. Instantiate the Client by Injecting the Adapter
SPOT_BROKER = SharedBroker::Client.new(adapter: BROKER_ADAPTER)

These features can be configured optionally depending on your needs.
Register schemas to validate payload structure automatically on outbound (publish) and inbound (subscribe) boundaries:
user_created_schema = Dry::Schema.Params do
  required(:id).filled(:integer)
  required(:email).filled(:string)
end
SharedBroker::Validation.register("user.created", user_created_schema)

By default, the client instantiates a standard Circuit Breaker. You can provide a custom one to tune the failure threshold and recovery window:
custom_circuit_breaker = SharedBroker::CircuitBreaker.new(
  failure_threshold: 5, # trip circuit after 5 failures
  recovery_timeout: 30  # wait 30 seconds before attempting recovery
)
# Use this in place of the earlier SPOT_BROKER assignment; defining the constant twice triggers a Ruby warning.
SPOT_BROKER = SharedBroker::Client.new(
  adapter: BROKER_ADAPTER,
  circuit_breaker: custom_circuit_breaker
)

Initialize the OpenTelemetry SDK with auto-instrumentation for the microservice:
SharedBroker::Telemetry.configure(service_name: "my_microservice")

If your system requires directing different topics to different message brokers (e.g., Kafka for telemetry, RabbitMQ for transactional events, Redis for quick cache invalidation), you can configure SharedBroker::Client with multiple adapters and a routing table:
# 1. Initialize physical adapters
rabbit_adapter = SharedBroker::Adapters::RabbitMQ.new(amqp_url: "amqp://guest:guest@localhost:5672")
kafka_adapter = SharedBroker::Adapters::Kafka.new(seed_brokers: ["localhost:9092"])
redis_adapter = SharedBroker::Adapters::Redis.new(redis_url: "redis://localhost:6379")
# 2. Instantiate the Client in Hybrid mode
HYBRID_BROKER = SharedBroker::Client.new(
  adapters: {
    rabbitmq: rabbit_adapter,
    kafka: kafka_adapter,
    redis: redis_adapter
  },
  routing: {
    # Exact routing match
    "payment.processed" => :rabbitmq,
    # Wildcard routing match
    "telemetry.*" => :kafka,
    "cache.*" => :redis,
    # Default fallback routing
    "*" => :rabbitmq
  }
)

Send events by passing the topic name and a structured payload (must be a Hash):
event_data = {
  id: 1,
  email: "test@example.com"
}
# The payload will be validated against its dry-schema, encrypted, and published safely.
SPOT_BROKER.publish("user.created", event_data)

To start a persistent event subscriber daemon, register a queue/group name associated with the topic. You can customize the retries and backoff rate:
SPOT_BROKER.subscribe("user.created", "my_consumption_queue", max_retries: 3, backoff_base: 2) do |payload|
  puts "Decrypted event successfully validated & consumed! ID: #{payload[:id]}"
  # execute your business logic here...
end

To run the unit test suite using Minitest:
bundle exec rake test

This gem is available under the terms of the MIT License.