Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic support for metrics exemplar #1609

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions metrics_api/lib/opentelemetry/internal/proxy_instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ module OpenTelemetry
module Internal
# @api private
class ProxyInstrument
def initialize(kind, name, unit, desc, callable)
def initialize(kind, name, unit, desc, callable, exemplar_filter, exemplar_reservoir)
@kind = kind
@name = name
@unit = unit
@desc = desc
@callable = callable
@exemplar_filter = exemplar_filter
@exemplar_reservoir = exemplar_reservoir
@delegate = nil
end

def upgrade_with(meter)
@delegate = case @kind
when :counter, :histogram, :up_down_counter
meter.send("create_#{@kind}", @name, unit: @unit, description: @desc)
meter.send("create_#{@kind}", @name, unit: @unit, description: @desc, exemplar_filter: @exemplar_filter, exemplar_reservoir: @exemplar_reservoir)
when :observable_counter, :observable_gauge, :observable_up_down_counter
meter.send("create_#{@kind}", @name, unit: @unit, description: @desc, callback: @callback)
end
Expand Down
10 changes: 5 additions & 5 deletions metrics_api/lib/opentelemetry/internal/proxy_meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def delegate=(meter)

private

def create_instrument(kind, name, unit, description, callback)
def create_instrument(kind, name, unit, description, callback, exemplar_filter, exemplar_reservoir)
super do
next ProxyInstrument.new(kind, name, unit, description, callback) if @delegate.nil?
next ProxyInstrument.new(kind, name, unit, description, callback, exemplar_filter, exemplar_reservoir) if @delegate.nil?

case kind
when :counter then @delegate.create_counter(name, unit: unit, description: description)
when :histogram then @delegate.create_histogram(name, unit: unit, description: description)
when :up_down_counter then @delegate.create_up_down_counter(name, unit: unit, description: description)
when :counter then @delegate.create_counter(name, unit: unit, description: description, exemplar_filter: exemplar_filter, exemplar_reservoir: exemplar_reservoir)
when :histogram then @delegate.create_histogram(name, unit: unit, description: description, exemplar_filter: exemplar_filter, exemplar_reservoir: exemplar_reservoir)
when :up_down_counter then @delegate.create_up_down_counter(name, unit: unit, description: description, exemplar_filter: exemplar_filter, exemplar_reservoir: exemplar_reservoir)
when :observable_counter then @delegate.create_observable_counter(name, unit: unit, description: description, callback: callback)
when :observable_gauge then @delegate.create_observable_gauge(name, unit: unit, description: description, callback: callback)
when :observable_up_down_counter then @delegate.create_observable_up_down_counter(name, unit: unit, description: description, callback: callback)
Expand Down
26 changes: 13 additions & 13 deletions metrics_api/lib/opentelemetry/metrics/meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,33 @@ def initialize
@instrument_registry = {}
end

def create_counter(name, unit: nil, description: nil)
create_instrument(:counter, name, unit, description, nil) { COUNTER }
def create_counter(name, unit: nil, description: nil, exemplar_filter: nil, exemplar_reservoir: nil)
create_instrument(:counter, name, unit, description, nil, exemplar_filter, exemplar_reservoir) { COUNTER }
end

def create_histogram(name, unit: nil, description: nil)
create_instrument(:histogram, name, unit, description, nil) { HISTOGRAM }
def create_histogram(name, unit: nil, description: nil, exemplar_filter: nil, exemplar_reservoir: nil)
create_instrument(:histogram, name, unit, description, nil, exemplar_filter, exemplar_reservoir) { HISTOGRAM }
end

def create_up_down_counter(name, unit: nil, description: nil)
create_instrument(:up_down_counter, name, unit, description, nil) { UP_DOWN_COUNTER }
def create_up_down_counter(name, unit: nil, description: nil, exemplar_filter: nil, exemplar_reservoir: nil)
create_instrument(:up_down_counter, name, unit, description, nil, exemplar_filter, exemplar_reservoir) { UP_DOWN_COUNTER }
end

def create_observable_counter(name, callback:, unit: nil, description: nil)
create_instrument(:observable_counter, name, unit, description, callback) { OBSERVABLE_COUNTER }
def create_observable_counter(name, callback:, unit: nil, description: nil, exemplar_filter: nil, exemplar_reservoir: nil)
create_instrument(:observable_counter, name, unit, description, callback, exemplar_filter, exemplar_reservoir) { OBSERVABLE_COUNTER }
end

def create_observable_gauge(name, callback:, unit: nil, description: nil)
create_instrument(:observable_gauge, name, unit, description, callback) { OBSERVABLE_GAUGE }
def create_observable_gauge(name, callback:, unit: nil, description: nil, exemplar_filter: nil, exemplar_reservoir: nil)
create_instrument(:observable_gauge, name, unit, description, callback, exemplar_filter, exemplar_reservoir) { OBSERVABLE_GAUGE }
end

def create_observable_up_down_counter(name, callback:, unit: nil, description: nil)
create_instrument(:observable_up_down_counter, name, unit, description, callback) { OBSERVABLE_UP_DOWN_COUNTER }
def create_observable_up_down_counter(name, callback:, unit: nil, description: nil, exemplar_filter: nil, exemplar_reservoir: nil)
create_instrument(:observable_up_down_counter, name, unit, description, callback, exemplar_filter, exemplar_reservoir) { OBSERVABLE_UP_DOWN_COUNTER }
end

private

def create_instrument(kind, name, unit, description, callback)
def create_instrument(kind, name, unit, description, callback, exemplar_filter, exemplar_reservoir)
@mutex.synchronize do
OpenTelemetry.logger.warn("duplicate instrument registration occurred for instrument #{name}") if @instrument_registry.include? name

Expand Down
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Metrics
end
end

require 'opentelemetry/sdk/metrics/exemplar'
require 'opentelemetry/sdk/metrics/aggregation'
require 'opentelemetry/sdk/metrics/configuration_patch'
require 'opentelemetry/sdk/metrics/export'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class ExplicitBucketHistogram
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
private_constant :DEFAULT_BOUNDARIES

# if no reservior pass from instrument, then use this empty reservior to avoid no method found error
DEFAULT_RESERVOIR = Metrics::Exemplar::FixedSizeExemplarReservoir.new
private_constant :DEFAULT_RESERVOIR

attr_reader :aggregation_temporality

# The default value for boundaries represents the following buckets:
Expand All @@ -23,9 +27,10 @@ class ExplicitBucketHistogram
def initialize(
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), # TODO: the default should be :cumulative, see issue #1555
boundaries: DEFAULT_BOUNDARIES,
record_min_max: true
record_min_max: true,
exemplar_reservoir: DEFAULT_RESERVOIR
)

@exemplar_reservoir = exemplar_reservoir
@aggregation_temporality = aggregation_temporality.to_sym
@boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil
@record_min_max = record_min_max
Expand Down Expand Up @@ -68,7 +73,7 @@ def update(amount, attributes, data_points)
0, # :sum
empty_bucket_counts, # :bucket_counts
@boundaries, # :explicit_bounds
nil, # :exemplars
@exemplar_reservoir.collect(attributes: attributes, aggregation_temporality: @aggregation_temporality), # :exemplars
min, # :min
max # :max
)
Expand Down
13 changes: 10 additions & 3 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
# Contains the implementation of the Sum aggregation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#sum-aggregation
class Sum
# if no reservior pass from instrument, then use this empty reservior to avoid no method found error
DEFAULT_RESERVOIR = Metrics::Exemplar::FixedSizeExemplarReservoir.new
private_constant :DEFAULT_RESERVOIR

attr_reader :aggregation_temporality

def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta))
def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
exemplar_reservoir: DEFAULT_RESERVOIR)
# TODO: the default should be :cumulative, see issue #1555
@exemplar_reservoir = exemplar_reservoir
@aggregation_temporality = aggregation_temporality.to_sym
end

Expand All @@ -39,12 +44,14 @@ def collect(start_time, end_time, data_points)
end

def update(increment, attributes, data_points)
# NumberDataPoint should include exemplars
ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
0,
nil
# will this cause the reservoir overloaded with old exemplars?
@exemplar_reservoir.collect(attributes: attributes, aggregation_temporality: @aggregation_temporality) # exemplar
)

ndp.value += increment
Expand Down
26 changes: 26 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/exemplar.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
# The Exemplar module contains the OpenTelemetry metrics reference
# exemplar implementations.
module Exemplar
end
end
end
end

require 'opentelemetry/sdk/metrics/exemplar/exemplar'
require 'opentelemetry/sdk/metrics/exemplar/exemplar_filter'
require 'opentelemetry/sdk/metrics/exemplar/exemplar_reservoir'
require 'opentelemetry/sdk/metrics/exemplar/always_off_exemplar_filter'
require 'opentelemetry/sdk/metrics/exemplar/always_on_exemplar_filter'
require 'opentelemetry/sdk/metrics/exemplar/trace_based_exemplar_filter'
require 'opentelemetry/sdk/metrics/exemplar/noop_exemplar_reservoir'
require 'opentelemetry/sdk/metrics/exemplar/fixed_size_exemplar_reservoir'
require 'opentelemetry/sdk/metrics/exemplar/histogram_exemplar_reservoir'
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Exemplar
# AlwaysOffExemplarFilter makes no measurements eligible for being an Exemplar.
# Using this ExemplarFilter is as good as disabling Exemplar feature.
class AlwaysOffExemplarFilter < ExemplarFilter
def self.should_sample?(value, timestamp, attributes, context)
false
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Exemplar
# AlwaysOnExemplarFilter
class AlwaysOnExemplarFilter < ExemplarFilter
def self.should_sample?(value, timestamp, attributes, context)
true
end
end
end
end
end
end
26 changes: 26 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/exemplar/exemplar.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Exemplar
# Exemplar
class Exemplar
attr_reader :value, :time_unix_nano, :attributes, :span_id, :trace_id

def initialize(value, time_unix_nano, attributes, span_id, trace_id)
@value = value
@time_unix_nano = time_unix_nano
@attributes = attributes
@span_id = span_id
@trace_id = trace_id
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Exemplar
# ExemplarFilter
class ExemplarFilter
# Returns a {Boolean} value.
#
# @param [Integer] value Value of the measurement
# @param [Hash] attributes Complete set of Attributes of the measurement
# @param [Context] context Context of the measurement, which covers the Baggage and the current active Span.
#
# @return [Boolean]
def self.should_sample?(value, attributes, context); end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Exemplar
# ExemplarReservoir
class ExemplarReservoir
def initialize
@exemplars = []
end

# Store the info into exemplars bucket
#
# @param [Integer] value Value of the measurement
# @param [Integer] timestamp Time of recording
# @param [Hash] attributes Complete set of Attributes of the measurement
# @param [Context] context SpanContext of the measurement, which covers the Baggage and the current active Span.
#
# @return [Nil]
def offer(value: nil, timestamp: nil, attributes: nil, context: nil)
span_context = current_span_context(context)
@exemplars << Exemplar.new(value, timestamp, attributes, span_context.hex_span_id, span_context.hex_trace_id)
nil
end

# return list of Exemplars based on given attributes
#
# @param [Hash] attributes Value of the measurement
# @param [Boolean] aggregation_temporality Should remove the original exemplars or not, default delta
#
# @return [Array] exemplars Array of exemplars
def collect(attributes: nil, aggregation_temporality: :delta)
exemplars = []
@exemplars.each { |exemplar| exemplars << exemplar if exemplar } # TODO: Addition operation on selecting exemplar
@exemplars.clear if aggregation_temporality == :delta
exemplars
end

def current_span_context(context)
::OpenTelemetry::Trace.current_span(context).context
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Exemplar
# FixedSizeExemplarReservoir
class FixedSizeExemplarReservoir < ExemplarReservoir
MAX_BUCKET_SIZE = 1

def initialize(max_size: nil)
super()
@max_size = max_size || MAX_BUCKET_SIZE
end

# MUST use an uniformly-weighted sampling algorithm based on the number of samples the reservoir
def offer(value: nil, timestamp: nil, attributes: nil, context: nil)
span_context = current_span_context(context)
if @exemplars.size >= @max_size
rand_index = rand(0..@max_size - 1)
@exemplars[rand_index] = Exemplar.new(value, timestamp, attributes, span_context.hex_span_id, span_context.hex_trace_id)
nil
else
super(value: value, timestamp: timestamp, attributes: attributes, context: context)
end
end

def collect(attributes: nil, aggregation_temporality: nil)
super(attributes: attributes, aggregation_temporality: aggregation_temporality)
end
end
end
end
end
end
Loading
Loading