Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic periodic exporting metric_reader #1603

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ module Export
require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/periodic_metric_reader'
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
# PeriodicMetricReader provides a minimal example implementation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what else should be added to take this beyond "a minimal example implementation"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think more advanced scheduling mechanisms like what Java has (e.g. queue system); or more strict thread like batch_span_processor you pointed out.

thread = lock do 
     @keep_running = false 
     @condition.signal 
     @thread 
   end 

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that makes sense. Would you be open to creating an issue for the advanced scheduling mechanisms and any further changes you'd see to make this more similar to the batch_span_processor?

Copy link
Contributor Author

@xuan-cao-swi xuan-cao-swi Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will do. Issue logged here

class PeriodicMetricReader < MetricReader
def initialize(interval_millis: 60, timout_millis: 30, exporter: nil)
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
super()

@interval_millis = interval_millis

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does _millis mean? I thought "milliseconds" but after experimenting with this, it seems to be seconds, not milliseconds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the name is indeed confusing. It should be seconds. I have updated the name to export_interval and export_timeout.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks! :)

@timout_millis = timout_millis
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
@exporter = exporter
@thread = nil
@continue = false

start
end

def start
@continue = true
if @exporter.nil?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please close it if it needs to restart.'
else
@thread = Thread.new do
while @continue
sleep(@interval_millis)
begin
Timeout.timeout(@timout_millis) { @exporter.export(collect) }
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
rescue Timeout::Error => e
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') evaluating to FAILURE ? if not then we should return FAILURE explicitly here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think metric_reader shouldn't return anything (similar to metric_reader); only exporter will return either fail or success from export

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, you are absolutely right.

end
end
end
end
end

def close
@continue = false # force termination in next iteration
@thread.join(5) # wait 5 seconds for collecting and exporting
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 5 seconds? Should it be configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I take reference on what how Java handle the thread waiting (here). I don't see spec mention about the thread waiting time but it seems like it should be configurable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the source, Xuan. Does it make sense for the close method to use the @interval_millis value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I like using @interval_millis in join as well. Updated.

rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.')
end

# TODO: determine correctness: directly kill the reader without waiting for next metrics collection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find any clues in the spec. Looking at our SDK, the BatchSpanProcessor also has some periodic functionality and calls force_flush during shutdown, skipping the periodic wait.

Maybe that's the right call here too?

# Shuts the consumer thread down and flushes the current accumulated buffer
# will block until the thread is finished.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
# non-specific failure occurred, TIMEOUT if a timeout occurred.
def shutdown(timeout: nil)
start_time = OpenTelemetry::Common::Utilities.timeout_timestamp
thread = lock do
@keep_running = false
@condition.signal
@thread
end
thread&.join(timeout)
force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
dropped_spans = lock { spans.shift(spans.length) }
report_dropped_spans(dropped_spans, reason: 'terminating') if dropped_spans.any?
@exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Updated.

def shutdown(timeout: nil)
close
@exporter.shutdown
Export::SUCCESS
rescue StandardError
Export::FAILURE
end

def force_flush(timeout: nil)
@exporter.export(collect)
Export::SUCCESS
rescue StandardError
Export::FAILURE
end
end
end
end
end
end
86 changes: 86 additions & 0 deletions metrics_sdk/test/integration/periodic_metric_reader_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK do
describe '#periodic_metric_reader' do
before { reset_metrics_sdk }

it 'emits 2 metrics after 10 seconds' do
OpenTelemetry::SDK.configure
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timout_millis: 5, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(8)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're interested in an alternative to calling sleep, New Relic uses these methods to advance, freeze, and unfreeze time in our tests. It allows us to get the benefits of advancing time without slowing down the tests. We mostly use the Process-related methods now.

Incorporating timecop could be another option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the example, and I have tried both new relic method and the timecop.
Not sure If I setup them correctly (with timecop/the code block), it didn't change the test time.

P.S. I think it would be great to have New Relic uses these methods to advance, freeze, and unfreeze time in our tests in test_helpers gem


periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(2)

first_snapshot = snapshot[0]
_(first_snapshot[0].name).must_equal('counter')
_(first_snapshot[0].unit).must_equal('smidgen')
_(first_snapshot[0].description).must_equal('a small amount of something')

_(first_snapshot[0].instrumentation_scope.name).must_equal('test')

_(first_snapshot[0].data_points[0].value).must_equal(1)
_(first_snapshot[0].data_points[0].attributes).must_equal({})

_(first_snapshot[0].data_points[1].value).must_equal(4)
_(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b')

_(first_snapshot[0].data_points[2].value).must_equal(3)
_(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c')

_(first_snapshot[0].data_points[3].value).must_equal(4)
_(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e')

puts periodic_metric_reader.instance_variable_get(:@thread)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end

it 'emits 1 metrics after 1 seconds when interval is > 1 seconds' do
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timout_millis: 5, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(1)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(1)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end
end
end
Loading