Skip to content

feat: AWS X-Ray Remote Sampler Part 3 - Add Rate Limiter and Sampling Targets Poller Logic #1536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions sampler/xray/example/xray_sampling_on_rails_demonstration.ru
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'bundler/inline'

gemfile(true) do
source 'https://rubygems.org'

gem 'concurrent-ruby', '1.3.4'
gem 'rails', '~> 7.0.4'
gem 'puma'

gem 'opentelemetry-sdk'
gem 'opentelemetry-instrumentation-rails'
gem 'opentelemetry-sampler-xray', path: './../' # Use local version of the X-Ray Sampler
# gem 'opentelemetry-sampler-xray' # Use RubyGems version of the X-Ray Sampler
end

require "action_controller/railtie"
require "action_mailer/railtie"
require "rails/test_unit/railtie"

class App < Rails::Application
config.root = __dir__
config.consider_all_requests_local = true

routes.append do
root to: 'welcome#index'
get "/test" => 'welcome#test'
end
end

class WelcomeController < ActionController::Base
def index
render inline: 'Successfully called "/" endpoint'
end

def test
render inline: 'Successfully called "/test" endpoint'
end
end

ENV['OTEL_TRACES_EXPORTER'] = 'console'
ENV['OTEL_SERVICE_NAME'] = 'xray-sampler-on-rails-service'
OpenTelemetry::SDK.configure do |c|
c.use_all({ 'OpenTelemetry::Instrumentation::ActiveRecord' => { enabled: false } })
end

OpenTelemetry.tracer_provider.sampler = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource:OpenTelemetry::SDK::Resources::Resource.create({
"service.name"=>"xray-sampler-on-rails-service"
}))

App.initialize!

run App

#### Running and using the Sample App
# To run this example run the `rackup` command with this file
# Example: rackup trace_request_demonstration.ru
# Navigate to http://localhost:9292/
# Spans for any requests sampled by the X-Ray Sampler will appear in the console

#### Required configuration in the OpenTelemetry Collector
# In order for sampling rules to be obtained from AWS X-Ray, the awsproxy extension
# must be configured in the OpenTelemetry Collector, which will use your AWS credentials.
# - https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/awsproxy#aws-proxy
# Without the awsproxy extension, the X-Ray Sampler will use a fallback sampler
# with a sampling strategy of "1 request/second, plus 5% of any additional requests"

#### Testing out configurable X-Ray Sampling Rules against the "service.name" resource attribute.
# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces.
# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules
# Matching Criteria
# ServiceName = xray-sampler-on-rails-service
# ServiceType = *
# Host = *
# ResourceARN = *
# HTTPMethod = *
# URLPath = *
# For the above matching criteria, try out the following settings to sample or not sample requests
# - Limit to 0r/sec then 0 fixed rate
# - Limit to 1r/sec then 0 fixed rate (May take 30 seconds for this setting to apply)
# - Limit to 0r/sec then 100% fixed rate

#### Testing out configurable X-Ray Sampling Rules against the "/test" endpoint in this sample app.
# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces.
# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules
# Matching Criteria
# ServiceName = *
# ServiceType = *
# Host = *
# ResourceARN = *
# HTTPMethod = *
# URLPath = /test
# For the above matching criteria, try out the following settings to sample or not sample requests
# - Limit to 0r/sec then 0 fixed rate
# - Limit to 1r/sec then 0 fixed rate (May take 30 seconds for this setting to apply)
# - Limit to 0r/sec then 100% fixed rate
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
#
# SPDX-License-Identifier: Apache-2.0

require 'net/http'
require 'json'
require 'net/http'
require 'opentelemetry/sdk'
require_relative 'sampling_rule'
require_relative 'aws_xray_sampling_client'
require_relative 'fallback_sampler'
require_relative 'sampling_rule_applier'
require_relative 'rule_cache'
require_relative 'aws_xray_sampling_client'
require_relative 'sampling_rule'
require_relative 'sampling_rule_applier'

module OpenTelemetry
module Sampler
Expand Down Expand Up @@ -68,7 +68,8 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
# Start the Sampling Rules poller
start_sampling_rules_poller

# TODO: Start the Sampling Targets poller
# Start the Sampling Targets poller
start_sampling_targets_poller
end

def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
Expand Down Expand Up @@ -113,6 +114,15 @@ def start_sampling_rules_poller
end
end

def start_sampling_targets_poller
@target_poller = Thread.new do
loop do
sleep(((@target_polling_interval * 1000) + @target_polling_jitter_millis) / 1000.0)
retrieve_and_update_sampling_targets
end
end
end

def retrieve_and_update_sampling_rules
sampling_rules_response = @sampling_client.fetch_sampling_rules
if sampling_rules_response&.body && sampling_rules_response.body != ''
Expand All @@ -125,6 +135,19 @@ def retrieve_and_update_sampling_rules
OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Rules')
end

def retrieve_and_update_sampling_targets
request_body = {
SamplingStatisticsDocuments: @rule_cache.create_sampling_statistics_documents(@client_id)
}
sampling_targets_response = @sampling_client.fetch_sampling_targets(request_body)
if sampling_targets_response&.body && sampling_targets_response.body != ''
response_body = JSON.parse(sampling_targets_response.body)
update_sampling_targets(response_body)
else
OpenTelemetry.logger.debug('SamplingTargets Response is falsy')
end
end

def update_sampling_rules(response_object)
sampling_rules = []
if response_object && response_object['SamplingRuleRecords']
Expand All @@ -140,6 +163,33 @@ def update_sampling_rules(response_object)
end
end

def update_sampling_targets(response_object)
if response_object && response_object['SamplingTargetDocuments']
target_documents = {}

response_object['SamplingTargetDocuments'].each do |new_target|
target_documents[new_target['RuleName']] = new_target
end

refresh_sampling_rules, next_polling_interval = @rule_cache.update_targets(
target_documents,
response_object['LastRuleModification']
)

@target_polling_interval = next_polling_interval

if refresh_sampling_rules
OpenTelemetry.logger.debug('Performing out-of-band sampling rule polling to fetch updated rules.')
@rule_poller&.kill
start_sampling_rules_poller
end
else
OpenTelemetry.logger.debug('SamplingTargetDocuments from SamplingTargets request is not defined')
end
rescue StandardError => e
OpenTelemetry.logger.debug("Error occurred when updating Sampling Targets: #{e}")
end

class << self
def generate_client_id
hex_chars = ('0'..'9').to_a + ('a'..'f').to_a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'rate_limiting_sampler'

module OpenTelemetry
module Sampler
module XRay
# FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
class FallbackSampler
def initialize
@fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05)
@rate_limiting_sampler = RateLimitingSampler.new(1)
end

def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
# TODO: implement and use Rate Limiting Sampler
sampling_result = @rate_limiting_sampler.should_sample?(
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
)

return sampling_result if sampling_result.instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP

@fixed_rate_sampler.should_sample?(trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes)
end
Expand Down
48 changes: 48 additions & 0 deletions sampler/xray/lib/opentelemetry/sampler/xray/rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Sampler
module XRay
# RateLimiter keeps track of the current reservoir quota balance available (measured via available time)
# If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time)
# A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
class RateLimiter
def initialize(quota, max_balance_in_seconds = 1)
@max_balance_millis = max_balance_in_seconds * 1000.0
@quota = quota
@wallet_floor_millis = Time.now.to_f * 1000
# current "balance" would be `ceiling - floor`
@lock = Mutex.new
end

def take(cost = 1)
return false if @quota.zero?

quota_per_millis = @quota / 1000.0

# assume divide by zero not possible
cost_in_millis = cost / quota_per_millis

@lock.synchronize do
wallet_ceiling_millis = Time.now.to_f * 1000
current_balance_millis = wallet_ceiling_millis - @wallet_floor_millis
current_balance_millis = [current_balance_millis, @max_balance_millis].min
pending_remaining_balance_millis = current_balance_millis - cost_in_millis

if pending_remaining_balance_millis >= 0
@wallet_floor_millis = wallet_ceiling_millis - pending_remaining_balance_millis
return true
end

# No changes to the wallet state
false
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'rate_limiter'

module OpenTelemetry
module Sampler
module XRay
# RateLimitingSampler is a Sampler that uses a RateLimiter to determine
# if it should sample or not based on the quota balance available.
class RateLimitingSampler
def initialize(quota)
@quota = quota
@reservoir = RateLimiter.new(quota)
end

def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
tracestate = OpenTelemetry::Trace.current_span(parent_context).context.tracestate
if @reservoir.take(1)
OpenTelemetry::SDK::Trace::Samplers::Result.new(
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE,
tracestate: tracestate,
attributes: attributes
)
else
OpenTelemetry::SDK::Trace::Samplers::Result.new(
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
tracestate: tracestate,
attributes: attributes
)
end
end

def to_s
"RateLimitingSampler{rate limiting sampling with sampling config of #{@quota} req/sec and 0% of additional requests}"
end
end
end
end
end
46 changes: 46 additions & 0 deletions sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,52 @@ def update_rules(new_rule_appliers)
end
end

def create_sampling_statistics_documents(client_id)
statistics_documents = []

@cache_lock.synchronize do
@rule_appliers.each do |rule|
statistics = rule.snapshot_statistics
now_in_seconds = Time.now.to_i

sampling_statistics_doc = {
ClientID: client_id,
RuleName: rule.sampling_rule.rule_name,
Timestamp: now_in_seconds,
RequestCount: statistics.request_count,
BorrowCount: statistics.borrow_count,
SampledCount: statistics.sample_count
}

statistics_documents << sampling_statistics_doc
end
end

statistics_documents
end

def update_targets(target_documents, last_rule_modification)
min_polling_interval = nil
next_polling_interval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS

@cache_lock.synchronize do
@rule_appliers.each_with_index do |rule, index|
target = target_documents[rule.sampling_rule.rule_name]
if target
@rule_appliers[index] = rule.with_target(target)
min_polling_interval = target['Interval'] if target['Interval'] && (min_polling_interval.nil? || min_polling_interval > target['Interval'])
else
OpenTelemetry.logger.debug('Invalid sampling target: missing rule name')
end
end

next_polling_interval = min_polling_interval if min_polling_interval

refresh_sampling_rules = last_rule_modification * 1000 > @last_updated_epoch_millis
return [refresh_sampling_rules, next_polling_interval]
end
end

private

def sort_rules_by_priority
Expand Down
Loading