Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
leopard (0.2.5)
leopard (0.2.7)
concurrent-ruby (~> 1.1)
dry-configurable (~> 1.3)
dry-monads (~> 1.9)
Expand Down
61 changes: 39 additions & 22 deletions lib/leopard/metrics_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,45 +80,62 @@ def prometheus_metrics(workers)
render_metrics_template(metrics)
end

# Aggregates per-subject worker utilization metrics.
# Aggregates per-worker, per-subject saturation metrics and per-worker executor metrics.
#
# @param workers [Array<Object>] Active Leopard worker instances to observe.
#
# @return [Hash{Symbol => Object}] Metric hashes for the Prometheus template.
def collect_prometheus_metrics(workers)
busy = Hash.new(0)
pending = Hash.new(0)
workers.each { |w| accumulate_worker_metrics(w, busy, pending) }
{
busy:,
pending:,
subjects: (busy.keys | pending.keys).sort,
total: workers.size,
}
subject_metrics = []
executors = []

workers.each_with_index do |w, i|
accumulate_worker_metrics(w, i, subject_metrics)
ex = w.instance_variable_get(:@client)&.subscription_executor
executors << { worker: i, executor: ex } if ex
end

{ subject_metrics:, executors: }
end

# Adds one worker's endpoint saturation metrics to the aggregate hashes.
# Appends one worker's per-subject slot metrics to the subject_metrics array.
#
# @param worker [Object] A Leopard worker instance.
# @param busy [Hash{String => Integer}] Subject-to-busy-worker counts.
# @param pending [Hash{String => Integer}] Subject-to-pending-message counts.
# @param worker_index [Integer] Position of this worker in the workers array.
# @param subject_metrics [Array<Hash>] Accumulator for per-worker-per-subject metric rows.
#
# @return [void]
def accumulate_worker_metrics(worker, busy, pending)
def accumulate_worker_metrics(worker, worker_index, subject_metrics)
service = worker.instance_variable_get(:@service)
return unless service

service.endpoints.each do |ep|
# TODO: use ep.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint
sub = ep.instance_variable_get(:@handler)
next unless sub

subj = ep.subject.to_s
busy[subj] += sub.concurrency_semaphore.available_permits.zero? ? 1 : 0
pending[subj] += sub.pending_queue&.size.to_i
service.endpoints.each do |endpoint|
row = endpoint_subject_metrics(endpoint, worker_index)
subject_metrics << row if row
end
end

# Produces the metric row describing one endpoint on one worker.
#
# The endpoint's handler is read from its @handler instance variable; when that
# is unset the endpoint is skipped and nil is returned. Capacity comes from the
# handler's @processing_concurrency, and the busy count is that capacity minus
# the permits the handler's concurrency semaphore reports as available.
#
# @param endpoint [Object] A NATS service endpoint.
# @param worker_index [Integer] Position of the owning worker in the workers array.
#
# @return [Hash, nil] Row with :worker, :subject, :busy_slots, :capacity_slots and
#   :pending keys, or nil when the endpoint has no handler.
def endpoint_subject_metrics(endpoint, worker_index)
  # TODO: use endpoint.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint
  handler = endpoint.instance_variable_get(:@handler)
  return nil unless handler

  # to_i turns an unset concurrency into 0 instead of raising.
  capacity = handler.instance_variable_get(:@processing_concurrency).to_i
  subject_name = endpoint.subject.to_s
  free_permits = handler.concurrency_semaphore.available_permits
  # A missing pending queue is reported as an empty backlog.
  backlog = handler.pending_queue&.size.to_i

  {
    worker: worker_index,
    subject: subject_name,
    busy_slots: capacity - free_permits,
    capacity_slots: capacity,
    pending: backlog,
  }
end

# Renders the metrics ERB template with aggregated metric data.
#
# @param metrics [Hash{Symbol => Object}] Aggregated metric data for template rendering.
Expand Down
40 changes: 29 additions & 11 deletions lib/leopard/templates/prometheus_metrics.erb
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
# HELP leopard_subject_busy_instances Instances currently processing a message on this subject
# TYPE leopard_subject_busy_instances gauge
<% subjects.each do |subject| -%>
leopard_subject_busy_instances{subject="<%= subject %>"} <%= busy[subject] %>
# HELP leopard_subject_busy_slots Thread slots actively processing a message for this subject on this worker
# TYPE leopard_subject_busy_slots gauge
<% subject_metrics.each do |m| -%>
leopard_subject_busy_slots{subject="<%= m[:subject] %>",worker="<%= m[:worker] %>"} <%= m[:busy_slots] %>
<% end -%>

# HELP leopard_subject_total_instances Total Leopard instances in this process
# TYPE leopard_subject_total_instances gauge
<% subjects.each do |subject| -%>
leopard_subject_total_instances{subject="<%= subject %>"} <%= total %>
# HELP leopard_subject_capacity_slots Total thread slots allocated for this subject on this worker
# TYPE leopard_subject_capacity_slots gauge
<% subject_metrics.each do |m| -%>
leopard_subject_capacity_slots{subject="<%= m[:subject] %>",worker="<%= m[:worker] %>"} <%= m[:capacity_slots] %>
<% end -%>

# HELP leopard_subject_pending_messages Messages pending processing across all instances
# HELP leopard_subject_pending_messages Messages waiting to acquire a processing slot for this subject on this worker
# TYPE leopard_subject_pending_messages gauge
<% subjects.each do |subject| -%>
leopard_subject_pending_messages{subject="<%= subject %>"} <%= pending[subject] %>
<% subject_metrics.each do |m| -%>
leopard_subject_pending_messages{subject="<%= m[:subject] %>",worker="<%= m[:worker] %>"} <%= m[:pending] %>
<% end -%>

# HELP leopard_executor_active_threads Approximate number of active threads in the subscription executor (concurrent-ruby active_count is approximate)
# TYPE leopard_executor_active_threads gauge
<% executors.each do |e| -%>
leopard_executor_active_threads{worker="<%= e[:worker] %>"} <%= e[:executor].active_count %>
<% end -%>

# HELP leopard_executor_max_threads Maximum threads in the subscription executor; queued_tasks goes positive when sum of capacity_slots across subjects exceeds this value
# TYPE leopard_executor_max_threads gauge
<% executors.each do |e| -%>
leopard_executor_max_threads{worker="<%= e[:worker] %>"} <%= e[:executor].max_length %>
<% end -%>

# HELP leopard_executor_queued_tasks Tasks holding a semaphore permit but waiting for a free executor thread; nonzero only when the executor pool is fully saturated
# TYPE leopard_executor_queued_tasks gauge
<% executors.each do |e| -%>
leopard_executor_queued_tasks{worker="<%= e[:worker] %>"} <%= e[:executor].queue_length %>
<% end -%>
81 changes: 58 additions & 23 deletions test/lib/nats_api_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,19 @@ def callback_set(on_success: ->(*_) {}, on_failure: ->(*_) {}, on_error: ->(*_)
end

describe 'prometheus metrics' do # rubocop:disable Metrics/BlockLength
let(:available_struct) { Struct.new(:zero?) { def available_permits = self } }
let(:semaphore_struct) { Struct.new(:available_permits) }
let(:queue_struct) { Struct.new(:pending_size) { def size = pending_size } }
let(:handler_struct) { Struct.new(:concurrency_semaphore, :pending_queue) }
let(:handler_struct) do
Struct.new(:concurrency_semaphore, :pending_queue, :processing_concurrency) do
def instance_variable_get(name)
return processing_concurrency if name == :@processing_concurrency

super
end
end
end
let(:executor_struct) { Struct.new(:active_count, :max_length, :queue_length) }
let(:client_struct) { Struct.new(:subscription_executor) }
let(:endpoint_struct) do
Struct.new(:subject) do
def initialize(subject, handler)
Expand All @@ -231,42 +241,67 @@ def initialize(subject, handler)
end
let(:service_struct) { Struct.new(:endpoints) }
let(:worker_struct) do
Struct.new(:service) do
Struct.new(:service, :client) do
def instance_variable_get(name)
return service if name == :@service
return client if name == :@client

super
end
end
end
let(:expected_metrics) do
<<~METRICS
# HELP leopard_subject_busy_instances Instances currently processing a message on this subject
# TYPE leopard_subject_busy_instances gauge
leopard_subject_busy_instances{subject="alpha"} 1
leopard_subject_busy_instances{subject="beta"} 0

# HELP leopard_subject_total_instances Total Leopard instances in this process
# TYPE leopard_subject_total_instances gauge
leopard_subject_total_instances{subject="alpha"} 2
leopard_subject_total_instances{subject="beta"} 2

# HELP leopard_subject_pending_messages Messages pending processing across all instances
# HELP leopard_subject_busy_slots Thread slots actively processing a message for this subject on this worker
# TYPE leopard_subject_busy_slots gauge
leopard_subject_busy_slots{subject="alpha",worker="0"} 1
leopard_subject_busy_slots{subject="beta",worker="0"} 0
leopard_subject_busy_slots{subject="alpha",worker="1"} 2

# HELP leopard_subject_capacity_slots Total thread slots allocated for this subject on this worker
# TYPE leopard_subject_capacity_slots gauge
leopard_subject_capacity_slots{subject="alpha",worker="0"} 2
leopard_subject_capacity_slots{subject="beta",worker="0"} 2
leopard_subject_capacity_slots{subject="alpha",worker="1"} 2

# HELP leopard_subject_pending_messages Messages waiting to acquire a processing slot for this subject on this worker
# TYPE leopard_subject_pending_messages gauge
leopard_subject_pending_messages{subject="alpha"} 5
leopard_subject_pending_messages{subject="beta"} 1
leopard_subject_pending_messages{subject="alpha",worker="0"} 3
leopard_subject_pending_messages{subject="beta",worker="0"} 1
leopard_subject_pending_messages{subject="alpha",worker="1"} 2

# HELP leopard_executor_active_threads Approximate number of active threads in the subscription executor (concurrent-ruby active_count is approximate)
# TYPE leopard_executor_active_threads gauge
leopard_executor_active_threads{worker="0"} 5
leopard_executor_active_threads{worker="1"} 10

# HELP leopard_executor_max_threads Maximum threads in the subscription executor; queued_tasks goes positive when sum of capacity_slots across subjects exceeds this value
# TYPE leopard_executor_max_threads gauge
leopard_executor_max_threads{worker="0"} 24
leopard_executor_max_threads{worker="1"} 24

# HELP leopard_executor_queued_tasks Tasks holding a semaphore permit but waiting for a free executor thread; nonzero only when the executor pool is fully saturated
# TYPE leopard_executor_queued_tasks gauge
leopard_executor_queued_tasks{worker="0"} 0
leopard_executor_queued_tasks{worker="1"} 2
METRICS
end

it 'renders prometheus metrics from the erb template' do
workers = [
worker_struct.new(service_struct.new([
endpoint_struct.new('alpha', handler_struct.new(available_struct.new(true), queue_struct.new(3))),
endpoint_struct.new('beta', handler_struct.new(available_struct.new(false), queue_struct.new(1))),
])),
worker_struct.new(service_struct.new([
endpoint_struct.new('alpha', handler_struct.new(available_struct.new(false), queue_struct.new(2))),
])),
worker_struct.new(
service_struct.new([
endpoint_struct.new('alpha', handler_struct.new(semaphore_struct.new(1), queue_struct.new(3), 2)),
endpoint_struct.new('beta', handler_struct.new(semaphore_struct.new(2), queue_struct.new(1), 2)),
]),
client_struct.new(executor_struct.new(5, 24, 0)),
),
worker_struct.new(
service_struct.new([
endpoint_struct.new('alpha', handler_struct.new(semaphore_struct.new(0), queue_struct.new(2), 2)),
]),
client_struct.new(executor_struct.new(10, 24, 2)),
),
]

assert_equal expected_metrics, @klass.send(:prometheus_metrics, workers)
Expand Down
Loading