Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic periodic exporting metric_reader #1603

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ module Export
require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/periodic_metric_reader'
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
# PeriodicMetricReader provides a minimal example implementation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what else should be added to take this beyond "a minimal example implementation"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think more advanced scheduling mechanisms like what Java has (e.g. queue system); or more strict thread like batch_span_processor you pointed out.

thread = lock do 
     @keep_running = false 
     @condition.signal 
     @thread 
   end 

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that makes sense. Would you be open to creating an issue for the advanced scheduling mechanisms and any further changes you'd see to make this more similar to the batch_span_processor?

Copy link
Contributor Author

@xuan-cao-swi xuan-cao-swi Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will do. Issue logged here

class PeriodicMetricReader < MetricReader
def initialize(interval_millis: ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60),
timeout_millis: ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30),
exporter: nil)
super()

@interval_millis = interval_millis

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does _millis mean? I thought "milliseconds" but after experimenting with this, it seems to be seconds, not milliseconds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the name is indeed confusing. It should be seconds. I have updated the name to export_interval and export_timeout.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks! :)

@timeout_millis = timeout_millis
@exporter = exporter
@thread = nil
@continue = false

start
end

def start
@continue = true
if @exporter.nil?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please close it if it needs to restart.'
else
@thread = Thread.new do
while @continue
sleep(@interval_millis)
begin
Timeout.timeout(@timeout_millis) { @exporter.export(collect) }
rescue Timeout::Error => e
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') evaluating to FAILURE ? if not then we should return FAILURE explicitly here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think metric_reader shouldn't return anything (similar to metric_reader); only exporter will return either fail or success from export

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, you are absolutely right.

end
end
end
end
end

def close
@continue = false # force termination in next iteration
@thread.join(@interval_millis) # wait 5 seconds for collecting and exporting
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.')
end

def shutdown(timeout: nil)
close
@exporter.force_flush if @exporter.respond_to?(:force_flush)
@exporter.shutdown
Export::SUCCESS
rescue StandardError
Export::FAILURE
end

def force_flush(timeout: nil)
@exporter.export(collect)
Export::SUCCESS
rescue StandardError
Export::FAILURE
end
end
end
end
end
end
85 changes: 85 additions & 0 deletions metrics_sdk/test/integration/periodic_metric_reader_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK do
describe '#periodic_metric_reader' do
before { reset_metrics_sdk }

it 'emits 2 metrics after 10 seconds' do
OpenTelemetry::SDK.configure
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timeout_millis: 5, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(8)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're interested in an alternative to calling sleep, New Relic uses these methods to advance, freeze, and unfreeze time in our tests. It allows us to get the benefits of advancing time without slowing down the tests. We mostly use the Process-related methods now.

Incorporating timecop could be another option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the example, and I have tried both new relic method and the timecop.
Not sure If I setup them correctly (with timecop/the code block), it didn't change the test time.

P.S. I think it would be great to have New Relic uses these methods to advance, freeze, and unfreeze time in our tests in test_helpers gem


periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(2)

first_snapshot = snapshot[0]
_(first_snapshot[0].name).must_equal('counter')
_(first_snapshot[0].unit).must_equal('smidgen')
_(first_snapshot[0].description).must_equal('a small amount of something')

_(first_snapshot[0].instrumentation_scope.name).must_equal('test')

_(first_snapshot[0].data_points[0].value).must_equal(1)
_(first_snapshot[0].data_points[0].attributes).must_equal({})

_(first_snapshot[0].data_points[1].value).must_equal(4)
_(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b')

_(first_snapshot[0].data_points[2].value).must_equal(3)
_(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c')

_(first_snapshot[0].data_points[3].value).must_equal(4)
_(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e')

_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end

it 'emits 1 metric after 1 second when interval is > 1 second' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timeout_millis: 5, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(1)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(1)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end
end
end
Loading