From 100cf05bf72a7726b06139045fa0edcc24e3a173 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 14:33:56 -0400 Subject: [PATCH 01/12] feat: add nats_jetstream_consumer to provide the feature requested in issue #44 closes #44 --- .github/workflows/main.yaml | 12 +- .gitignore | 1 + Gemfile.lock | 2 +- Rakefile | 70 ++++- Readme.adoc | 48 +++- ci/nats/start.sh | 27 +- examples/jetstream_endpoint.rb | 44 +++ lib/leopard.rb | 5 + lib/leopard/message_processor.rb | 48 ++++ lib/leopard/nats_api_server.rb | 173 ++++++++---- lib/leopard/nats_jetstream_callbacks.rb | 41 +++ lib/leopard/nats_jetstream_consumer.rb | 101 +++++++ lib/leopard/nats_jetstream_endpoint.rb | 18 ++ lib/leopard/nats_request_reply_callbacks.rb | 38 +++ .../nats_jetstream_integration_test.rb | 259 ++++++++++++++++++ test/lib/nats_api_server.rb | 126 ++++++--- test/lib/nats_request_reply_callbacks_test.rb | 42 +++ 17 files changed, 944 insertions(+), 111 deletions(-) create mode 100755 examples/jetstream_endpoint.rb create mode 100644 lib/leopard/message_processor.rb create mode 100644 lib/leopard/nats_jetstream_callbacks.rb create mode 100644 lib/leopard/nats_jetstream_consumer.rb create mode 100644 lib/leopard/nats_jetstream_endpoint.rb create mode 100644 lib/leopard/nats_request_reply_callbacks.rb create mode 100644 test/integration/nats_jetstream_integration_test.rb create mode 100644 test/lib/nats_request_reply_callbacks_test.rb diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index fc8098c..23d345e 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -13,10 +13,6 @@ jobs: matrix: ruby: - '3.4.5' - services: - nats: - image: nats:latest - ports: ["4222:4222", "6222:6222", "8222:8222"] steps: - uses: actions/checkout@v4 - name: Set up Ruby @@ -26,7 +22,9 @@ jobs: bundler-cache: false - name: Install dependencies run: bundle install --jobs 4 --retry 3 - - name: Run the default task - run: bundle exec rake + - name: Run CI task + run: bundle exec rake ci env: - NATS_URI: nats://nats:4222 + NATS_URI: nats://127.0.0.1:4222 + LEOPARD_NATS_URL: nats://127.0.0.1:4222 + NATS_NAME: leopard-nats diff --git a/.gitignore b/.gitignore index cd5c3a1..219c2b6 100644 --- a/.gitignore +++ b/.gitignore @@ -54,4 +54,5 @@ Gemfile.lock .rvmrc # Used by RuboCop. Remote config files pulled in from inherit_from directive. +.rubocop_cache # .rubocop-https?--* diff --git a/Gemfile.lock b/Gemfile.lock index 84163b0..b461b07 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.2.4) + leopard (0.2.5) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) diff --git a/Rakefile b/Rakefile index 91331ca..824cf6b 100644 --- a/Rakefile +++ b/Rakefile @@ -4,6 +4,9 @@ require 'rake' require 'minitest/test_task' require 'bundler/gem_tasks' require 'rubocop/rake_task' +require 'net/http' +require 'shellwords' +require 'timeout' RuboCop::RakeTask.new @@ -14,4 +17,69 @@ Minitest::TestTask.create(:test) do |task| task.warning = true end -task default: %i[rubocop test] +QUICK_TEST_FILES = Dir['test/*/**/*.rb'].reject { |file| file.start_with?('test/integration/') }.sort.freeze + +def nats_health_uri = URI('http://127.0.0.1:8222/healthz') + +def nats_ready? + Net::HTTP.get_response(nats_health_uri).is_a?(Net::HTTPSuccess) +rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ECONNRESET + false +end + +def wait_for_nats! + Timeout.timeout(30) do + sleep 1 until nats_ready? + end +rescue Timeout::Error + raise 'Timed out waiting for NATS JetStream health endpoint on http://127.0.0.1:8222/healthz' +end + +def container_runtime + File.executable?('/usr/bin/podman') || system('command -v podman > /dev/null 2>&1', exception: false) ? 'podman' : 'docker' +end + +def run_quick_tests! + sh "ruby -w -Ilib -Itest #{QUICK_TEST_FILES.shelljoin}" +end + +namespace :nats do + desc 'Start the local NATS JetStream broker via ./ci/nats/start.sh' + task :start do + sh({ 'NATS_DETACH' => '1' }, './ci/nats/start.sh') + end + + desc 'Wait for the local NATS JetStream broker health endpoint' + task :wait do + wait_for_nats! + end + + desc 'Stop the local NATS JetStream broker container' + task :stop do + sh("#{container_runtime} rm -f #{ENV.fetch('NATS_NAME', 'leopard-nats')}", verbose: false) + rescue RuntimeError + nil + end +end + +namespace :ci do + desc 'Run RuboCop and the non-integration test suite without managing NATS' + task quick: :rubocop do + run_quick_tests! + end + + desc 'Run the full test suite against a managed local NATS JetStream broker' + task :test do + Rake::Task['nats:start'].invoke + Rake::Task['nats:wait'].invoke + Rake::Task['test'].invoke + ensure + Rake::Task['nats:stop'].reenable + Rake::Task['nats:stop'].invoke + end +end + +desc 'Run RuboCop and the full test suite against a managed local NATS JetStream broker' +task ci: %w[rubocop ci:test] + +task default: :ci diff --git a/Readme.adoc b/Readme.adoc index 69e4494..1655aed 100644 --- a/Readme.adoc +++ b/Readme.adoc @@ -12,6 +12,7 @@ minimal DSL for defining endpoints and middleware. == Features * Declarative endpoint definitions with `#endpoint`. +* Declarative JetStream pull consumers with `#jetstream_endpoint`. * Grouping of endpoints with `#group` * Simple concurrency via `#run` with a configurable number of instances. * JSON aware message wrapper that gracefully handles parse errors. @@ -90,13 +91,58 @@ end EchoService.use LoggerMiddleware ---- +== JetStream Pull Consumers + +Leopard can also bind JetStream pull consumers through the same middleware and `Dry::Monads::Result` +handler contract used by request/reply endpoints. + +[source,ruby] +---- +class EventConsumer + include Rubyists::Leopard::NatsApiServer + + jetstream_endpoint( + :events, + stream: 'EVENTS', + subject: 'events.created', + durable: 'events-created-worker', + consumer: { max_deliver: 5 }, + batch: 5, + fetch_timeout: 1, + nak_delay: 2, + ) do |msg| + Success(msg.data) + end +end +---- + +JetStream handlers receive the same `Rubyists::Leopard::MessageWrapper` as service endpoints. +Leopard will: + +* `ack` on `Success` +* `nak` on `Failure` (`nak_delay:` is optional) +* `term` on unhandled exceptions + +Each Leopard `instances:` worker creates its own pull subscription loop, so JetStream consumers +scale with the same process-local concurrency model as the rest of the framework. + == Development The project uses Minitest and RuboCop. Run tests with Rake: [source,bash] ---- -$ bundle exec rake +$ bundle exec rake ci +---- + +This task starts NATS JetStream through `./ci/nats/start.sh`, waits for broker health, +runs RuboCop and the test suite, and then stops the broker. + +If you want to run the broker yourself, the same script can still be used directly: + +[source,bash] +---- +$ ./ci/nats/start.sh ---- === Conventional Commits (semantic commit messages) diff --git a/ci/nats/start.sh b/ci/nats/start.sh index 0b237b4..a39aa59 100755 --- a/ci/nats/start.sh +++ b/ci/nats/start.sh @@ -1,6 +1,8 @@ #!/usr/bin/env bash NATS_VERSION=2 +NATS_NAME=${NATS_NAME:-leopard-nats} +NATS_DETACH=${NATS_DETACH:-0} if readlink -f . >/dev/null 2>&1 # {{{ makes readlink work on mac then @@ -29,5 +31,28 @@ else runtime=docker fi +args=( + run + --rm + --name "$NATS_NAME" + -p 4222:4222 + -p 6222:6222 + -p 8222:8222 + -v ./accounts.txt:/accounts.txt +) + +if [ "$NATS_DETACH" = "1" ] +then + args+=(-d) +else + args+=(-it) +fi + +args+=( + "nats:$NATS_VERSION" + -js + -c /accounts.txt +) + set -x -exec "$runtime" run --rm -it -p 4222:4222 -p 6222:6222 -p 8222:8222 -v ./accounts.txt:/accounts.txt nats:"$NATS_VERSION" -js -c /accounts.txt "$@" +exec "$runtime" "${args[@]}" "$@" diff --git a/examples/jetstream_endpoint.rb b/examples/jetstream_endpoint.rb new file mode 100755 index 0000000..70a725b --- /dev/null +++ b/examples/jetstream_endpoint.rb @@ -0,0 +1,44 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative '../lib/leopard/nats_api_server' + +# Example JetStream worker for async event processing. +class EventConsumer + include Rubyists::Leopard::NatsApiServer + + config.logger = SemanticLogger[:EventConsumer] + + jetstream_endpoint( + :events, + stream: 'EVENTS', + subject: 'events.created', + durable: 'events-created-worker', + consumer: { + ack_wait: 30, + max_deliver: 5, + }, + batch: 5, + fetch_timeout: 1, + nak_delay: 2, + ) do |msg| + logger.info 'Processing event', data: msg.data + Success(msg.data) + rescue StandardError => e + Failure(error: e.message, data: msg.data) + end +end + +if __FILE__ == $PROGRAM_NAME + SemanticLogger.default_level = :info + SemanticLogger.add_signal_handler + SemanticLogger.add_appender(io: $stdout, formatter: :color) + EventConsumer.run( + nats_url: 'nats://localhost:4222', + service_opts: { + name: 'example.event_consumer', + version: '1.0.0', + }, + instances: ENV.fetch('EVENT_CONSUMER_INSTANCES', '1').to_i, + ) +end diff --git a/lib/leopard.rb b/lib/leopard.rb index 777f526..d9a4785 100644 --- a/lib/leopard.rb +++ b/lib/leopard.rb @@ -18,3 +18,8 @@ module Leopard require_relative 'leopard/settings' require_relative 'leopard/version' require_relative 'leopard/errors' +require_relative 'leopard/message_processor' +require_relative 'leopard/nats_jetstream_endpoint' +require_relative 'leopard/nats_jetstream_callbacks' +require_relative 'leopard/nats_jetstream_consumer' +require_relative 'leopard/nats_request_reply_callbacks' diff --git a/lib/leopard/message_processor.rb b/lib/leopard/message_processor.rb new file mode 100644 index 0000000..6719d33 --- /dev/null +++ b/lib/leopard/message_processor.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + class MessageProcessor + def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) + @wrapper_factory = wrapper_factory + @middleware = middleware + @execute_handler = execute_handler + @logger = logger + end + + def process(raw_msg, handler, callbacks) + app(callbacks, handler).call(@wrapper_factory.call(raw_msg)) + end + + private + + def app(callbacks, handler) + @middleware.call.reverse_each.reduce(base_app(handler, callbacks)) do |current, (klass, args, blk)| + klass.new(current, *args, &blk) + end + end + + def base_app(handler, callbacks) + lambda do |wrapper| + result = @execute_handler.call(wrapper, handler) + process_result(wrapper, result, callbacks) + rescue StandardError => e + @logger.error 'Error processing message: ', e + callbacks[:on_error].call(wrapper, e) + end + end + + def process_result(wrapper, result, callbacks) + case result + in Dry::Monads::Success + callbacks[:on_success].call(wrapper, result) + in Dry::Monads::Failure + callbacks[:on_failure].call(wrapper, result) + else + @logger.error('Unexpected result: ', result:) + raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" + end + end + end + end +end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 2209ce2..06b37db 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -6,7 +6,11 @@ require 'concurrent' require_relative '../leopard' require_relative 'message_wrapper' +require_relative 'message_processor' require_relative 'metrics_server' +require_relative 'nats_jetstream_endpoint' +require_relative 'nats_jetstream_consumer' +require_relative 'nats_request_reply_callbacks' module Rubyists module Leopard @@ -16,18 +20,20 @@ module NatsApiServer def self.included(base) base.extend(ClassMethods) - base.include(InstanceMethods) + base.include(WorkerLifecycle) + base.include(MessageHandling) base.extend(Dry::Monads[:result]) base.extend(Dry::Configurable) base.setting :logger, default: Rubyists::Leopard.logger, reader: true end - Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) + Endpoint = Struct.new(:name, :subject, :queue, :group, :handler, keyword_init: true) module ClassMethods include MetricsServer def endpoints = @endpoints ||= [] + def jetstream_endpoints = @jetstream_endpoints ||= [] def groups = @groups ||= {} def middleware = @middleware ||= [] @@ -44,6 +50,23 @@ def endpoint(name, subject: nil, queue: nil, group: nil, &handler) endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:) end + # Define a JetStream pull consumer endpoint. + # + # @param name [String] The name of the endpoint. + # @param stream [String] The JetStream stream name. + # @param subject [String] The JetStream subject filter. + # @param durable [String] The durable consumer name. + # @param consumer [Hash, NATS::JetStream::API::ConsumerConfig, nil] Optional consumer config. + # @param batch [Integer] Number of messages to fetch per pull request. + # @param fetch_timeout [Numeric] Maximum time to wait for fetched messages. + # @param nak_delay [Numeric, nil] Optional delayed redelivery value for `nak`. + # @param handler [Proc] The block that will handle incoming messages. + # + # @return [void] + def jetstream_endpoint(name, **options, &handler) + jetstream_endpoints << build_jetstream_endpoint(name, options, handler) + end + # Define a group for organizing endpoints. # # @param name [String] The name of the group. @@ -177,9 +200,21 @@ def wake_main_thread_and_exit! rescue StandardError exit 1 end + + def build_jetstream_endpoint(name, options, handler) + NatsJetstreamEndpoint.new( + name:, + handler:, + consumer: nil, + batch: 1, + fetch_timeout: 5, + nak_delay: nil, + **options, + ) + end end - module InstanceMethods + module WorkerLifecycle # Returns the logger configured for the NATS API server. def logger = self.class.logger @@ -193,13 +228,11 @@ def logger = self.class.logger # # @return [void] def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) - @thread = Thread.current - @client = NATS.connect nats_url - @service = @client.services.add(build_service_opts(service_opts:)) - gps = self.class.groups.dup - eps = self.class.endpoints.dup - group_map = add_groups(gps) - add_endpoints eps, group_map + initialize_worker_state + connect_client(nats_url) + initialize_service(service_opts) + add_endpoints(self.class.endpoints.dup, add_groups(self.class.groups.dup)) + start_jetstream_consumer(self.class.jetstream_endpoints.dup) end # Sets up a worker thread for the NATS API server and blocks the current thread. @@ -212,15 +245,62 @@ def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) # Stops the NATS API server worker. def stop - @service&.stop - @client&.close - @thread&.wakeup + @running = false + stop_jetstream + stop_service + wake_worker rescue ThreadError nil end private + def initialize_worker_state + @thread = Thread.current + end + + def connect_client(nats_url) + @client = NATS.connect(nats_url) + end + + def initialize_service(service_opts) + @service = @client.services.add(build_service_opts(service_opts:)) + end + + def start_jetstream_consumer(endpoints) + return if endpoints.empty? + + @jetstream_consumer = jetstream_consumer_class.new( + jetstream: @client.jetstream, + endpoints:, + logger:, + process_message: method(:process_transport_message), + thread_factory:, + ) + @jetstream_consumer.start + end + + def stop_jetstream + @jetstream_consumer&.stop + end + + def stop_service + @service&.stop + @client&.close + end + + def wake_worker + @thread&.wakeup + end + + def jetstream_consumer_class + NatsJetstreamConsumer + end + + def thread_factory + Thread + end + # Builds the service options for the NATS service. # # @param service_opts [Hash] Options for the NATS service. @@ -276,6 +356,12 @@ def add_endpoints(endpoints, group_map) build_endpoint(parent, ep) end end + end + + module MessageHandling + def logger = self.class.logger + + private # Builds an endpoint in the NATS service. # @@ -286,58 +372,29 @@ def add_endpoints(endpoints, group_map) # @return [void] def build_endpoint(parent, ept) parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| - wrapper = MessageWrapper.new(raw_msg) - dispatch_with_middleware(wrapper, ept.handler) + process_transport_message(raw_msg, ept.handler, request_reply_callbacks.callbacks) end end - # Dispatches a message through the middleware stack and handles it with the provided handler. - # - # @param wrapper [MessageWrapper] The message wrapper containing the raw message. - # @param handler [Proc] The handler to process the message. - # - # @return [void] - def dispatch_with_middleware(wrapper, handler) - app = ->(w) { handle_message(w.raw, handler) } - self.class.middleware.reverse_each do |(klass, args, blk)| - app = klass.new(app, *args, &blk) - end - app.call(wrapper) + def process_transport_message(raw_msg, handler, callbacks) + message_processor.process(raw_msg, handler, callbacks) end - # Handles a raw NATS message using the provided handler. - # - # @param raw_msg [NATS::Message] The raw NATS message to handle. - # @param handler [Proc] The handler to process the message. - # - # @return [void] - def handle_message(raw_msg, handler) - wrapper = MessageWrapper.new(raw_msg) - result = instance_exec(wrapper, &handler) - process_result(wrapper, result) - rescue StandardError => e - logger.error 'Error processing message: ', e - wrapper.respond_with_error(e) + def request_reply_callbacks + @request_reply_callbacks ||= NatsRequestReplyCallbacks.new(logger:) end - # Processes the result of the handler execution. - # - # @param wrapper [MessageWrapper] The message wrapper containing the raw message. - # @param result [Dry::Monads::Result] The result of the handler execution. - # - # @return [void] - # @raise [ResultError] If the result is not a Success or Failure monad. - def process_result(wrapper, result) - case result - in Dry::Monads::Success - wrapper.respond(result.value!) - in Dry::Monads::Failure - logger.error 'Error processing message: ', result.failure - wrapper.respond_with_error(result.failure) - else - logger.error('Unexpected result: ', result:) - raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" - end + def message_processor + @message_processor ||= MessageProcessor.new( + wrapper_factory: MessageWrapper.method(:new), + middleware: -> { self.class.middleware }, + execute_handler: method(:execute_handler), + logger:, + ) + end + + def execute_handler(wrapper, handler) + instance_exec(wrapper, &handler) end end end diff --git a/lib/leopard/nats_jetstream_callbacks.rb b/lib/leopard/nats_jetstream_callbacks.rb new file mode 100644 index 0000000..052f6e3 --- /dev/null +++ b/lib/leopard/nats_jetstream_callbacks.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + class NatsJetstreamCallbacks + def initialize(logger:) + @logger = logger + end + + def callbacks_for(endpoint) + { + on_success: method(:ack_message), + on_failure: ->(wrapper, result) { nak_message(wrapper, result, endpoint) }, + on_error: method(:term_message), + } + end + + private + + def ack_message(wrapper, _result) + wrapper.raw.ack + end + + def nak_message(wrapper, result, endpoint) + log_failure(result.failure) + return wrapper.raw.nak unless endpoint.nak_delay + + wrapper.raw.nak(delay: endpoint.nak_delay) + end + + def term_message(wrapper, error) + @logger.error 'Unhandled JetStream error: ', error + wrapper.raw.term + end + + def log_failure(failure) + @logger.error 'Error processing message: ', failure + end + end + end +end diff --git a/lib/leopard/nats_jetstream_consumer.rb b/lib/leopard/nats_jetstream_consumer.rb new file mode 100644 index 0000000..2578ef1 --- /dev/null +++ b/lib/leopard/nats_jetstream_consumer.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +require_relative 'nats_jetstream_callbacks' +require_relative 'nats_jetstream_endpoint' + +module Rubyists + module Leopard + class NatsJetstreamConsumer + attr_reader :subscriptions, :threads + + def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) + @jetstream = jetstream + @endpoints = endpoints + @logger = logger + @process_message = process_message + @callbacks = dependencies.fetch(:callback_builder, NatsJetstreamCallbacks).new(logger:) + @thread_factory = dependencies.fetch(:thread_factory, Thread) + @subscriptions = [] + @threads = [] + @running = false + end + + def start + @running = true + @endpoints.each { |endpoint| start_endpoint(endpoint) } + end + + def stop + @running = false + subscriptions.each(&:unsubscribe) + threads.each(&:join) + end + + private + + def start_endpoint(endpoint) + subscription = build_subscription(endpoint) + subscriptions << subscription + threads << @thread_factory.new { consume_endpoint(subscription, endpoint) } + end + + def build_subscription(endpoint) + ensure_consumer(endpoint) + @jetstream.pull_subscribe( + endpoint.subject, + endpoint.durable, + stream: endpoint.stream, + ) + end + + def ensure_consumer(endpoint) + @jetstream.consumer_info(endpoint.stream, endpoint.durable) + rescue NATS::JetStream::Error::NotFound + @jetstream.add_consumer(endpoint.stream, consumer_config(endpoint)) + end + + def consumer_config(endpoint) + base = { + durable_name: endpoint.durable, + filter_subject: endpoint.subject, + ack_policy: 'explicit', + } + base.merge(normalized_consumer_options(endpoint)) + end + + def normalized_consumer_options(endpoint) + return {} unless endpoint.consumer + return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h) + + endpoint.consumer + end + + def consume_endpoint(subscription, endpoint) + while @running + begin + consume_batch(subscription, endpoint) + rescue NATS::Timeout + next if @running + rescue StandardError => e + log_loop_error(endpoint, e) + break unless @running + end + end + end + + def consume_batch(subscription, endpoint) + fetch_messages(subscription, endpoint).each do |raw_msg| + @process_message.call(raw_msg, endpoint.handler, @callbacks.callbacks_for(endpoint)) + end + end + + def fetch_messages(subscription, endpoint) + subscription.fetch(endpoint.batch, timeout: endpoint.fetch_timeout) + end + + def log_loop_error(endpoint, error) + @logger.error "JetStream endpoint #{endpoint.name} loop error: ", error + end + end + end +end diff --git a/lib/leopard/nats_jetstream_endpoint.rb b/lib/leopard/nats_jetstream_endpoint.rb new file mode 100644 index 0000000..cbd902c --- /dev/null +++ b/lib/leopard/nats_jetstream_endpoint.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + NatsJetstreamEndpoint = Struct.new( + :name, + :stream, + :subject, + :durable, + :consumer, + :batch, + :fetch_timeout, + :nak_delay, + :handler, + keyword_init: true, + ) + end +end diff --git a/lib/leopard/nats_request_reply_callbacks.rb b/lib/leopard/nats_request_reply_callbacks.rb new file mode 100644 index 0000000..941fee5 --- /dev/null +++ b/lib/leopard/nats_request_reply_callbacks.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + class NatsRequestReplyCallbacks + def initialize(logger:) + @logger = logger + end + + def callbacks + { + on_success: method(:respond_with_success), + on_failure: method(:respond_with_failure), + on_error: method(:respond_with_error), + } + end + + private + + def respond_with_success(wrapper, result) + wrapper.respond(result.value!) + end + + def respond_with_failure(wrapper, result) + log_failure(result.failure) + wrapper.respond_with_error(result.failure) + end + + def respond_with_error(wrapper, error) + wrapper.respond_with_error(error) + end + + def log_failure(failure) + @logger.error 'Error processing message: ', failure + end + end + end +end diff --git a/test/integration/nats_jetstream_integration_test.rb b/test/integration/nats_jetstream_integration_test.rb new file mode 100644 index 0000000..4e20be5 --- /dev/null +++ b/test/integration/nats_jetstream_integration_test.rb @@ -0,0 +1,259 @@ +# frozen_string_literal: true + +require 'json' +require 'securerandom' +require 'timeout' +require_relative '../helper' +require Rubyists::Leopard.libroot / 'leopard/nats_api_server' + +module NatsJetstreamBrokerHelpers + NATS_URL = ENV.fetch('LEOPARD_NATS_URL', ENV.fetch('NATS_URI', 'nats://127.0.0.1:4222')) + WAIT_TIMEOUT = 5 + NO_REDELIVERY_TIMEOUT = 1.5 + + private + + def setup_integration_context + @workers = [] + @streams = [] + @service_classes = [] + skip 'NATS JetStream broker not available' unless jetstream_available? + + @client = NATS.connect(**nats_connect_options) + @jetstream = @client.jetstream + end + + def teardown_integration_context + @workers.reverse_each(&:stop) + @streams.reverse_each { |stream| @jetstream&.delete_stream(stream) } + @service_classes.reverse_each { |klass| remove_service_class(klass) } + @client&.close + end + + def jetstream_available? + nc = NATS.connect(**nats_connect_options) + nc.jetstream.account_info + nc.close + true + rescue StandardError + false + end + + def build_names + token = SecureRandom.hex(4) + { + stream: "EVENTS_#{token}", + subject: "events.#{token}", + durable: "events_consumer_#{token}", + service: "JetstreamService#{token}", + } + end + + def publish(subject, payload) + @jetstream.publish(subject, JSON.generate(payload)) + end + + def pop_event(queue, timeout: WAIT_TIMEOUT) + Timeout.timeout(timeout) { queue.pop } + end + + def refute_event(queue, timeout: NO_REDELIVERY_TIMEOUT) + Timeout.timeout(timeout) { queue.pop } + + flunk 'expected no additional JetStream delivery' + rescue Timeout::Error + nil + end + + def wait_for(timeout: WAIT_TIMEOUT) + Timeout.timeout(timeout) do + loop do + return if yield + + sleep 0.05 + end + end + end + + def nats_connect_options + { + uri: NATS_URL, + reconnect: false, + connect_timeout: 0.5, + max_reconnect_attempts: 0, + } + end +end + +module NatsJetstreamServiceHelpers + private + + def build_worker(names, middleware: nil, &handler) + create_stream(names) + klass = build_service_class(names, middleware:, &handler) + worker = klass.new + worker.setup_worker( + nats_url: NatsJetstreamBrokerHelpers::NATS_URL, + service_opts: { name: names[:service], version: '1.0.0' }, + ) + wait_for_consumer(names) + @workers << worker + worker + end + + def create_stream(names) + @jetstream.add_stream(name: names[:stream], subjects: [names[:subject]]) + @streams << names[:stream] + end + + def build_service_class(names, middleware: nil, &handler) + klass = Class.new do + include Rubyists::Leopard::NatsApiServer + + config.logger = SemanticLogger[:JetstreamIntegration] + end + self.class.const_set(names[:service], klass) + @service_classes << klass + klass.use(middleware) if middleware + klass.jetstream_endpoint(:events, **endpoint_options(names), &handler) + klass + end + + def endpoint_options(names) + { + stream: names[:stream], + subject: names[:subject], + durable: names[:durable], + consumer: { ack_wait: 1, max_deliver: 5 }, + batch: 1, + fetch_timeout: 0.25, + nak_delay: 1, + } + end + + def build_tracking_middleware(queue) + Class.new do + define_method(:initialize) { |app| @app = app } + define_method(:call) do |wrapper| + queue << :middleware + @app.call(wrapper) + end + end + end + + def wait_for_consumer(names) + wait_for do + @jetstream.consumer_info(names[:stream], names[:durable]) + true + rescue NATS::JetStream::Error::NotFound + false + end + end + + def remove_service_class(klass) + self.class.send(:remove_const, klass.name.split('::').last) + rescue NameError + nil + end +end + +class NatsJetstreamSuccessIntegrationTest < Minitest::Test + include NatsJetstreamBrokerHelpers + include NatsJetstreamServiceHelpers + + def setup + setup_integration_context + end + + def teardown + teardown_integration_context + end + + def test_success_acks_once_and_runs_middleware + tracker, names = build_success_flow + + publish(names[:subject], { ok: true }) + + assert_success_flow(tracker) + end + + private + + def build_success_flow + tracker = Queue.new + names = build_names + middleware = build_tracking_middleware(tracker) + build_worker(names, middleware:, &success_handler(tracker)) + [tracker, names] + end + + def success_handler(tracker) + lambda do |msg| + tracker << [:handler, msg.class.name, msg.raw.metadata.num_delivered] + Dry::Monads::Success(msg.data) + end + end + + def assert_success_flow(tracker) + assert_equal :middleware, pop_event(tracker) + assert_equal [:handler, 'Rubyists::Leopard::MessageWrapper', 1], pop_event(tracker) + refute_event(tracker) + end +end + +class NatsJetstreamRetryIntegrationTest < Minitest::Test + include NatsJetstreamBrokerHelpers + include NatsJetstreamServiceHelpers + + def setup + setup_integration_context + end + + def teardown + teardown_integration_context + end + + def test_failure_naks_and_redelivers + attempts, names = build_retry_flow + + publish(names[:subject], { ok: false }) + + assert_redelivery(attempts) + end + + def test_exception_terms_without_redelivery + attempts = Queue.new + names = build_names + build_worker(names) do |msg| + attempts << msg.raw.metadata.num_delivered + raise 'boom' + end + publish(names[:subject], { ok: false }) + + assert_equal 1, pop_event(attempts) + refute_event(attempts) + end + + private + + def build_retry_flow + attempts = Queue.new + names = build_names + build_worker(names, &retry_handler(attempts)) + [attempts, names] + end + + def retry_handler(attempts) + lambda do |msg| + attempts << msg.raw.metadata.num_delivered + return Dry::Monads::Failure('retry') if msg.raw.metadata.num_delivered == 1 + + Dry::Monads::Success(msg.data) + end + end + + def assert_redelivery(attempts) + assert_equal 1, pop_event(attempts) + assert_equal 2, pop_event(attempts) + end +end diff --git a/test/lib/nats_api_server.rb b/test/lib/nats_api_server.rb index 7d4d5a1..8328dcf 100755 --- a/test/lib/nats_api_server.rb +++ b/test/lib/nats_api_server.rb @@ -2,6 +2,7 @@ require_relative '../helper' require Rubyists::Leopard.libroot / 'leopard/nats_api_server' +require Rubyists::Leopard.libroot / 'leopard/message_processor' describe 'Rubyists::Leopard::NatsApiServer' do # rubocop:disable Metrics/BlockLength before do @@ -16,6 +17,8 @@ cm = mod::ClassMethods cm.const_set(:Success, mod::Success) unless cm.const_defined?(:Success) cm.const_set(:Failure, mod::Failure) unless cm.const_defined?(:Failure) + @logger = Object.new + @logger.define_singleton_method(:error) { |*| nil } end it 'registers an endpoint' do @@ -46,6 +49,34 @@ assert_equal blk, endpoint.handler end + it 'registers a jetstream endpoint with options' do + blk = proc {} + @klass.jetstream_endpoint( + :events, + stream: 'EVENTS', + subject: 'events.created', + durable: 'events-consumer', + consumer: { max_deliver: 5 }, + batch: 10, + fetch_timeout: 2, + nak_delay: 1, + &blk + ) + + assert_equal 1, @klass.jetstream_endpoints.length + endpoint = @klass.jetstream_endpoints.first + + assert_equal :events, endpoint.name + assert_equal 'EVENTS', endpoint.stream + assert_equal 'events.created', endpoint.subject + assert_equal 'events-consumer', endpoint.durable + assert_equal({ max_deliver: 5 }, endpoint.consumer) + assert_equal 10, endpoint.batch + assert_equal 2, endpoint.fetch_timeout + assert_equal 1, endpoint.nak_delay + assert_equal blk, endpoint.handler + end + it 'registers a group' do @klass.group :math, queue: 'math' @@ -104,75 +135,86 @@ def call(wrapper) wrapper.log << :handler Dry::Monads::Success(:ok) } - @instance.stub(:process_result, ->(_wrapper, _result) {}) do - Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @instance.send(:dispatch_with_middleware, wrapper, handler) - end - end + processor = Rubyists::Leopard::MessageProcessor.new( + wrapper_factory: ->(*) { wrapper }, + middleware: -> { @klass.middleware }, + execute_handler: ->(message, block) { block.call(message) }, + logger: @logger, + ) + processor.process(raw, handler, on_success: ->(*_) {}, on_failure: ->(*_) {}, on_error: ->(*_) {}) assert_equal %i[mw1 mw2 handler], order end - it 'handles a message and processes result' do - raw_msg = Object.new - wrapper = Object.new + it 'executes a handler and routes Success to the success callback' do result = Dry::Monads::Success(:ok) - received = nil - handler = proc { |w| - received = w - result - } - processed = nil + wrapper = Object.new + success = nil + processor = processor_for(wrapper:, result:) - # Create an instance of the class to test instance methods after middleware is added - @instance = @klass.new + processor.process(:raw, proc { |message| message }, callback_set(on_success: ->(message, value) { success = [message, value] })) - @instance.stub(:process_result, ->(w, r) { processed = [w, r] }) do - Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @instance.send(:handle_message, raw_msg, handler) - end - end - - assert_equal wrapper, received - assert_equal [wrapper, result], processed + assert_equal [wrapper, result], success end - it 'responds with error when handler raises' do - raw_msg = Object.new + it 'routes raised errors to the error callback' do err = nil wrapper = Object.new - wrapper.define_singleton_method(:respond_with_error) { |raised| err = raised } - Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @instance.send(:handle_message, raw_msg, proc { raise 'boom' }) - end + processor = Rubyists::Leopard::MessageProcessor.new( + wrapper_factory: ->(*) { wrapper }, + middleware: -> { [] }, + execute_handler: ->(*) { raise 'boom' }, + logger: @logger, + ) + processor.process(:raw, proc {}, callback_set(on_error: ->(_message, raised) { err = raised })) assert_instance_of RuntimeError, err assert_equal 'boom', err.message end - it 'responds when processing Success result' do + it 'routes Success results unchanged to the success callback' do wrapper = Minitest::Mock.new - wrapper.expect(:respond, nil, ['ok']) + on_success = Minitest::Mock.new result = Rubyists::Leopard::NatsApiServer::Success.new('ok') - @instance.send(:process_result, wrapper, result) - wrapper.verify + on_success.expect(:call, nil, [wrapper, result]) + processor_for(wrapper:, result:).process(:raw, proc { |message| message }, callback_set(on_success:, on_failure: ->(*_) {})) + on_success.verify end - it 'responds when processing Failure result' do + it 'routes Failure results unchanged to the failure callback' do wrapper = Minitest::Mock.new - wrapper.expect(:respond_with_error, nil, ['fail']) + on_failure = Minitest::Mock.new result = Rubyists::Leopard::NatsApiServer::Failure.new('fail') - @instance.send(:process_result, wrapper, result) - wrapper.verify + on_failure.expect(:call, nil, [wrapper, result]) + processor_for(wrapper:, result:).process(:raw, proc { |message| message }, callback_set(on_success: ->(*_) {}, on_failure:)) + on_failure.verify end it 'passes hash failures through unchanged' do err = { code: 422, description: 'invalid' } - wrapper = Minitest::Mock.new - wrapper.expect(:respond_with_error, nil, [err]) + wrapper = Object.new result = Rubyists::Leopard::NatsApiServer::Failure.new(err) - @instance.send(:process_result, wrapper, result) - wrapper.verify + received = nil + processor_for(wrapper:, result:).process( + :raw, + proc { |message| message }, + callback_set(on_success: ->(*_) {}, on_failure: ->(_wrapper, failure_result) { received = failure_result.failure }), + ) + + assert_equal err, received + end + + def processor_for(wrapper:, result:) + Rubyists::Leopard::MessageProcessor.new( + wrapper_factory: ->(*) { wrapper }, + middleware: -> { [] }, + execute_handler: ->(*) { result }, + logger: @logger, + ) + end + + def callback_set(on_success: ->(*_) {}, on_failure: ->(*_) {}, on_error: ->(*_) {}) + { on_success:, on_failure:, on_error: } end describe 'prometheus metrics' do # rubocop:disable Metrics/BlockLength diff --git a/test/lib/nats_request_reply_callbacks_test.rb b/test/lib/nats_request_reply_callbacks_test.rb new file mode 100644 index 0000000..1ceb72f --- /dev/null +++ b/test/lib/nats_request_reply_callbacks_test.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require_relative '../helper' +require Rubyists::Leopard.libroot / 'leopard/nats_request_reply_callbacks' +require 'dry/monads' + +class NatsRequestReplyCallbacksTest < Minitest::Test + def setup + @logger = Minitest::Mock.new + @callbacks = Rubyists::Leopard::NatsRequestReplyCallbacks.new(logger: @logger).callbacks + end + + def test_success_responds_with_value + wrapper = Minitest::Mock.new + wrapper.expect(:respond, nil, ['ok']) + + @callbacks[:on_success].call(wrapper, Dry::Monads::Result::Success.new('ok')) + + wrapper.verify + end + + def test_failure_logs_and_responds_with_error + wrapper = Minitest::Mock.new + wrapper.expect(:respond_with_error, nil, ['fail']) + @logger.expect(:error, nil, ['Error processing message: ', 'fail']) + + @callbacks[:on_failure].call(wrapper, Dry::Monads::Result::Failure.new('fail')) + + wrapper.verify + @logger.verify + end + + def test_error_responds_with_error + error = RuntimeError.new('boom') + wrapper = Minitest::Mock.new + wrapper.expect(:respond_with_error, nil, [error]) + + @callbacks[:on_error].call(wrapper, error) + + wrapper.verify + end +end From 3dc91e997997ca60bed1c71447fb38a3380c3002 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 16:05:07 -0400 Subject: [PATCH 02/12] docs: ensures 100% yard documentation --- .yardopts | 5 + Gemfile | 2 + Gemfile.lock | 4 + Rakefile | 46 +++++++- Readme.adoc | 14 +++ examples/echo_endpoint.rb | 5 + lib/leopard.rb | 11 ++ lib/leopard/errors.rb | 10 ++ lib/leopard/message_processor.rb | 31 ++++++ lib/leopard/message_wrapper.rb | 1 + lib/leopard/metrics_server.rb | 41 +++++++ lib/leopard/nats_api_server.rb | 116 ++++++++++++++++++-- lib/leopard/nats_jetstream_callbacks.rb | 29 +++++ lib/leopard/nats_jetstream_consumer.rb | 61 ++++++++++ lib/leopard/nats_jetstream_endpoint.rb | 1 + lib/leopard/nats_request_reply_callbacks.rb | 27 +++++ 16 files changed, 389 insertions(+), 15 deletions(-) create mode 100644 .yardopts diff --git a/.yardopts b/.yardopts new file mode 100644 index 0000000..e639453 --- /dev/null +++ b/.yardopts @@ -0,0 +1,5 @@ +--readme Readme.adoc +--protected +--private +--output-dir doc/yard +lib/**/*.rb diff --git a/Gemfile b/Gemfile index 2a624dc..a37644f 100644 --- a/Gemfile +++ b/Gemfile @@ -6,6 +6,7 @@ source 'https://rubygems.org' gemspec group :development, :test do + gem 'asciidoctor' gem 'minitest' gem 'minitest-global_expectations' gem 'pry' @@ -16,4 +17,5 @@ group :development, :test do gem 'rubocop-performance' gem 'rubocop-rake' gem 'simplecov' + gem 'yard' end diff --git a/Gemfile.lock b/Gemfile.lock index b461b07..50c5e53 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -11,6 +11,7 @@ PATH GEM remote: https://rubygems.org/ specs: + asciidoctor (2.0.26) ast (2.4.3) base64 (0.3.0) coderay (1.1.3) @@ -95,6 +96,7 @@ GEM unicode-emoji (~> 4.0, >= 4.0.4) unicode-emoji (4.0.4) uri (1.0.3) + yard (0.9.41) zeitwerk (2.7.3) PLATFORMS @@ -102,6 +104,7 @@ PLATFORMS x86_64-linux DEPENDENCIES + asciidoctor leopard! minitest minitest-global_expectations @@ -113,6 +116,7 @@ DEPENDENCIES rubocop-performance rubocop-rake simplecov + yard BUNDLED WITH 2.6.9 diff --git a/Rakefile b/Rakefile index 824cf6b..42f665a 100644 --- a/Rakefile +++ b/Rakefile @@ -5,10 +5,14 @@ require 'minitest/test_task' require 'bundler/gem_tasks' require 'rubocop/rake_task' require 'net/http' +require 'open3' require 'shellwords' require 'timeout' +require 'yard' +require 'yard/rake/yardoc_task' RuboCop::RakeTask.new +YARD::Rake::YardocTask.new(:yard) Minitest::TestTask.create(:test) do |task| task.libs << 'lib' @@ -19,14 +23,24 @@ end QUICK_TEST_FILES = Dir['test/*/**/*.rb'].reject { |file| file.start_with?('test/integration/') }.sort.freeze +# Returns the local NATS JetStream health endpoint used by the CI helpers. +# +# @return [URI::HTTP] The health endpoint URI. def nats_health_uri = URI('http://127.0.0.1:8222/healthz') +# Reports whether the local NATS JetStream health endpoint is currently reachable. +# +# @return [Boolean] `true` when the broker responds successfully, otherwise `false`. def nats_ready? Net::HTTP.get_response(nats_health_uri).is_a?(Net::HTTPSuccess) rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ECONNRESET false end +# Waits for the local NATS JetStream broker to report healthy. +# +# @return [void] +# @raise [RuntimeError] If the broker does not become healthy within 30 seconds. def wait_for_nats! Timeout.timeout(30) do sleep 1 until nats_ready? @@ -35,14 +49,33 @@ rescue Timeout::Error raise 'Timed out waiting for NATS JetStream health endpoint on http://127.0.0.1:8222/healthz' end +# Detects the container runtime used to manage the local NATS broker. +# +# @return [String] `podman` when available, otherwise `docker`. def container_runtime File.executable?('/usr/bin/podman') || system('command -v podman > /dev/null 2>&1', exception: false) ? 'podman' : 'docker' end +# Runs the non-integration test files directly for a fast local feedback loop. +# +# @return [void] def run_quick_tests! sh "ruby -w -Ilib -Itest #{QUICK_TEST_FILES.shelljoin}" end +# Verifies that the current YARD coverage is complete. +# +# @return [void] +# @raise [RuntimeError] If YARD reports anything less than 100% documentation coverage. +def verify_yard_coverage! + output, status = Open3.capture2e('bundle', 'exec', 'yard', 'stats', '--list-undoc') + puts output + raise 'yard stats failed' unless status.success? + return if output.include?('100.00% documented') + + raise 'YARD documentation coverage is incomplete' +end + namespace :nats do desc 'Start the local NATS JetStream broker via ./ci/nats/start.sh' task :start do @@ -63,8 +96,8 @@ namespace :nats do end namespace :ci do - desc 'Run RuboCop and the non-integration test suite without managing NATS' - task quick: :rubocop do + desc 'Run RuboCop, YARD verification, and the non-integration test suite without managing NATS' + task quick: %i[rubocop yard:verify] do run_quick_tests! end @@ -80,6 +113,13 @@ namespace :ci do end desc 'Run RuboCop and the full test suite against a managed local NATS JetStream broker' -task ci: %w[rubocop ci:test] +task ci: %w[rubocop yard:verify ci:test] + +namespace :yard do + desc 'Fail if YARD reports incomplete documentation coverage' + task :verify do + verify_yard_coverage! + end +end task default: :ci diff --git a/Readme.adoc b/Readme.adoc index 1655aed..fe4f220 100644 --- a/Readme.adoc +++ b/Readme.adoc @@ -138,6 +138,20 @@ $ bundle exec rake ci This task starts NATS JetStream through `./ci/nats/start.sh`, waits for broker health, runs RuboCop and the test suite, and then stops the broker. +API documentation can be generated with: + +[source,bash] +---- +$ bundle exec rake yard +---- + +Documentation coverage is enforced with: + +[source,bash] +---- +$ bundle exec rake yard:verify +---- + If you want to run the broker yourself, the same script can still be used directly: [source,bash] diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 261b0b3..7cfd86a 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -8,6 +8,11 @@ class EchoService include Rubyists::Leopard::NatsApiServer config.logger = SemanticLogger[:EchoService] + + # Builds the example service instance. + # + # @param a_var [Integer] Example initializer state passed through `instance_args`. + # @return [void] def initialize(a_var: 1) logger.info "EchoService initialized with a_var: #{a_var}" end diff --git a/lib/leopard.rb b/lib/leopard.rb index d9a4785..1e328dc 100644 --- a/lib/leopard.rb +++ b/lib/leopard.rb @@ -4,13 +4,24 @@ require 'pathname' require 'semantic_logger' +## +# Namespace for Leopard and related helper extensions. class Pathname + # Joins the receiver with another path fragment. + # + # @param other [#to_s] The path fragment to append. + # + # @return [Pathname] The combined path. def /(other) join other.to_s end end +## +# Top-level namespace for Rubyists gems. module Rubyists + ## + # Namespace for Leopard runtime, DSL, and support classes. module Leopard end end diff --git a/lib/leopard/errors.rb b/lib/leopard/errors.rb index 985a373..706d0a6 100644 --- a/lib/leopard/errors.rb +++ b/lib/leopard/errors.rb @@ -2,12 +2,19 @@ module Rubyists module Leopard + # Base Leopard exception that truncates backtraces for cleaner logs. class LeopardError < StandardError + # Captures the original exception state while replacing the backtrace with the current call stack. + # + # @return [void] def initialize(...) super set_backtrace(caller) end + # Returns a Leopard-truncated backtrace. + # + # @return [Array] Up to the first four backtrace entries, plus a truncation marker when applicable. def backtrace # If the backtrace is nil, return an empty array orig = (super || [])[0..3] @@ -19,8 +26,11 @@ def backtrace end end + # Generic Leopard error superclass. class Error < LeopardError; end + # Raised when Leopard configuration is invalid. class ConfigurationError < Error; end + # Raised when a handler returns an unsupported object instead of a result monad. class ResultError < Error; end end end diff --git a/lib/leopard/message_processor.rb b/lib/leopard/message_processor.rb index 6719d33..8bd7bda 100644 --- a/lib/leopard/message_processor.rb +++ b/lib/leopard/message_processor.rb @@ -2,7 +2,15 @@ module Rubyists module Leopard + # Composes middleware around Leopard handlers and routes their results to transport callbacks. class MessageProcessor + # Builds a reusable processor for request/reply and JetStream transports. + # + # @param wrapper_factory [#call] Callable that wraps a raw transport message in a {MessageWrapper}-compatible object. + # @param middleware [#call] Callable returning the current middleware stack. + # @param execute_handler [#call] Callable that executes the endpoint handler with the wrapped message. + # @param logger [#error] Logger used for processing failures. + # @return [void] def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) @wrapper_factory = wrapper_factory @middleware = middleware @@ -10,18 +18,34 @@ def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) @logger = logger end + # Processes a raw transport message through middleware and terminal callbacks. + # + # @param raw_msg [Object] The raw transport message from NATS. + # @param handler [Proc] The endpoint handler to execute. + # @param callbacks [Hash{Symbol => #call}] Success, failure, and error callbacks for the transport. + # @return [Object] The transport-specific callback result. def process(raw_msg, handler, callbacks) app(callbacks, handler).call(@wrapper_factory.call(raw_msg)) end private + # Builds the middleware stack around the terminal application. + # + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # @param handler [Proc] The endpoint handler to execute at the core of the stack. + # @return [#call] The composed middleware application. def app(callbacks, handler) @middleware.call.reverse_each.reduce(base_app(handler, callbacks)) do |current, (klass, args, blk)| klass.new(current, *args, &blk) end end + # Builds the terminal application that runs the handler and dispatches transport callbacks. + # + # @param handler [Proc] The endpoint handler to execute. + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # @return [Proc] The terminal application for the middleware chain. def base_app(handler, callbacks) lambda do |wrapper| result = @execute_handler.call(wrapper, handler) @@ -32,6 +56,13 @@ def base_app(handler, callbacks) end end + # Routes a {Dry::Monads::Result} to the appropriate transport callback. + # + # @param wrapper [MessageWrapper] The wrapped transport message. + # @param result [Dry::Monads::Result] The handler result to route. + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # @return [Object] The callback return value for the routed result. + # @raise [ResultError] If the handler returned a non-result object. def process_result(wrapper, result, callbacks) case result in Dry::Monads::Success diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index 793f5a9..0b40f73 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -4,6 +4,7 @@ module Rubyists module Leopard + # Wraps a raw NATS message with parsed payload and convenience response helpers. class MessageWrapper # @!attribute [r] raw # @return [NATS::Message] The original NATS message. diff --git a/lib/leopard/metrics_server.rb b/lib/leopard/metrics_server.rb index 4c30b54..87b3b82 100644 --- a/lib/leopard/metrics_server.rb +++ b/lib/leopard/metrics_server.rb @@ -5,9 +5,14 @@ module Rubyists module Leopard + # Adds a minimal Prometheus HTTP endpoint for Leopard worker metrics. module MetricsServer private + # Starts a lightweight HTTP server that exposes Leopard Prometheus metrics. + # + # @param workers [Array] Active Leopard worker instances to observe. + # @return [Thread] The server thread. def start_metrics_server(workers) port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i Thread.new do @@ -19,6 +24,11 @@ def start_metrics_server(workers) end end + # Handles an individual metrics HTTP client connection. + # + # @param client [TCPSocket] The connected HTTP client. + # @param workers [Array] Active Leopard worker instances to observe. + # @return [void] def handle_metrics_client(client, workers) request_line = client.gets loop { break if (client.gets || '').chomp.empty? } @@ -29,12 +39,22 @@ def handle_metrics_client(client, workers) close_client(client) end + # Closes a metrics client socket, ignoring cleanup failures. + # + # @param client [TCPSocket] The connected HTTP client. + # @return [void] def close_client(client) client.close rescue StandardError nil end + # Writes the HTTP response for a metrics request. + # + # @param client [TCPSocket] The connected HTTP client. + # @param request_line [String, nil] The first line of the HTTP request. + # @param workers [Array] Active Leopard worker instances to observe. + # @return [void] def write_metrics_response(client, request_line, workers) if request_line&.start_with?('GET /metrics') body = prometheus_metrics(workers) @@ -46,11 +66,19 @@ def write_metrics_response(client, request_line, workers) end end + # Builds the Prometheus metrics payload for the current worker state. + # + # @param workers [Array] Active Leopard worker instances to observe. + # @return [String] Rendered Prometheus text exposition output. def prometheus_metrics(workers) metrics = collect_prometheus_metrics(workers) render_metrics_template(metrics) end + # Aggregates per-subject worker utilization metrics. + # + # @param workers [Array] Active Leopard worker instances to observe. + # @return [Hash{Symbol => Object}] Metric hashes for the Prometheus template. def collect_prometheus_metrics(workers) busy = Hash.new(0) pending = Hash.new(0) @@ -63,6 +91,12 @@ def collect_prometheus_metrics(workers) } end + # Adds one worker's endpoint saturation metrics to the aggregate hashes. + # + # @param worker [Object] A Leopard worker instance. + # @param busy [Hash{String => Integer}] Subject-to-busy-worker counts. + # @param pending [Hash{String => Integer}] Subject-to-pending-message counts. + # @return [void] def accumulate_worker_metrics(worker, busy, pending) service = worker.instance_variable_get(:@service) return unless service @@ -78,10 +112,17 @@ def accumulate_worker_metrics(worker, busy, pending) end end + # Renders the metrics ERB template with aggregated metric data. + # + # @param metrics [Hash{Symbol => Object}] Aggregated metric data for template rendering. + # @return [String] The rendered Prometheus payload. def render_metrics_template(metrics) ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics) end + # Returns the absolute path to the Prometheus metrics template. + # + # @return [String] The metrics template path. def metrics_template_path File.expand_path('templates/prometheus_metrics.erb', __dir__) end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 06b37db..4bec29b 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -14,10 +14,15 @@ module Rubyists module Leopard + # DSL and runtime integration for Leopard request/reply and JetStream workers. module NatsApiServer include Dry::Monads[:result] extend Dry::Monads[:result] + # Extends an including class with Leopard's DSL and worker lifecycle behavior. + # + # @param base [Class] The class including this module. + # @return [void] def self.included(base) base.extend(ClassMethods) base.include(WorkerLifecycle) @@ -27,14 +32,28 @@ def self.included(base) base.setting :logger, default: Rubyists::Leopard.logger, reader: true end + # Configuration for a request/reply endpoint declared with {.endpoint}. Endpoint = Struct.new(:name, :subject, :queue, :group, :handler, keyword_init: true) + # Class-level DSL for defining Leopard endpoints, middleware, and worker startup. module ClassMethods include MetricsServer + # Returns the configured request/reply endpoints for the service class. + # + # @return [Array] Declared request/reply endpoints. def endpoints = @endpoints ||= [] + # Returns the configured JetStream endpoints for the service class. + # + # @return [Array] Declared JetStream pull-consumer endpoints. def jetstream_endpoints = @jetstream_endpoints ||= [] + # Returns the configured endpoint groups for the service class. + # + # @return [Hash{Symbol,String => Hash}] Declared group definitions. def groups = @groups ||= {} + # Returns the configured middleware stack for the service class. + # + # @return [Array] Middleware declarations in registration order. def middleware = @middleware ||= [] # Define an endpoint for the NATS API server. @@ -44,6 +63,9 @@ def middleware = @middleware ||= [] # @param queue [String, nil] The NATS queue group to use. Defaults to nil. # @param group [String, nil] The group this endpoint belongs to. Defaults to nil. # @param handler [Proc] The block that will handle incoming messages. + # @yield [wrapper] Handles the wrapped request message. + # @yieldparam wrapper [MessageWrapper] The wrapped inbound NATS message. + # @yieldreturn [Dry::Monads::Result] The handler result. # # @return [void] def endpoint(name, subject: nil, queue: nil, group: nil, &handler) @@ -53,14 +75,18 @@ def endpoint(name, subject: nil, queue: nil, group: nil, &handler) # Define a JetStream pull consumer endpoint. # # @param name [String] The name of the endpoint. - # @param stream [String] The JetStream stream name. - # @param subject [String] The JetStream subject filter. - # @param durable [String] The durable consumer name. - # @param consumer [Hash, NATS::JetStream::API::ConsumerConfig, nil] Optional consumer config. - # @param batch [Integer] Number of messages to fetch per pull request. - # @param fetch_timeout [Numeric] Maximum time to wait for fetched messages. - # @param nak_delay [Numeric, nil] Optional delayed redelivery value for `nak`. + # @param options [Hash] JetStream endpoint configuration. + # @option options [String] :stream The JetStream stream name. + # @option options [String] :subject The JetStream subject filter. + # @option options [String] :durable The durable consumer name. + # @option options [Hash, NATS::JetStream::API::ConsumerConfig, nil] :consumer Optional consumer config. + # @option options [Integer] :batch (1) Number of messages to fetch per pull request. + # @option options [Numeric] :fetch_timeout (5) Maximum time to wait for fetched messages. + # @option options [Numeric, nil] :nak_delay Optional delayed redelivery value for `nak`. # @param handler [Proc] The block that will handle incoming messages. + # @yield [wrapper] Handles the wrapped JetStream message. + # @yieldparam wrapper [MessageWrapper] The wrapped inbound JetStream message. + # @yieldreturn [Dry::Monads::Result] The handler result. # # @return [void] def jetstream_endpoint(name, **options, &handler) @@ -97,7 +123,7 @@ def use(klass, *args, &block) # @param instances [Integer] The number of instances to spawn. Defaults to 1. # @param blocking [Boolean] If false, does not block current thread after starting the server. Defaults to true. # - # @return [void] + # @return [Concurrent::FixedThreadPool, void] The worker pool for non-blocking runs, otherwise blocks forever. def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' workers = Concurrent::Array.new @@ -121,6 +147,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param blocking [Boolean] If false, does not block current thread after starting the server. # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # @raise [ArgumentError] If `instance_args` was provided but is not a hash. def spawn_instances(url, opts, count, workers, blocking) pool = Concurrent::FixedThreadPool.new(count) @instance_args = opts.delete(:instance_args) || nil @@ -201,6 +228,12 @@ def wake_main_thread_and_exit! exit 1 end + # Builds a JetStream endpoint struct with Leopard defaults applied. + # + # @param name [String, Symbol] Endpoint name. + # @param options [Hash] JetStream endpoint options. + # @param handler [Proc] Endpoint handler block. + # @return [NatsJetstreamEndpoint] The configured JetStream endpoint definition. def build_jetstream_endpoint(name, options, handler) NatsJetstreamEndpoint.new( name:, @@ -214,17 +247,18 @@ def build_jetstream_endpoint(name, options, handler) end end + # Instance-side worker boot and shutdown helpers. module WorkerLifecycle # Returns the logger configured for the NATS API server. + # + # @return [Object] The configured logger. def logger = self.class.logger # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # - # @param url [String] The URL of the NATS server. - # @param opts [Hash] Options for the NATS service. - # @param eps [Array] The list of endpoints to add. - # @param gps [Hash] The groups to add. + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. # # @return [void] def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) @@ -238,12 +272,17 @@ def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) # Sets up a worker thread for the NATS API server and blocks the current thread. # # @see #setup_worker + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. + # @return [void] def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) setup_worker(nats_url:, service_opts:) sleep end # Stops the NATS API server worker. + # + # @return [void] def stop @running = false stop_jetstream @@ -255,18 +294,33 @@ def stop private + # Captures the current thread for later wakeup during shutdown. + # + # @return [Thread] The current worker thread. def initialize_worker_state @thread = Thread.current end + # Opens the NATS client connection for this worker. + # + # @param nats_url [String] The URL of the NATS server. + # @return [Object] The connected NATS client. def connect_client(nats_url) @client = NATS.connect(nats_url) end + # Registers the NATS service for this worker. + # + # @param service_opts [Hash] Options for the NATS service. + # @return [Object] The created NATS service. def initialize_service(service_opts) @service = @client.services.add(build_service_opts(service_opts:)) end + # Starts the JetStream consumer coordinator when JetStream endpoints are present. + # + # @param endpoints [Array] JetStream endpoints for this worker. + # @return [void] def start_jetstream_consumer(endpoints) return if endpoints.empty? @@ -280,23 +334,38 @@ def start_jetstream_consumer(endpoints) @jetstream_consumer.start end + # Stops the JetStream consumer coordinator if one was started. + # + # @return [void] def stop_jetstream @jetstream_consumer&.stop end + # Stops the registered NATS service and closes the client connection. + # + # @return [void] def stop_service @service&.stop @client&.close end + # Wakes the worker thread if it is blocked. + # + # @return [Thread, nil] The awakened worker thread, if present. def wake_worker @thread&.wakeup end + # Returns the JetStream consumer coordinator class for this worker. + # + # @return [Class] The JetStream consumer implementation class. def jetstream_consumer_class NatsJetstreamConsumer end + # Returns the thread factory used for JetStream consumer loops. + # + # @return [Class] The thread factory class. def thread_factory Thread end @@ -331,6 +400,7 @@ def add_groups(gps) # @param name [String] The name of the group to build. # # @return [NATS::Group] The created group object. + # @raise [ArgumentError] If the requested group was never defined. def build_group(defs, cache, name) return cache[name] if cache.key?(name) @@ -347,6 +417,7 @@ def build_group(defs, cache, name) # @param group_map [Hash] A map of group names to their created group objects. # # @return [void] + # @raise [ArgumentError] If an endpoint references an undefined group. def add_endpoints(endpoints, group_map) endpoints.each do |ep| grp = ep.group @@ -358,7 +429,11 @@ def add_endpoints(endpoints, group_map) end end + # Message execution helpers shared by request/reply and JetStream transports. module MessageHandling + # Returns the logger configured for the NATS API server. + # + # @return [Object] The configured logger. def logger = self.class.logger private @@ -376,14 +451,26 @@ def build_endpoint(parent, ept) end end + # Processes a raw transport message through Leopard's middleware and callback pipeline. + # + # @param raw_msg [Object] The raw NATS transport message. + # @param handler [Proc] The endpoint handler block. + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # @return [Object] The transport-specific callback result. def process_transport_message(raw_msg, handler, callbacks) message_processor.process(raw_msg, handler, callbacks) end + # Returns the callback helper for request/reply endpoints. + # + # @return [NatsRequestReplyCallbacks] The request/reply callback helper. def request_reply_callbacks @request_reply_callbacks ||= NatsRequestReplyCallbacks.new(logger:) end + # Returns the memoized message processor for this worker instance. + # + # @return [MessageProcessor] The shared message processor. def message_processor @message_processor ||= MessageProcessor.new( wrapper_factory: MessageWrapper.method(:new), @@ -393,6 +480,11 @@ def message_processor ) end + # Executes an endpoint handler within the worker instance context. + # + # @param wrapper [MessageWrapper] The wrapped transport message. + # @param handler [Proc] The endpoint handler block. + # @return [Dry::Monads::Result] The handler result. def execute_handler(wrapper, handler) instance_exec(wrapper, &handler) end diff --git a/lib/leopard/nats_jetstream_callbacks.rb b/lib/leopard/nats_jetstream_callbacks.rb index 052f6e3..0ec55bd 100644 --- a/lib/leopard/nats_jetstream_callbacks.rb +++ b/lib/leopard/nats_jetstream_callbacks.rb @@ -2,11 +2,20 @@ module Rubyists module Leopard + # Maps Leopard handler outcomes to JetStream ack, nak, and term operations. class NatsJetstreamCallbacks + # Builds a callback set for JetStream message outcomes. + # + # @param logger [#error] Logger used for failures and unhandled exceptions. + # @return [void] def initialize(logger:) @logger = logger end + # Returns transport callbacks for a JetStream endpoint. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. def callbacks_for(endpoint) { on_success: method(:ack_message), @@ -17,10 +26,21 @@ def callbacks_for(endpoint) private + # Acknowledges a successfully processed JetStream message. + # + # @param wrapper [MessageWrapper] Wrapped JetStream message. + # @param _result [Dry::Monads::Success] Successful handler result. + # @return [void] def ack_message(wrapper, _result) wrapper.raw.ack end + # Negatively acknowledges a failed JetStream message, optionally delaying redelivery. + # + # @param wrapper [MessageWrapper] Wrapped JetStream message. + # @param result [Dry::Monads::Failure] Failed handler result. + # @param endpoint [NatsJetstreamEndpoint] Endpoint configuration for the message. + # @return [void] def nak_message(wrapper, result, endpoint) log_failure(result.failure) return wrapper.raw.nak unless endpoint.nak_delay @@ -28,11 +48,20 @@ def nak_message(wrapper, result, endpoint) wrapper.raw.nak(delay: endpoint.nak_delay) end + # Terminates a JetStream message after an unhandled exception. + # + # @param wrapper [MessageWrapper] Wrapped JetStream message. + # @param error [StandardError] The unhandled exception. + # @return [void] def term_message(wrapper, error) @logger.error 'Unhandled JetStream error: ', error wrapper.raw.term end + # Logs the failure payload returned by a handler. + # + # @param failure [Object] The failure payload from the handler. + # @return [void] def log_failure(failure) @logger.error 'Error processing message: ', failure end diff --git a/lib/leopard/nats_jetstream_consumer.rb b/lib/leopard/nats_jetstream_consumer.rb index 2578ef1..53246fb 100644 --- a/lib/leopard/nats_jetstream_consumer.rb +++ b/lib/leopard/nats_jetstream_consumer.rb @@ -5,9 +5,24 @@ module Rubyists module Leopard + # Coordinates JetStream pull subscriptions and dispatches fetched messages through Leopard. class NatsJetstreamConsumer + # @!attribute [r] subscriptions + # @return [Array] Active JetStream pull subscriptions. + # @!attribute [r] threads + # @return [Array] Consumer loop threads for each endpoint. attr_reader :subscriptions, :threads + # Builds a pull-consumer coordinator for one Leopard worker. + # + # @param jetstream [Object] JetStream client used to manage consumers and subscriptions. + # @param endpoints [Array] JetStream endpoint definitions for this worker. + # @param logger [#error] Logger used for loop failures. + # @param process_message [#call] Callable that processes a raw JetStream message through Leopard. + # @param dependencies [Hash{Symbol => Object}] Optional collaborators for callback and thread creation. + # @option dependencies [Class] :callback_builder (NatsJetstreamCallbacks) Builder for transport callbacks. + # @option dependencies [Class] :thread_factory (Thread) Thread-like factory used to spawn consumer loops. + # @return [void] def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) @jetstream = jetstream @endpoints = endpoints @@ -20,11 +35,17 @@ def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies @running = false end + # Starts one pull-consumer loop per configured endpoint. + # + # @return [void] def start @running = true @endpoints.each { |endpoint| start_endpoint(endpoint) } end + # Stops all pull-consumer loops and waits for them to exit. + # + # @return [void] def stop @running = false subscriptions.each(&:unsubscribe) @@ -33,12 +54,20 @@ def stop private + # Starts a consumer loop for one endpoint. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to consume. + # @return [void] def start_endpoint(endpoint) subscription = build_subscription(endpoint) subscriptions << subscription threads << @thread_factory.new { consume_endpoint(subscription, endpoint) } end + # Ensures the durable consumer exists and creates a pull subscription for it. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to subscribe to. + # @return [Object] The JetStream pull subscription. def build_subscription(endpoint) ensure_consumer(endpoint) @jetstream.pull_subscribe( @@ -48,12 +77,20 @@ def build_subscription(endpoint) ) end + # Verifies that the durable consumer exists, creating it when missing. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to ensure. + # @return [Object] Consumer metadata from `consumer_info` or `add_consumer`. def ensure_consumer(endpoint) @jetstream.consumer_info(endpoint.stream, endpoint.durable) rescue NATS::JetStream::Error::NotFound @jetstream.add_consumer(endpoint.stream, consumer_config(endpoint)) end + # Builds the JetStream consumer configuration for an endpoint. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to translate. + # @return [Hash] Consumer configuration accepted by `add_consumer`. def consumer_config(endpoint) base = { durable_name: endpoint.durable, @@ -63,6 +100,10 @@ def consumer_config(endpoint) base.merge(normalized_consumer_options(endpoint)) end + # Normalizes optional consumer overrides into a hash. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to inspect. + # @return [Hash] Consumer overrides, or an empty hash when none were provided. def normalized_consumer_options(endpoint) return {} unless endpoint.consumer return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h) @@ -70,6 +111,11 @@ def normalized_consumer_options(endpoint) endpoint.consumer end + # Repeatedly fetches and processes batches for one endpoint while the consumer is running. + # + # @param subscription [Object] Pull subscription for the endpoint. + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # @return [void] def consume_endpoint(subscription, endpoint) while @running begin @@ -83,16 +129,31 @@ def consume_endpoint(subscription, endpoint) end end + # Fetches one batch from JetStream and processes each message through Leopard. + # + # @param subscription [Object] Pull subscription for the endpoint. + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # @return [void] def consume_batch(subscription, endpoint) fetch_messages(subscription, endpoint).each do |raw_msg| @process_message.call(raw_msg, endpoint.handler, @callbacks.callbacks_for(endpoint)) end end + # Fetches a batch of messages for one endpoint. + # + # @param subscription [Object] Pull subscription for the endpoint. + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # @return [Array] Raw JetStream messages returned by the subscription. def fetch_messages(subscription, endpoint) subscription.fetch(endpoint.batch, timeout: endpoint.fetch_timeout) end + # Logs an endpoint-level loop failure. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint whose loop failed. + # @param error [StandardError] The raised exception. + # @return [void] def log_loop_error(endpoint, error) @logger.error "JetStream endpoint #{endpoint.name} loop error: ", error end diff --git a/lib/leopard/nats_jetstream_endpoint.rb b/lib/leopard/nats_jetstream_endpoint.rb index cbd902c..a26f58d 100644 --- a/lib/leopard/nats_jetstream_endpoint.rb +++ b/lib/leopard/nats_jetstream_endpoint.rb @@ -2,6 +2,7 @@ module Rubyists module Leopard + # Configuration for a Leopard JetStream pull-consumer endpoint. NatsJetstreamEndpoint = Struct.new( :name, :stream, diff --git a/lib/leopard/nats_request_reply_callbacks.rb b/lib/leopard/nats_request_reply_callbacks.rb index 941fee5..f4502df 100644 --- a/lib/leopard/nats_request_reply_callbacks.rb +++ b/lib/leopard/nats_request_reply_callbacks.rb @@ -2,11 +2,19 @@ module Rubyists module Leopard + # Maps Leopard handler outcomes to request/reply response behavior. class NatsRequestReplyCallbacks + # Builds a callback set for request/reply endpoint outcomes. + # + # @param logger [#error] Logger used for failure payloads. + # @return [void] def initialize(logger:) @logger = logger end + # Returns transport callbacks for request/reply endpoints. + # + # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. def callbacks { on_success: method(:respond_with_success), @@ -17,19 +25,38 @@ def callbacks private + # Responds to a successful request with the handler payload. + # + # @param wrapper [MessageWrapper] Wrapped request message. + # @param result [Dry::Monads::Success] Successful handler result. + # @return [void] def respond_with_success(wrapper, result) wrapper.respond(result.value!) end + # Responds to a failed request with the failure payload. + # + # @param wrapper [MessageWrapper] Wrapped request message. + # @param result [Dry::Monads::Failure] Failed handler result. + # @return [void] def respond_with_failure(wrapper, result) log_failure(result.failure) wrapper.respond_with_error(result.failure) end + # Responds to a request with an exception payload after an unhandled error. + # + # @param wrapper [MessageWrapper] Wrapped request message. + # @param error [StandardError] The unhandled exception. + # @return [void] def respond_with_error(wrapper, error) wrapper.respond_with_error(error) end + # Logs the failure payload returned by a handler. + # + # @param failure [Object] The failure payload from the handler. + # @return [void] def log_failure(failure) @logger.error 'Error processing message: ', failure end From 1a387de7f4fe681ec8382bd855149996e4bccdc9 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 16:13:13 -0400 Subject: [PATCH 03/12] fix: disallows consumer config from overriding important keys --- lib/leopard/nats_jetstream_consumer.rb | 13 +++- test/lib/nats_jetstream_consumer_test.rb | 83 ++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 test/lib/nats_jetstream_consumer_test.rb diff --git a/lib/leopard/nats_jetstream_consumer.rb b/lib/leopard/nats_jetstream_consumer.rb index 53246fb..fde6bd4 100644 --- a/lib/leopard/nats_jetstream_consumer.rb +++ b/lib/leopard/nats_jetstream_consumer.rb @@ -7,6 +7,9 @@ module Rubyists module Leopard # Coordinates JetStream pull subscriptions and dispatches fetched messages through Leopard. class NatsJetstreamConsumer + # Consumer configuration keys Leopard owns and will not allow endpoint overrides to replace. + PROTECTED_CONSUMER_KEYS = %i[durable_name filter_subject ack_policy].freeze + # @!attribute [r] subscriptions # @return [Array] Active JetStream pull subscriptions. # @!attribute [r] threads @@ -97,7 +100,7 @@ def consumer_config(endpoint) filter_subject: endpoint.subject, ack_policy: 'explicit', } - base.merge(normalized_consumer_options(endpoint)) + base.merge(safe_consumer_options(endpoint)) end # Normalizes optional consumer overrides into a hash. @@ -111,6 +114,14 @@ def normalized_consumer_options(endpoint) endpoint.consumer end + # Removes Leopard-managed consumer keys from user overrides. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to inspect. + # @return [Hash] Consumer overrides excluding protected keys required by Leopard. + def safe_consumer_options(endpoint) + normalized_consumer_options(endpoint).reject { |key, _value| PROTECTED_CONSUMER_KEYS.include?(key.to_sym) } + end + # Repeatedly fetches and processes batches for one endpoint while the consumer is running. # # @param subscription [Object] Pull subscription for the endpoint. diff --git a/test/lib/nats_jetstream_consumer_test.rb b/test/lib/nats_jetstream_consumer_test.rb new file mode 100644 index 0000000..f31d693 --- /dev/null +++ b/test/lib/nats_jetstream_consumer_test.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require_relative '../helper' +require Rubyists::Leopard.libroot / 'leopard/nats_jetstream_consumer' +require Rubyists::Leopard.libroot / 'leopard/nats_jetstream_endpoint' + +class NatsJetstreamConsumerTest < Minitest::Test + def setup + @consumer = Rubyists::Leopard::NatsJetstreamConsumer.new( + jetstream: Object.new, + endpoints: [], + logger: Object.new, + process_message: ->(*_) {}, + ) + end + + def test_consumer_config_preserves_durable_name + assert_equal 'events-consumer', symbol_key_config[:durable_name] + end + + def test_consumer_config_preserves_filter_subject + assert_equal 'events.created', symbol_key_config[:filter_subject] + end + + def test_consumer_config_preserves_explicit_ack_policy + assert_equal 'explicit', symbol_key_config[:ack_policy] + end + + def test_consumer_config_keeps_safe_symbol_key_overrides + assert_equal 30, symbol_key_config[:ack_wait] + assert_equal 5, symbol_key_config[:max_deliver] + end + + def test_consumer_config_strips_protected_string_keys + refute_includes string_key_config.keys, 'durable_name' + refute_includes string_key_config.keys, 'filter_subject' + refute_includes string_key_config.keys, 'ack_policy' + end + + def test_consumer_config_keeps_safe_string_key_overrides + assert_equal 100, string_key_config['max_ack_pending'] + end + + private + + def symbol_key_config + @symbol_key_config ||= @consumer.send(:consumer_config, endpoint_with_consumer(symbol_key_overrides)) + end + + def string_key_config + @string_key_config ||= @consumer.send(:consumer_config, endpoint_with_consumer(string_key_overrides)) + end + + def endpoint_with_consumer(consumer) + Rubyists::Leopard::NatsJetstreamEndpoint.new(**base_endpoint_attributes, consumer:) + end + + def symbol_key_overrides + { + durable_name: 'override-durable', + filter_subject: 'override.subject', + ack_policy: 'none', + ack_wait: 30, + max_deliver: 5, + } + end + + def string_key_overrides + { + 'durable_name' => 'override-durable', + 'filter_subject' => 'override.subject', + 'ack_policy' => 'none', + 'max_ack_pending' => 100, + } + end + + def base_endpoint_attributes + { + name: :events, stream: 'EVENTS', subject: 'events.created', durable: 'events-consumer', + batch: 1, fetch_timeout: 1, nak_delay: nil, handler: proc {} + } + end +end From 4d90f9204ca0c122bb1b4eb11b18d26665a50966 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 16:14:59 -0400 Subject: [PATCH 04/12] fix: adds the dry/monads require at the top-level --- lib/leopard.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/leopard.rb b/lib/leopard.rb index 1e328dc..c9453e3 100644 --- a/lib/leopard.rb +++ b/lib/leopard.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'dry/configurable' +require 'dry/monads' require 'pathname' require 'semantic_logger' From 671eae9ee74e9e9b75a836537caa527df7656e31 Mon Sep 17 00:00:00 2001 From: bougyman Date: Thu, 16 Apr 2026 16:16:10 -0400 Subject: [PATCH 05/12] fix: Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Rakefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Rakefile b/Rakefile index 42f665a..afd2395 100644 --- a/Rakefile +++ b/Rakefile @@ -89,7 +89,8 @@ namespace :nats do desc 'Stop the local NATS JetStream broker container' task :stop do - sh("#{container_runtime} rm -f #{ENV.fetch('NATS_NAME', 'leopard-nats')}", verbose: false) + name = ENV.fetch('NATS_NAME', 'leopard-nats') + sh(container_runtime, 'rm', '-f', name, verbose: false) rescue RuntimeError nil end From 30439abb5b1c38f34e0d2d19bd289c498ead590f Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 16:23:42 -0400 Subject: [PATCH 06/12] refactor: use private attr_readers for the message processing dependencies --- lib/leopard/message_processor.rb | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/leopard/message_processor.rb b/lib/leopard/message_processor.rb index 8bd7bda..710d9aa 100644 --- a/lib/leopard/message_processor.rb +++ b/lib/leopard/message_processor.rb @@ -4,6 +4,8 @@ module Rubyists module Leopard # Composes middleware around Leopard handlers and routes their results to transport callbacks. class MessageProcessor + private attr_reader :execute_handler, :logger, :middleware, :wrapper_factory + # Builds a reusable processor for request/reply and JetStream transports. # # @param wrapper_factory [#call] Callable that wraps a raw transport message in a {MessageWrapper}-compatible object. @@ -25,7 +27,7 @@ def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) # @param callbacks [Hash{Symbol => #call}] Success, failure, and error callbacks for the transport. # @return [Object] The transport-specific callback result. def process(raw_msg, handler, callbacks) - app(callbacks, handler).call(@wrapper_factory.call(raw_msg)) + app(callbacks, handler).call(wrapper_factory.call(raw_msg)) end private @@ -36,7 +38,7 @@ def process(raw_msg, handler, callbacks) # @param handler [Proc] The endpoint handler to execute at the core of the stack. # @return [#call] The composed middleware application. def app(callbacks, handler) - @middleware.call.reverse_each.reduce(base_app(handler, callbacks)) do |current, (klass, args, blk)| + middleware.call.reverse_each.reduce(base_app(handler, callbacks)) do |current, (klass, args, blk)| klass.new(current, *args, &blk) end end @@ -48,10 +50,10 @@ def app(callbacks, handler) # @return [Proc] The terminal application for the middleware chain. def base_app(handler, callbacks) lambda do |wrapper| - result = @execute_handler.call(wrapper, handler) + result = execute_handler.call(wrapper, handler) process_result(wrapper, result, callbacks) rescue StandardError => e - @logger.error 'Error processing message: ', e + logger.error 'Error processing message: ', e callbacks[:on_error].call(wrapper, e) end end @@ -70,7 +72,7 @@ def process_result(wrapper, result, callbacks) in Dry::Monads::Failure callbacks[:on_failure].call(wrapper, result) else - @logger.error('Unexpected result: ', result:) + logger.error('Unexpected result: ', result:) raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" end end From 0295e97c1565681b070806e0fb280d0d1abd0ba2 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 16:31:19 -0400 Subject: [PATCH 07/12] style: adds the clean @return line --- Rakefile | 6 ++++ examples/echo_endpoint.rb | 1 + lib/leopard.rb | 1 + lib/leopard/errors.rb | 2 ++ lib/leopard/message_processor.rb | 5 +++ lib/leopard/message_wrapper.rb | 7 ++++ lib/leopard/metrics_server.rb | 9 +++++ lib/leopard/nats_api_server.rb | 39 +++++++++++++++++++++ lib/leopard/nats_jetstream_callbacks.rb | 6 ++++ lib/leopard/nats_jetstream_consumer.rb | 15 ++++++++ lib/leopard/nats_request_reply_callbacks.rb | 6 ++++ 11 files changed, 97 insertions(+) diff --git a/Rakefile b/Rakefile index afd2395..db093b6 100644 --- a/Rakefile +++ b/Rakefile @@ -25,11 +25,13 @@ QUICK_TEST_FILES = Dir['test/*/**/*.rb'].reject { |file| file.start_with?('test/ # Returns the local NATS JetStream health endpoint used by the CI helpers. # +# # @return [URI::HTTP] The health endpoint URI. def nats_health_uri = URI('http://127.0.0.1:8222/healthz') # Reports whether the local NATS JetStream health endpoint is currently reachable. # +# # @return [Boolean] `true` when the broker responds successfully, otherwise `false`. def nats_ready? Net::HTTP.get_response(nats_health_uri).is_a?(Net::HTTPSuccess) @@ -39,6 +41,7 @@ end # Waits for the local NATS JetStream broker to report healthy. # +# # @return [void] # @raise [RuntimeError] If the broker does not become healthy within 30 seconds. def wait_for_nats! @@ -51,6 +54,7 @@ end # Detects the container runtime used to manage the local NATS broker. # +# # @return [String] `podman` when available, otherwise `docker`. def container_runtime File.executable?('/usr/bin/podman') || system('command -v podman > /dev/null 2>&1', exception: false) ? 'podman' : 'docker' @@ -58,6 +62,7 @@ end # Runs the non-integration test files directly for a fast local feedback loop. # +# # @return [void] def run_quick_tests! sh "ruby -w -Ilib -Itest #{QUICK_TEST_FILES.shelljoin}" @@ -65,6 +70,7 @@ end # Verifies that the current YARD coverage is complete. # +# # @return [void] # @raise [RuntimeError] If YARD reports anything less than 100% documentation coverage. def verify_yard_coverage! diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 7cfd86a..51ac12b 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -12,6 +12,7 @@ class EchoService # Builds the example service instance. # # @param a_var [Integer] Example initializer state passed through `instance_args`. + # # @return [void] def initialize(a_var: 1) logger.info "EchoService initialized with a_var: #{a_var}" diff --git a/lib/leopard.rb b/lib/leopard.rb index c9453e3..50954b1 100644 --- a/lib/leopard.rb +++ b/lib/leopard.rb @@ -12,6 +12,7 @@ class Pathname # # @param other [#to_s] The path fragment to append. # + # # @return [Pathname] The combined path. def /(other) join other.to_s diff --git a/lib/leopard/errors.rb b/lib/leopard/errors.rb index 706d0a6..c2d5f48 100644 --- a/lib/leopard/errors.rb +++ b/lib/leopard/errors.rb @@ -6,6 +6,7 @@ module Leopard class LeopardError < StandardError # Captures the original exception state while replacing the backtrace with the current call stack. # + # # @return [void] def initialize(...) super @@ -14,6 +15,7 @@ def initialize(...) # Returns a Leopard-truncated backtrace. # + # # @return [Array] Up to the first four backtrace entries, plus a truncation marker when applicable. def backtrace # If the backtrace is nil, return an empty array diff --git a/lib/leopard/message_processor.rb b/lib/leopard/message_processor.rb index 710d9aa..fad9bfa 100644 --- a/lib/leopard/message_processor.rb +++ b/lib/leopard/message_processor.rb @@ -12,6 +12,7 @@ class MessageProcessor # @param middleware [#call] Callable returning the current middleware stack. # @param execute_handler [#call] Callable that executes the endpoint handler with the wrapped message. # @param logger [#error] Logger used for processing failures. + # # @return [void] def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) @wrapper_factory = wrapper_factory @@ -25,6 +26,7 @@ def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) # @param raw_msg [Object] The raw transport message from NATS. # @param handler [Proc] The endpoint handler to execute. # @param callbacks [Hash{Symbol => #call}] Success, failure, and error callbacks for the transport. + # # @return [Object] The transport-specific callback result. def process(raw_msg, handler, callbacks) app(callbacks, handler).call(wrapper_factory.call(raw_msg)) @@ -36,6 +38,7 @@ def process(raw_msg, handler, callbacks) # # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. # @param handler [Proc] The endpoint handler to execute at the core of the stack. + # # @return [#call] The composed middleware application. def app(callbacks, handler) middleware.call.reverse_each.reduce(base_app(handler, callbacks)) do |current, (klass, args, blk)| @@ -47,6 +50,7 @@ def app(callbacks, handler) # # @param handler [Proc] The endpoint handler to execute. # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # # @return [Proc] The terminal application for the middleware chain. def base_app(handler, callbacks) lambda do |wrapper| @@ -63,6 +67,7 @@ def base_app(handler, callbacks) # @param wrapper [MessageWrapper] The wrapped transport message. # @param result [Dry::Monads::Result] The handler result to route. # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # # @return [Object] The callback return value for the routed result. # @raise [ResultError] If the handler returned a non-result object. def process_result(wrapper, result, callbacks) diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index 0b40f73..1684563 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -7,13 +7,16 @@ module Leopard # Wraps a raw NATS message with parsed payload and convenience response helpers. class MessageWrapper # @!attribute [r] raw + # # @return [NATS::Message] The original NATS message. # # @!attribute [r] data + # # @return [Object] The parsed data from the NATS message. attr_reader :raw, :data # # @!attribute [w] headers + # # @return [Hash] The headers from the NATS message. attr_accessor :headers @@ -26,6 +29,7 @@ def initialize(nats_msg) # @param payload [Object] The payload to respond with. # + # # @return [void] def respond(payload) raw.header = headers unless headers.empty? @@ -34,6 +38,7 @@ def respond(payload) # @param err [Object] The error payload to respond with. # + # # @return [void] def respond_with_error(err, &) raw.respond_with_error(err, &) @@ -47,6 +52,7 @@ def respond_with_error(err, &) # # @param raw [String] The raw data from the NATS message. # + # # @return [Object] The parsed data, or the raw string if parsing fails. def parse_data(raw) JSON.parse(raw) @@ -57,6 +63,7 @@ def parse_data(raw) # Serializes the object to a JSON string if it is not already a string. # @param obj [Object] The object to serialize. # + # # @return [String] The serialized JSON string or the original string. def serialize(obj) obj.is_a?(String) ? obj : JSON.generate(obj) diff --git a/lib/leopard/metrics_server.rb b/lib/leopard/metrics_server.rb index 87b3b82..093d391 100644 --- a/lib/leopard/metrics_server.rb +++ b/lib/leopard/metrics_server.rb @@ -12,6 +12,7 @@ module MetricsServer # Starts a lightweight HTTP server that exposes Leopard Prometheus metrics. # # @param workers [Array] Active Leopard worker instances to observe. + # # @return [Thread] The server thread. def start_metrics_server(workers) port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i @@ -28,6 +29,7 @@ def start_metrics_server(workers) # # @param client [TCPSocket] The connected HTTP client. # @param workers [Array] Active Leopard worker instances to observe. + # # @return [void] def handle_metrics_client(client, workers) request_line = client.gets @@ -42,6 +44,7 @@ def handle_metrics_client(client, workers) # Closes a metrics client socket, ignoring cleanup failures. # # @param client [TCPSocket] The connected HTTP client. + # # @return [void] def close_client(client) client.close @@ -54,6 +57,7 @@ def close_client(client) # @param client [TCPSocket] The connected HTTP client. # @param request_line [String, nil] The first line of the HTTP request. # @param workers [Array] Active Leopard worker instances to observe. + # # @return [void] def write_metrics_response(client, request_line, workers) if request_line&.start_with?('GET /metrics') @@ -69,6 +73,7 @@ def write_metrics_response(client, request_line, workers) # Builds the Prometheus metrics payload for the current worker state. # # @param workers [Array] Active Leopard worker instances to observe. + # # @return [String] Rendered Prometheus text exposition output. def prometheus_metrics(workers) metrics = collect_prometheus_metrics(workers) @@ -78,6 +83,7 @@ def prometheus_metrics(workers) # Aggregates per-subject worker utilization metrics. # # @param workers [Array] Active Leopard worker instances to observe. + # # @return [Hash{Symbol => Object}] Metric hashes for the Prometheus template. def collect_prometheus_metrics(workers) busy = Hash.new(0) @@ -96,6 +102,7 @@ def collect_prometheus_metrics(workers) # @param worker [Object] A Leopard worker instance. # @param busy [Hash{String => Integer}] Subject-to-busy-worker counts. # @param pending [Hash{String => Integer}] Subject-to-pending-message counts. + # # @return [void] def accumulate_worker_metrics(worker, busy, pending) service = worker.instance_variable_get(:@service) @@ -115,6 +122,7 @@ def accumulate_worker_metrics(worker, busy, pending) # Renders the metrics ERB template with aggregated metric data. # # @param metrics [Hash{Symbol => Object}] Aggregated metric data for template rendering. + # # @return [String] The rendered Prometheus payload. def render_metrics_template(metrics) ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics) @@ -122,6 +130,7 @@ def render_metrics_template(metrics) # Returns the absolute path to the Prometheus metrics template. # + # # @return [String] The metrics template path. def metrics_template_path File.expand_path('templates/prometheus_metrics.erb', __dir__) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 4bec29b..01aaa67 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -22,6 +22,7 @@ module NatsApiServer # Extends an including class with Leopard's DSL and worker lifecycle behavior. # # @param base [Class] The class including this module. + # # @return [void] def self.included(base) base.extend(ClassMethods) @@ -41,18 +42,22 @@ module ClassMethods # Returns the configured request/reply endpoints for the service class. # + # # @return [Array] Declared request/reply endpoints. def endpoints = @endpoints ||= [] # Returns the configured JetStream endpoints for the service class. # + # # @return [Array] Declared JetStream pull-consumer endpoints. def jetstream_endpoints = @jetstream_endpoints ||= [] # Returns the configured endpoint groups for the service class. # + # # @return [Hash{Symbol,String => Hash}] Declared group definitions. def groups = @groups ||= {} # Returns the configured middleware stack for the service class. # + # # @return [Array] Middleware declarations in registration order. def middleware = @middleware ||= [] @@ -67,6 +72,7 @@ def middleware = @middleware ||= [] # @yieldparam wrapper [MessageWrapper] The wrapped inbound NATS message. # @yieldreturn [Dry::Monads::Result] The handler result. # + # # @return [void] def endpoint(name, subject: nil, queue: nil, group: nil, &handler) endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:) @@ -88,6 +94,7 @@ def endpoint(name, subject: nil, queue: nil, group: nil, &handler) # @yieldparam wrapper [MessageWrapper] The wrapped inbound JetStream message. # @yieldreturn [Dry::Monads::Result] The handler result. # + # # @return [void] def jetstream_endpoint(name, **options, &handler) jetstream_endpoints << build_jetstream_endpoint(name, options, handler) @@ -99,6 +106,7 @@ def jetstream_endpoint(name, **options, &handler) # @param group [String, nil] The parent group this group belongs to. Defaults to nil. # @param queue [String, nil] The NATS queue group to use for this group. Defaults to nil. # + # # @return [void] def group(name, group: nil, queue: nil) groups[name] = { name:, parent: group, queue: } @@ -110,6 +118,7 @@ def group(name, group: nil, queue: nil) # @param args [Array] Optional arguments to pass to the middleware class. # @param block [Proc] Optional block to pass to the middleware class. # + # # @return [void] def use(klass, *args, &block) middleware << [klass, args, block] @@ -123,6 +132,7 @@ def use(klass, *args, &block) # @param instances [Integer] The number of instances to spawn. Defaults to 1. # @param blocking [Boolean] If false, does not block current thread after starting the server. Defaults to true. # + # # @return [Concurrent::FixedThreadPool, void] The worker pool for non-blocking runs, otherwise blocks forever. def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' @@ -146,6 +156,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param workers [Array] The array to store worker instances. # @param blocking [Boolean] If false, does not block current thread after starting the server. # + # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. # @raise [ArgumentError] If `instance_args` was provided but is not a hash. def spawn_instances(url, opts, count, workers, blocking) @@ -167,6 +178,7 @@ def spawn_instances(url, opts, count, workers, blocking) # @param workers [Array] The array to store worker instances. # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. # + # # @return [void] def build_worker(nats_url, service_opts, workers, blocking) worker = @instance_args ? new(**@instance_args) : new @@ -182,6 +194,7 @@ def build_worker(nats_url, service_opts, workers, blocking) # @param workers [Array] The array of worker instances to stop. # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. # + # # @return [Proc] A lambda that performs the shutdown operations. def shutdown(workers, pool) lambda do @@ -201,6 +214,7 @@ def shutdown(workers, pool) # @param workers [Array] The array of worker instances to stop on signal. # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. # + # # @return [void] def trap_signals(workers, pool) return if @trapped @@ -218,6 +232,7 @@ def trap_signals(workers, pool) # This is useful when the server is running in a blocking mode. # If the main thread is not blocked, this method does just exits. # + # # @return [void] def wake_main_thread_and_exit! Thread.main.wakeup @@ -233,6 +248,7 @@ def wake_main_thread_and_exit! # @param name [String, Symbol] Endpoint name. # @param options [Hash] JetStream endpoint options. # @param handler [Proc] Endpoint handler block. + # # @return [NatsJetstreamEndpoint] The configured JetStream endpoint definition. def build_jetstream_endpoint(name, options, handler) NatsJetstreamEndpoint.new( @@ -251,6 +267,7 @@ def build_jetstream_endpoint(name, options, handler) module WorkerLifecycle # Returns the logger configured for the NATS API server. # + # # @return [Object] The configured logger. def logger = self.class.logger @@ -260,6 +277,7 @@ def logger = self.class.logger # @param nats_url [String] The URL of the NATS server. # @param service_opts [Hash] Options for the NATS service. # + # # @return [void] def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) initialize_worker_state @@ -274,6 +292,7 @@ def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) # @see #setup_worker # @param nats_url [String] The URL of the NATS server. # @param service_opts [Hash] Options for the NATS service. + # # @return [void] def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) setup_worker(nats_url:, service_opts:) @@ -282,6 +301,7 @@ def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) # Stops the NATS API server worker. # + # # @return [void] def stop @running = false @@ -296,6 +316,7 @@ def stop # Captures the current thread for later wakeup during shutdown. # + # # @return [Thread] The current worker thread. def initialize_worker_state @thread = Thread.current @@ -304,6 +325,7 @@ def initialize_worker_state # Opens the NATS client connection for this worker. # # @param nats_url [String] The URL of the NATS server. + # # @return [Object] The connected NATS client. def connect_client(nats_url) @client = NATS.connect(nats_url) @@ -312,6 +334,7 @@ def connect_client(nats_url) # Registers the NATS service for this worker. # # @param service_opts [Hash] Options for the NATS service. + # # @return [Object] The created NATS service. def initialize_service(service_opts) @service = @client.services.add(build_service_opts(service_opts:)) @@ -320,6 +343,7 @@ def initialize_service(service_opts) # Starts the JetStream consumer coordinator when JetStream endpoints are present. # # @param endpoints [Array] JetStream endpoints for this worker. + # # @return [void] def start_jetstream_consumer(endpoints) return if endpoints.empty? @@ -336,6 +360,7 @@ def start_jetstream_consumer(endpoints) # Stops the JetStream consumer coordinator if one was started. # + # # @return [void] def stop_jetstream @jetstream_consumer&.stop @@ -343,6 +368,7 @@ def stop_jetstream # Stops the registered NATS service and closes the client connection. # + # # @return [void] def stop_service @service&.stop @@ -351,6 +377,7 @@ def stop_service # Wakes the worker thread if it is blocked. # + # # @return [Thread, nil] The awakened worker thread, if present. def wake_worker @thread&.wakeup @@ -358,6 +385,7 @@ def wake_worker # Returns the JetStream consumer coordinator class for this worker. # + # # @return [Class] The JetStream consumer implementation class. def jetstream_consumer_class NatsJetstreamConsumer @@ -365,6 +393,7 @@ def jetstream_consumer_class # Returns the thread factory used for JetStream consumer loops. # + # # @return [Class] The thread factory class. def thread_factory Thread @@ -374,6 +403,7 @@ def thread_factory # # @param service_opts [Hash] Options for the NATS service. # + # # @return [Hash] The complete service options including name and version. def build_service_opts(service_opts:) { @@ -386,6 +416,7 @@ def build_service_opts(service_opts:) # # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. # + # # @return [Hash] A map of group names to their created group objects. def add_groups(gps) created = {} @@ -399,6 +430,7 @@ def add_groups(gps) # @param cache [Hash] A cache to store already created groups. # @param name [String] The name of the group to build. # + # # @return [NATS::Group] The created group object. # @raise [ArgumentError] If the requested group was never defined. def build_group(defs, cache, name) @@ -416,6 +448,7 @@ def build_group(defs, cache, name) # @param endpoints [Array] The list of endpoints to add. # @param group_map [Hash] A map of group names to their created group objects. # + # # @return [void] # @raise [ArgumentError] If an endpoint references an undefined group. def add_endpoints(endpoints, group_map) @@ -433,6 +466,7 @@ def add_endpoints(endpoints, group_map) module MessageHandling # Returns the logger configured for the NATS API server. # + # # @return [Object] The configured logger. def logger = self.class.logger @@ -444,6 +478,7 @@ def logger = self.class.logger # @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler. # NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion. # + # # @return [void] def build_endpoint(parent, ept) parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| @@ -456,6 +491,7 @@ def build_endpoint(parent, ept) # @param raw_msg [Object] The raw NATS transport message. # @param handler [Proc] The endpoint handler block. # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # # @return [Object] The transport-specific callback result. def process_transport_message(raw_msg, handler, callbacks) message_processor.process(raw_msg, handler, callbacks) @@ -463,6 +499,7 @@ def process_transport_message(raw_msg, handler, callbacks) # Returns the callback helper for request/reply endpoints. # + # # @return [NatsRequestReplyCallbacks] The request/reply callback helper. def request_reply_callbacks @request_reply_callbacks ||= NatsRequestReplyCallbacks.new(logger:) @@ -470,6 +507,7 @@ def request_reply_callbacks # Returns the memoized message processor for this worker instance. # + # # @return [MessageProcessor] The shared message processor. def message_processor @message_processor ||= MessageProcessor.new( @@ -484,6 +522,7 @@ def message_processor # # @param wrapper [MessageWrapper] The wrapped transport message. # @param handler [Proc] The endpoint handler block. + # # @return [Dry::Monads::Result] The handler result. def execute_handler(wrapper, handler) instance_exec(wrapper, &handler) diff --git a/lib/leopard/nats_jetstream_callbacks.rb b/lib/leopard/nats_jetstream_callbacks.rb index 0ec55bd..1e8a2bc 100644 --- a/lib/leopard/nats_jetstream_callbacks.rb +++ b/lib/leopard/nats_jetstream_callbacks.rb @@ -7,6 +7,7 @@ class NatsJetstreamCallbacks # Builds a callback set for JetStream message outcomes. # # @param logger [#error] Logger used for failures and unhandled exceptions. + # # @return [void] def initialize(logger:) @logger = logger @@ -15,6 +16,7 @@ def initialize(logger:) # Returns transport callbacks for a JetStream endpoint. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. def callbacks_for(endpoint) { @@ -30,6 +32,7 @@ def callbacks_for(endpoint) # # @param wrapper [MessageWrapper] Wrapped JetStream message. # @param _result [Dry::Monads::Success] Successful handler result. + # # @return [void] def ack_message(wrapper, _result) wrapper.raw.ack @@ -40,6 +43,7 @@ def ack_message(wrapper, _result) # @param wrapper [MessageWrapper] Wrapped JetStream message. # @param result [Dry::Monads::Failure] Failed handler result. # @param endpoint [NatsJetstreamEndpoint] Endpoint configuration for the message. + # # @return [void] def nak_message(wrapper, result, endpoint) log_failure(result.failure) @@ -52,6 +56,7 @@ def nak_message(wrapper, result, endpoint) # # @param wrapper [MessageWrapper] Wrapped JetStream message. # @param error [StandardError] The unhandled exception. + # # @return [void] def term_message(wrapper, error) @logger.error 'Unhandled JetStream error: ', error @@ -61,6 +66,7 @@ def term_message(wrapper, error) # Logs the failure payload returned by a handler. # # @param failure [Object] The failure payload from the handler. + # # @return [void] def log_failure(failure) @logger.error 'Error processing message: ', failure diff --git a/lib/leopard/nats_jetstream_consumer.rb b/lib/leopard/nats_jetstream_consumer.rb index fde6bd4..679e467 100644 --- a/lib/leopard/nats_jetstream_consumer.rb +++ b/lib/leopard/nats_jetstream_consumer.rb @@ -11,8 +11,10 @@ class NatsJetstreamConsumer PROTECTED_CONSUMER_KEYS = %i[durable_name filter_subject ack_policy].freeze # @!attribute [r] subscriptions + # # @return [Array] Active JetStream pull subscriptions. # @!attribute [r] threads + # # @return [Array] Consumer loop threads for each endpoint. attr_reader :subscriptions, :threads @@ -25,6 +27,7 @@ class NatsJetstreamConsumer # @param dependencies [Hash{Symbol => Object}] Optional collaborators for callback and thread creation. # @option dependencies [Class] :callback_builder (NatsJetstreamCallbacks) Builder for transport callbacks. # @option dependencies [Class] :thread_factory (Thread) Thread-like factory used to spawn consumer loops. + # # @return [void] def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) @jetstream = jetstream @@ -40,6 +43,7 @@ def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies # Starts one pull-consumer loop per configured endpoint. # + # # @return [void] def start @running = true @@ -48,6 +52,7 @@ def start # Stops all pull-consumer loops and waits for them to exit. # + # # @return [void] def stop @running = false @@ -60,6 +65,7 @@ def stop # Starts a consumer loop for one endpoint. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to consume. + # # @return [void] def start_endpoint(endpoint) subscription = build_subscription(endpoint) @@ -70,6 +76,7 @@ def start_endpoint(endpoint) # Ensures the durable consumer exists and creates a pull subscription for it. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to subscribe to. + # # @return [Object] The JetStream pull subscription. def build_subscription(endpoint) ensure_consumer(endpoint) @@ -83,6 +90,7 @@ def build_subscription(endpoint) # Verifies that the durable consumer exists, creating it when missing. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to ensure. + # # @return [Object] Consumer metadata from `consumer_info` or `add_consumer`. def ensure_consumer(endpoint) @jetstream.consumer_info(endpoint.stream, endpoint.durable) @@ -93,6 +101,7 @@ def ensure_consumer(endpoint) # Builds the JetStream consumer configuration for an endpoint. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to translate. + # # @return [Hash] Consumer configuration accepted by `add_consumer`. def consumer_config(endpoint) base = { @@ -106,6 +115,7 @@ def consumer_config(endpoint) # Normalizes optional consumer overrides into a hash. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to inspect. + # # @return [Hash] Consumer overrides, or an empty hash when none were provided. def normalized_consumer_options(endpoint) return {} unless endpoint.consumer @@ -117,6 +127,7 @@ def normalized_consumer_options(endpoint) # Removes Leopard-managed consumer keys from user overrides. # # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to inspect. + # # @return [Hash] Consumer overrides excluding protected keys required by Leopard. def safe_consumer_options(endpoint) normalized_consumer_options(endpoint).reject { |key, _value| PROTECTED_CONSUMER_KEYS.include?(key.to_sym) } @@ -126,6 +137,7 @@ def safe_consumer_options(endpoint) # # @param subscription [Object] Pull subscription for the endpoint. # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # # @return [void] def consume_endpoint(subscription, endpoint) while @running @@ -144,6 +156,7 @@ def consume_endpoint(subscription, endpoint) # # @param subscription [Object] Pull subscription for the endpoint. # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # # @return [void] def consume_batch(subscription, endpoint) fetch_messages(subscription, endpoint).each do |raw_msg| @@ -155,6 +168,7 @@ def consume_batch(subscription, endpoint) # # @param subscription [Object] Pull subscription for the endpoint. # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # # @return [Array] Raw JetStream messages returned by the subscription. def fetch_messages(subscription, endpoint) subscription.fetch(endpoint.batch, timeout: endpoint.fetch_timeout) @@ -164,6 +178,7 @@ def fetch_messages(subscription, endpoint) # # @param endpoint [NatsJetstreamEndpoint] The endpoint whose loop failed. # @param error [StandardError] The raised exception. + # # @return [void] def log_loop_error(endpoint, error) @logger.error "JetStream endpoint #{endpoint.name} loop error: ", error diff --git a/lib/leopard/nats_request_reply_callbacks.rb b/lib/leopard/nats_request_reply_callbacks.rb index f4502df..3af1c6f 100644 --- a/lib/leopard/nats_request_reply_callbacks.rb +++ b/lib/leopard/nats_request_reply_callbacks.rb @@ -7,6 +7,7 @@ class NatsRequestReplyCallbacks # Builds a callback set for request/reply endpoint outcomes. # # @param logger [#error] Logger used for failure payloads. + # # @return [void] def initialize(logger:) @logger = logger @@ -14,6 +15,7 @@ def initialize(logger:) # Returns transport callbacks for request/reply endpoints. # + # # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. def callbacks { @@ -29,6 +31,7 @@ def callbacks # # @param wrapper [MessageWrapper] Wrapped request message. # @param result [Dry::Monads::Success] Successful handler result. + # # @return [void] def respond_with_success(wrapper, result) wrapper.respond(result.value!) @@ -38,6 +41,7 @@ def respond_with_success(wrapper, result) # # @param wrapper [MessageWrapper] Wrapped request message. # @param result [Dry::Monads::Failure] Failed handler result. + # # @return [void] def respond_with_failure(wrapper, result) log_failure(result.failure) @@ -48,6 +52,7 @@ def respond_with_failure(wrapper, result) # # @param wrapper [MessageWrapper] Wrapped request message. # @param error [StandardError] The unhandled exception. + # # @return [void] def respond_with_error(wrapper, error) wrapper.respond_with_error(error) @@ -56,6 +61,7 @@ def respond_with_error(wrapper, error) # Logs the failure payload returned by a handler. # # @param failure [Object] The failure payload from the handler. + # # @return [void] def log_failure(failure) @logger.error 'Error processing message: ', failure From 2986a7c9494ace1425cc5042e56d1ed46a2be53c Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 17:18:11 -0400 Subject: [PATCH 08/12] style: removes extra blank comment lines --- Rakefile | 6 ---- lib/leopard.rb | 1 - lib/leopard/errors.rb | 2 -- lib/leopard/message_wrapper.rb | 4 --- lib/leopard/metrics_server.rb | 1 - lib/leopard/nats_api_server.rb | 31 --------------------- lib/leopard/nats_jetstream_consumer.rb | 2 -- lib/leopard/nats_request_reply_callbacks.rb | 1 - 8 files changed, 48 deletions(-) diff --git a/Rakefile b/Rakefile index db093b6..afd2395 100644 --- a/Rakefile +++ b/Rakefile @@ -25,13 +25,11 @@ QUICK_TEST_FILES = Dir['test/*/**/*.rb'].reject { |file| file.start_with?('test/ # Returns the local NATS JetStream health endpoint used by the CI helpers. # -# # @return [URI::HTTP] The health endpoint URI. def nats_health_uri = URI('http://127.0.0.1:8222/healthz') # Reports whether the local NATS JetStream health endpoint is currently reachable. # -# # @return [Boolean] `true` when the broker responds successfully, otherwise `false`. def nats_ready? Net::HTTP.get_response(nats_health_uri).is_a?(Net::HTTPSuccess) @@ -41,7 +39,6 @@ end # Waits for the local NATS JetStream broker to report healthy. # -# # @return [void] # @raise [RuntimeError] If the broker does not become healthy within 30 seconds. def wait_for_nats! @@ -54,7 +51,6 @@ end # Detects the container runtime used to manage the local NATS broker. # -# # @return [String] `podman` when available, otherwise `docker`. def container_runtime File.executable?('/usr/bin/podman') || system('command -v podman > /dev/null 2>&1', exception: false) ? 'podman' : 'docker' @@ -62,7 +58,6 @@ end # Runs the non-integration test files directly for a fast local feedback loop. # -# # @return [void] def run_quick_tests! sh "ruby -w -Ilib -Itest #{QUICK_TEST_FILES.shelljoin}" @@ -70,7 +65,6 @@ end # Verifies that the current YARD coverage is complete. # -# # @return [void] # @raise [RuntimeError] If YARD reports anything less than 100% documentation coverage. def verify_yard_coverage! diff --git a/lib/leopard.rb b/lib/leopard.rb index 50954b1..c9453e3 100644 --- a/lib/leopard.rb +++ b/lib/leopard.rb @@ -12,7 +12,6 @@ class Pathname # # @param other [#to_s] The path fragment to append. # - # # @return [Pathname] The combined path. def /(other) join other.to_s diff --git a/lib/leopard/errors.rb b/lib/leopard/errors.rb index c2d5f48..706d0a6 100644 --- a/lib/leopard/errors.rb +++ b/lib/leopard/errors.rb @@ -6,7 +6,6 @@ module Leopard class LeopardError < StandardError # Captures the original exception state while replacing the backtrace with the current call stack. # - # # @return [void] def initialize(...) super @@ -15,7 +14,6 @@ def initialize(...) # Returns a Leopard-truncated backtrace. # - # # @return [Array] Up to the first four backtrace entries, plus a truncation marker when applicable. def backtrace # If the backtrace is nil, return an empty array diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index 1684563..6dd1426 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -29,7 +29,6 @@ def initialize(nats_msg) # @param payload [Object] The payload to respond with. # - # # @return [void] def respond(payload) raw.header = headers unless headers.empty? @@ -38,7 +37,6 @@ def respond(payload) # @param err [Object] The error payload to respond with. # - # # @return [void] def respond_with_error(err, &) raw.respond_with_error(err, &) @@ -52,7 +50,6 @@ def respond_with_error(err, &) # # @param raw [String] The raw data from the NATS message. # - # # @return [Object] The parsed data, or the raw string if parsing fails. def parse_data(raw) JSON.parse(raw) @@ -63,7 +60,6 @@ def parse_data(raw) # Serializes the object to a JSON string if it is not already a string. # @param obj [Object] The object to serialize. # - # # @return [String] The serialized JSON string or the original string. def serialize(obj) obj.is_a?(String) ? obj : JSON.generate(obj) diff --git a/lib/leopard/metrics_server.rb b/lib/leopard/metrics_server.rb index 093d391..36d97bc 100644 --- a/lib/leopard/metrics_server.rb +++ b/lib/leopard/metrics_server.rb @@ -130,7 +130,6 @@ def render_metrics_template(metrics) # Returns the absolute path to the Prometheus metrics template. # - # # @return [String] The metrics template path. def metrics_template_path File.expand_path('templates/prometheus_metrics.erb', __dir__) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 01aaa67..69a74f3 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -42,22 +42,18 @@ module ClassMethods # Returns the configured request/reply endpoints for the service class. # - # # @return [Array] Declared request/reply endpoints. def endpoints = @endpoints ||= [] # Returns the configured JetStream endpoints for the service class. # - # # @return [Array] Declared JetStream pull-consumer endpoints. def jetstream_endpoints = @jetstream_endpoints ||= [] # Returns the configured endpoint groups for the service class. # - # # @return [Hash{Symbol,String => Hash}] Declared group definitions. def groups = @groups ||= {} # Returns the configured middleware stack for the service class. # - # # @return [Array] Middleware declarations in registration order. def middleware = @middleware ||= [] @@ -72,7 +68,6 @@ def middleware = @middleware ||= [] # @yieldparam wrapper [MessageWrapper] The wrapped inbound NATS message. # @yieldreturn [Dry::Monads::Result] The handler result. # - # # @return [void] def endpoint(name, subject: nil, queue: nil, group: nil, &handler) endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:) @@ -94,7 +89,6 @@ def endpoint(name, subject: nil, queue: nil, group: nil, &handler) # @yieldparam wrapper [MessageWrapper] The wrapped inbound JetStream message. # @yieldreturn [Dry::Monads::Result] The handler result. # - # # @return [void] def jetstream_endpoint(name, **options, &handler) jetstream_endpoints << build_jetstream_endpoint(name, options, handler) @@ -106,7 +100,6 @@ def jetstream_endpoint(name, **options, &handler) # @param group [String, nil] The parent group this group belongs to. Defaults to nil. # @param queue [String, nil] The NATS queue group to use for this group. Defaults to nil. # - # # @return [void] def group(name, group: nil, queue: nil) groups[name] = { name:, parent: group, queue: } @@ -118,7 +111,6 @@ def group(name, group: nil, queue: nil) # @param args [Array] Optional arguments to pass to the middleware class. # @param block [Proc] Optional block to pass to the middleware class. # - # # @return [void] def use(klass, *args, &block) middleware << [klass, args, block] @@ -132,7 +124,6 @@ def use(klass, *args, &block) # @param instances [Integer] The number of instances to spawn. Defaults to 1. # @param blocking [Boolean] If false, does not block current thread after starting the server. Defaults to true. # - # # @return [Concurrent::FixedThreadPool, void] The worker pool for non-blocking runs, otherwise blocks forever. def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' @@ -156,7 +147,6 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param workers [Array] The array to store worker instances. # @param blocking [Boolean] If false, does not block current thread after starting the server. # - # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. # @raise [ArgumentError] If `instance_args` was provided but is not a hash. def spawn_instances(url, opts, count, workers, blocking) @@ -178,7 +168,6 @@ def spawn_instances(url, opts, count, workers, blocking) # @param workers [Array] The array to store worker instances. # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. # - # # @return [void] def build_worker(nats_url, service_opts, workers, blocking) worker = @instance_args ? new(**@instance_args) : new @@ -194,7 +183,6 @@ def build_worker(nats_url, service_opts, workers, blocking) # @param workers [Array] The array of worker instances to stop. # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. # - # # @return [Proc] A lambda that performs the shutdown operations. def shutdown(workers, pool) lambda do @@ -214,7 +202,6 @@ def shutdown(workers, pool) # @param workers [Array] The array of worker instances to stop on signal. # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. # - # # @return [void] def trap_signals(workers, pool) return if @trapped @@ -232,7 +219,6 @@ def trap_signals(workers, pool) # This is useful when the server is running in a blocking mode. # If the main thread is not blocked, this method does just exits. # - # # @return [void] def wake_main_thread_and_exit! Thread.main.wakeup @@ -267,7 +253,6 @@ def build_jetstream_endpoint(name, options, handler) module WorkerLifecycle # Returns the logger configured for the NATS API server. # - # # @return [Object] The configured logger. def logger = self.class.logger @@ -277,7 +262,6 @@ def logger = self.class.logger # @param nats_url [String] The URL of the NATS server. # @param service_opts [Hash] Options for the NATS service. # - # # @return [void] def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) initialize_worker_state @@ -301,7 +285,6 @@ def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) # Stops the NATS API server worker. # - # # @return [void] def stop @running = false @@ -316,7 +299,6 @@ def stop # Captures the current thread for later wakeup during shutdown. # - # # @return [Thread] The current worker thread. def initialize_worker_state @thread = Thread.current @@ -360,7 +342,6 @@ def start_jetstream_consumer(endpoints) # Stops the JetStream consumer coordinator if one was started. # - # # @return [void] def stop_jetstream @jetstream_consumer&.stop @@ -368,7 +349,6 @@ def stop_jetstream # Stops the registered NATS service and closes the client connection. # - # # @return [void] def stop_service @service&.stop @@ -377,7 +357,6 @@ def stop_service # Wakes the worker thread if it is blocked. # - # # @return [Thread, nil] The awakened worker thread, if present. def wake_worker @thread&.wakeup @@ -385,7 +364,6 @@ def wake_worker # Returns the JetStream consumer coordinator class for this worker. # - # # @return [Class] The JetStream consumer implementation class. def jetstream_consumer_class NatsJetstreamConsumer @@ -393,7 +371,6 @@ def jetstream_consumer_class # Returns the thread factory used for JetStream consumer loops. # - # # @return [Class] The thread factory class. def thread_factory Thread @@ -403,7 +380,6 @@ def thread_factory # # @param service_opts [Hash] Options for the NATS service. # - # # @return [Hash] The complete service options including name and version. def build_service_opts(service_opts:) { @@ -416,7 +392,6 @@ def build_service_opts(service_opts:) # # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. # - # # @return [Hash] A map of group names to their created group objects. def add_groups(gps) created = {} @@ -430,7 +405,6 @@ def add_groups(gps) # @param cache [Hash] A cache to store already created groups. # @param name [String] The name of the group to build. # - # # @return [NATS::Group] The created group object. # @raise [ArgumentError] If the requested group was never defined. def build_group(defs, cache, name) @@ -448,7 +422,6 @@ def build_group(defs, cache, name) # @param endpoints [Array] The list of endpoints to add. # @param group_map [Hash] A map of group names to their created group objects. # - # # @return [void] # @raise [ArgumentError] If an endpoint references an undefined group. def add_endpoints(endpoints, group_map) @@ -466,7 +439,6 @@ def add_endpoints(endpoints, group_map) module MessageHandling # Returns the logger configured for the NATS API server. # - # # @return [Object] The configured logger. def logger = self.class.logger @@ -478,7 +450,6 @@ def logger = self.class.logger # @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler. # NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion. # - # # @return [void] def build_endpoint(parent, ept) parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| @@ -499,7 +470,6 @@ def process_transport_message(raw_msg, handler, callbacks) # Returns the callback helper for request/reply endpoints. # - # # @return [NatsRequestReplyCallbacks] The request/reply callback helper. def request_reply_callbacks @request_reply_callbacks ||= NatsRequestReplyCallbacks.new(logger:) @@ -507,7 +477,6 @@ def request_reply_callbacks # Returns the memoized message processor for this worker instance. # - # # @return [MessageProcessor] The shared message processor. def message_processor @message_processor ||= MessageProcessor.new( diff --git a/lib/leopard/nats_jetstream_consumer.rb b/lib/leopard/nats_jetstream_consumer.rb index 679e467..9695e38 100644 --- a/lib/leopard/nats_jetstream_consumer.rb +++ b/lib/leopard/nats_jetstream_consumer.rb @@ -43,7 +43,6 @@ def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies # Starts one pull-consumer loop per configured endpoint. # - # # @return [void] def start @running = true @@ -52,7 +51,6 @@ def start # Stops all pull-consumer loops and waits for them to exit. # - # # @return [void] def stop @running = false diff --git a/lib/leopard/nats_request_reply_callbacks.rb b/lib/leopard/nats_request_reply_callbacks.rb index 3af1c6f..8c9ffd2 100644 --- a/lib/leopard/nats_request_reply_callbacks.rb +++ b/lib/leopard/nats_request_reply_callbacks.rb @@ -15,7 +15,6 @@ def initialize(logger:) # Returns transport callbacks for request/reply endpoints. # - # # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. def callbacks { From 9f16a71b7a4e322de5565e5c9bf973004183f462 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 16 Apr 2026 17:28:36 -0400 Subject: [PATCH 09/12] style: ensure latest ruby is used for dev/testing/ci --- .github/workflows/container.yaml | 2 +- .github/workflows/gem.yaml | 2 +- .github/workflows/main.yaml | 1 + Gemfile | 1 + Gemfile.lock | 29 ++++++++++++++++++++++++++--- mise.toml | 2 ++ 6 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 mise.toml diff --git a/.github/workflows/container.yaml b/.github/workflows/container.yaml index 22d9669..07bce5c 100644 --- a/.github/workflows/container.yaml +++ b/.github/workflows/container.yaml @@ -22,7 +22,7 @@ jobs: max-parallel: 5 matrix: alpine-version: ['3.20'] - ruby-version: ['3.4.1'] + ruby-version: ['4.0.2'] steps: - name: Checkout repository diff --git a/.github/workflows/gem.yaml b/.github/workflows/gem.yaml index a2b161e..b81461c 100644 --- a/.github/workflows/gem.yaml +++ b/.github/workflows/gem.yaml @@ -20,7 +20,7 @@ jobs: name: Set up Ruby uses: ruby/setup-ruby@v1 with: - ruby-version: 3.4.1 + ruby-version: 4.0.2 bundler-cache: false - name: Publish to ${{ matrix.registry }} diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 23d345e..e4fe55f 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -13,6 +13,7 @@ jobs: matrix: ruby: - '3.4.5' + - '4.0.2' steps: - uses: actions/checkout@v4 - name: Set up Ruby diff --git a/Gemfile b/Gemfile index a37644f..958fb8d 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ gemspec group :development, :test do gem 'asciidoctor' + gem 'irb' gem 'minitest' gem 'minitest-global_expectations' gem 'pry' diff --git a/Gemfile.lock b/Gemfile.lock index 50c5e53..411d7a2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -16,7 +16,9 @@ GEM base64 (0.3.0) coderay (1.1.3) concurrent-ruby (1.3.5) + date (3.5.1) docile (1.4.1) + drb (2.2.3) dry-configurable (1.3.0) dry-core (~> 1.1) zeitwerk (~> 2.6) @@ -28,13 +30,21 @@ GEM concurrent-ruby (~> 1.0) dry-core (~> 1.1) zeitwerk (~> 2.6) + erb (6.0.3) io-console (0.8.1) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) json (2.13.2) language_server-protocol (3.17.0.5) lint_roller (1.1.0) logger (1.7.0) method_source (1.1.0) - minitest (5.25.5) + minitest (6.0.4) + drb (~> 2.0) + prism (~> 1.5) minitest-global_expectations (1.0.1) minitest (> 5) nats-pure (2.5.0) @@ -47,13 +57,23 @@ GEM parser (3.3.9.0) ast (~> 2.4.1) racc - prism (1.4.0) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + prism (1.9.0) pry (0.15.2) coderay (~> 1.1) method_source (~> 1.0) + psych (5.3.1) + date + stringio racc (1.8.1) rainbow (3.1.1) rake (13.3.0) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort regexp_parser (2.11.2) reline (0.6.2) io-console (~> 0.5) @@ -92,9 +112,11 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.13.2) simplecov_json_formatter (0.1.4) + stringio (3.2.0) + tsort (0.2.0) unicode-display_width (3.1.5) unicode-emoji (~> 4.0, >= 4.0.4) - unicode-emoji (4.0.4) + unicode-emoji (4.2.0) uri (1.0.3) yard (0.9.41) zeitwerk (2.7.3) @@ -105,6 +127,7 @@ PLATFORMS DEPENDENCIES asciidoctor + irb leopard! minitest minitest-global_expectations diff --git a/mise.toml b/mise.toml new file mode 100644 index 0000000..eefa30b --- /dev/null +++ b/mise.toml @@ -0,0 +1,2 @@ +[tools] +ruby = "latest" From 5c22c2ad777c558d86704a14916a7b4247b41a2b Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Mon, 20 Apr 2026 13:51:36 -0400 Subject: [PATCH 10/12] test: ensures minitest mock is loaded for tests --- test/helper.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/test/helper.rb b/test/helper.rb index d7162e7..39b99e5 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,6 +13,7 @@ end require 'minitest/autorun' +require 'minitest/mock' require_relative '../lib/leopard' # Suppress logs when running tests From c1952335652afde81e470f9f0b01ee83de652dd3 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Mon, 20 Apr 2026 14:02:17 -0400 Subject: [PATCH 11/12] fix: only require mock in ruby < 4 --- test/helper.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/helper.rb b/test/helper.rb index 39b99e5..1da14ef 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,7 +13,8 @@ end require 'minitest/autorun' -require 'minitest/mock' +# minitest 6.x (Ruby 4.0+) includes mock in core; 5.x requires it separately +require 'minitest/mock' unless defined?(Minitest::Mock) require_relative '../lib/leopard' # Suppress logs when running tests From ba3a89178dd2aa2d050f3c9103d7f869f2dee6b2 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Mon, 20 Apr 2026 14:13:28 -0400 Subject: [PATCH 12/12] fix(deps): add the minitest-mock gem for > 6.x minitest/mock support --- Gemfile | 1 + Gemfile.lock | 2 ++ test/helper.rb | 3 +-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Gemfile b/Gemfile index 958fb8d..2d9bb1a 100644 --- a/Gemfile +++ b/Gemfile @@ -10,6 +10,7 @@ group :development, :test do gem 'irb' gem 'minitest' gem 'minitest-global_expectations' + gem 'minitest-mock' gem 'pry' gem 'rake' gem 'reline' diff --git a/Gemfile.lock b/Gemfile.lock index 411d7a2..9f5e00a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -47,6 +47,7 @@ GEM prism (~> 1.5) minitest-global_expectations (1.0.1) minitest (> 5) + minitest-mock (5.27.0) nats-pure (2.5.0) base64 concurrent-ruby (~> 1.0) @@ -131,6 +132,7 @@ DEPENDENCIES leopard! minitest minitest-global_expectations + minitest-mock pry rake reline diff --git a/test/helper.rb b/test/helper.rb index 1da14ef..39b99e5 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,8 +13,7 @@ end require 'minitest/autorun' -# minitest 6.x (Ruby 4.0+) includes mock in core; 5.x requires it separately -require 'minitest/mock' unless defined?(Minitest::Mock) +require 'minitest/mock' require_relative '../lib/leopard' # Suppress logs when running tests