Skip to content

Commit

Permalink
feat: make thread properly close
Browse files Browse the repository at this point in the history
  • Loading branch information
xuan-cao-swi committed Feb 22, 2024
1 parent 3566cd2 commit cb84455
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,42 @@ def initialize(interval_millis: 60, timout_millis: 30, exporter: nil)
@interval_millis = interval_millis
@timout_millis = timout_millis
@exporter = exporter
@thread = nil
@thread = nil
@continue = false

start_reader
start
end

def start_reader
def start
@continue = true
if @exporter.nil?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif !@thread.nil?
OpenTelemetry.logger.warn 'PeriodicMetricReader is running. Please close it if need to restart.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please close it if it needs to restart.'
else
@thread = Thread.new { perodic_collect }
@thread = Thread.new do
while @continue
sleep(@interval_millis)
begin
Timeout.timeout(@timout_millis) { @exporter.export(collect) }
rescue Timeout::Error => e
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')
end
end
end
end
end

def perodic_collect
loop do
sleep(@interval_millis)
Timeout.timeout(@timout_millis) { @exporter.export(collect) }
end
end

def close_reader
@thread.kill
@thread.join
def close
@continue = false # force termination in next iteration
@thread.join(5) # wait 5 seconds for collecting and exporting
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.')
ensure
@thread = nil
end

# TODO: determine correctness: directly kill the reader without waiting for next metrics collection
def shutdown(timeout: nil)
close_reader
close
@exporter.shutdown
Export::SUCCESS
rescue StandardError
Expand Down
21 changes: 9 additions & 12 deletions metrics_sdk/test/integration/periodic_metric_reader_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
describe '#periodic_metric_reader' do
before { reset_metrics_sdk }

it 'emits 1 metrics after 10 seconds' do
it 'emits 2 metrics after 10 seconds' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
Expand All @@ -27,16 +27,14 @@
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

_(periodic_metric_reader.instance_variable_get(:@thread).class).must_equal Thread

sleep(8)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots
first_snapshot = snapshot[0]
snapshot = metric_exporter.metric_snapshots

_(snapshot).wont_be_empty
_(snapshot.size).must_equal(2)

first_snapshot = snapshot[0]
_(first_snapshot[0].name).must_equal('counter')
_(first_snapshot[0].unit).must_equal('smidgen')
_(first_snapshot[0].description).must_equal('a small amount of something')
Expand All @@ -55,10 +53,11 @@
_(first_snapshot[0].data_points[3].value).must_equal(4)
_(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e')

_(periodic_metric_reader.instance_variable_get(:@thread)).must_be_nil
puts periodic_metric_reader.instance_variable_get(:@thread)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end

it 'emits 0 metrics after 1 seconds when interval is > 1 seconds' do
it 'emits 1 metrics after 1 seconds when interval is > 1 seconds' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
Expand All @@ -75,15 +74,13 @@
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

_(periodic_metric_reader.instance_variable_get(:@thread).class).must_equal Thread

sleep(1)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(0)
_(periodic_metric_reader.instance_variable_get(:@thread)).must_be_nil
_(snapshot.size).must_equal(1)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end
end
end

0 comments on commit cb84455

Please sign in to comment.