From 9f2b8b0c6778bd279fd9fe5b9875b9dd3ae086c7 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Tue, 20 Jul 2021 10:42:37 -0300 Subject: [PATCH 1/2] splits logic implementation --- lib/splitclient-rb/engine/synchronizer.rb | 60 ++++++++++++++++++- .../sse/workers/splits_worker.rb | 9 +-- 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index 5f23b4cb..7508de3b 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -6,6 +6,10 @@ class Synchronizer include SplitIoClient::Cache::Fetchers include SplitIoClient::Cache::Senders + ON_DEMAND_FETCH_BACKOFF_BASE_MS = 10_000 + ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS = 60_000 + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 + def initialize( repositories, api_key, @@ -25,6 +29,7 @@ def initialize( @impressions_api = SplitIoClient::Api::Impressions.new(@api_key, @config, params[:telemetry_runtime_producer]) @impression_counter = params[:imp_counter] @telemetry_synchronizer = params[:telemetry_synchronizer] + @backoff = new SSE::EventSource::BackOff.new() end def sync_all @@ -52,10 +57,40 @@ def stop_periodic_fetch @segment_fetcher.stop_segments_thread end - def fetch_splits + def fetch_splits(target_change_number) + return if target_change_number <= @splits_repository.get_change_number + fetch_options = { cache_control_headers: true, till: nil } - segment_names = @split_fetcher.fetch_splits(fetch_options) - @segment_fetcher.fetch_segments_if_not_exists(segment_names, true) unless segment_names.empty? + + result = attempt_splits_sync(target_change_number, + fetch_options, + @config.on_demand_fetch_max_retries, + @config.on_demand_fetch_retry_delay_ms) + + attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts] + if result[:success] + @segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty? + @config.logger.debug("Refresh completed in #{attempts} attempts.") if @config.debug_enabled + + return + end + + fetch_options[:till] = target_change_number + result = attempt_splits_sync(target_change_number, + fetch_options, + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES, + @config.on_demand_fetch_retry_delay_ms) + + attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts] + + if result[:success] + @segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty? + @config.logger.debug("Refresh completed bypassing the CDN in #{attempts} attempts.") + else + @config.logger.debug("No changes fetched after #{attempts} attempts with CDN bypassed.") + end + rescue StandardError => error + @config.log_found_exception(__method__.to_s, error) end def fetch_segment(name) @@ -65,6 +100,21 @@ def fetch_segment(name) private + def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_ms) + remaining_attempts = max_retries + + loop do + remaining_attempts -= 1 + + segment_names = @split_fetcher.fetch_splits(fetch_options) + + return split_sync_result(true, remaining_attempts, segment_names) if target_cn <= @splits_repository.get_change_number + return split_sync_result(false, remaining_attempts, segment_names) if remaining_attempts <= 0 + + sleep(retry_delay_ms) + end + end + def fetch_segments @segment_fetcher.fetch_segments end @@ -87,6 +137,10 @@ def impressions_count_sender def start_telemetry_sync_task Telemetry::SyncTask.new(@config, @telemetry_synchronizer).call end + + def split_sync_result(success, remaining_attempts, segment_names) + { success: success, remaining_attempts: remaining_attempts, segment_names: segment_names } + end end end end diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index ba4fd527..a05f63bb 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -3,8 +3,6 @@ module SplitIoClient module SSE module Workers - MAX_RETRIES_ALLOWED = 10 - class SplitsWorker def initialize(synchronizer, config, splits_repository) @synchronizer = synchronizer @@ -62,12 +60,7 @@ def kill_split(change_number, split_name, default_treatment) def perform while (change_number = @queue.pop) @config.logger.debug("SplitsWorker change_number dequeue #{change_number}") - - attempt = 0 - while change_number > @splits_repository.get_change_number.to_i && attempt <= Workers::MAX_RETRIES_ALLOWED - @synchronizer.fetch_splits - attempt += 1 - end + @synchronizer.fetch_splits end end From 7df074a77c7d5b3d781346ed742e3e21ca4b3a60 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Tue, 20 Jul 2021 17:49:08 -0300 Subject: [PATCH 2/2] splits retries implementation --- .rubocop.yml | 2 ++ lib/splitclient-rb/engine/synchronizer.rb | 19 +++++----- lib/splitclient-rb/split_config.rb | 8 ++--- .../sse/event_source/back_off.rb | 7 ++-- .../sse/workers/segments_worker.rb | 3 +- .../sse/workers/splits_worker.rb | 2 +- spec/engine/synchronizer_spec.rb | 35 ++++++++++++++++++- spec/splitclient/split_config_spec.rb | 2 +- spec/sse/event_source/back_off_spec.rb | 34 ++++++++++++++++-- spec/sse/sse_handler_spec.rb | 2 +- spec/sse/workers/splits_worker_spec.rb | 3 +- 11 files changed, 95 insertions(+), 22 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 9806211d..74645582 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -31,6 +31,7 @@ Metrics/LineLength: - spec/engine/sync_manager_spec.rb - spec/engine/auth_api_client_spec.rb - spec/telemetry/synchronizer_spec.rb + - spec/splitclient/split_config_spec.rb Style/BracesAroundHashParameters: Exclude: @@ -62,3 +63,4 @@ AllCops: - lib/splitclient-rb/engine/models/**/* - lib/splitclient-rb/engine/parser/**/* - spec/telemetry/synchronizer_spec.rb + - lib/splitclient-rb/engine/synchronizer.rb diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index 7508de3b..ac4df696 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -6,8 +6,8 @@ class Synchronizer include SplitIoClient::Cache::Fetchers include SplitIoClient::Cache::Senders - ON_DEMAND_FETCH_BACKOFF_BASE_MS = 10_000 - ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS = 60_000 + ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS = 10 + ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS = 60 ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 def initialize( @@ -29,7 +29,6 @@ def initialize( @impressions_api = SplitIoClient::Api::Impressions.new(@api_key, @config, params[:telemetry_runtime_producer]) @impression_counter = params[:imp_counter] @telemetry_synchronizer = params[:telemetry_synchronizer] - @backoff = new SSE::EventSource::BackOff.new() end def sync_all @@ -58,14 +57,15 @@ def stop_periodic_fetch end def fetch_splits(target_change_number) - return if target_change_number <= @splits_repository.get_change_number + return if target_change_number <= @splits_repository.get_change_number.to_i fetch_options = { cache_control_headers: true, till: nil } result = attempt_splits_sync(target_change_number, fetch_options, @config.on_demand_fetch_max_retries, - @config.on_demand_fetch_retry_delay_ms) + @config.on_demand_fetch_retry_delay_seconds, + false) attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts] if result[:success] @@ -79,7 +79,8 @@ def fetch_splits(target_change_number) result = attempt_splits_sync(target_change_number, fetch_options, ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES, - @config.on_demand_fetch_retry_delay_ms) + nil, + true) attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts] @@ -100,8 +101,9 @@ def fetch_segment(name) private - def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_ms) + def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff) remaining_attempts = max_retries + backoff = SSE::EventSource::BackOff.new(ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS, 0, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS) if with_backoff loop do remaining_attempts -= 1 @@ -111,7 +113,8 @@ def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_ms) return split_sync_result(true, remaining_attempts, segment_names) if target_cn <= @splits_repository.get_change_number return split_sync_result(false, remaining_attempts, segment_names) if remaining_attempts <= 0 - sleep(retry_delay_ms) + delay = with_backoff ? backoff.interval : retry_delay_seconds + sleep(delay) end end diff --git a/lib/splitclient-rb/split_config.rb b/lib/splitclient-rb/split_config.rb index 7886e99d..837e8a68 100644 --- a/lib/splitclient-rb/split_config.rb +++ b/lib/splitclient-rb/split_config.rb @@ -113,7 +113,7 @@ def initialize(opts = {}) @sdk_start_time = Time.now - @on_demand_fetch_retry_delay_ms = SplitConfig.default_on_demand_fetch_retry_delay_ms + @on_demand_fetch_retry_delay_seconds = SplitConfig.default_on_demand_fetch_retry_delay_seconds @on_demand_fetch_max_retries = SplitConfig.default_on_demand_fetch_max_retries startup_log @@ -281,11 +281,11 @@ def initialize(opts = {}) attr_accessor :sdk_start_time - attr_accessor :on_demand_fetch_retry_delay_ms + attr_accessor :on_demand_fetch_retry_delay_seconds attr_accessor :on_demand_fetch_max_retries - def self.default_on_demand_fetch_retry_delay_ms - 50 + def self.default_on_demand_fetch_retry_delay_seconds + 0.05 end def self.default_on_demand_fetch_max_retries diff --git a/lib/splitclient-rb/sse/event_source/back_off.rb b/lib/splitclient-rb/sse/event_source/back_off.rb index b9a2f8c0..ea21fc85 100644 --- a/lib/splitclient-rb/sse/event_source/back_off.rb +++ b/lib/splitclient-rb/sse/event_source/back_off.rb @@ -3,17 +3,20 @@ module SplitIoClient module SSE module EventSource + BACKOFF_MAX_ALLOWED = 1.8 class BackOff - def initialize(back_off_base, attempt = 0) + def initialize(back_off_base, attempt = 0, max_allowed = BACKOFF_MAX_ALLOWED) @attempt = attempt @back_off_base = back_off_base + @max_allowed = max_allowed end def interval + interval = 0 interval = (@back_off_base * (2**@attempt)) if @attempt.positive? @attempt += 1 - interval || 0 + interval >= @max_allowed ? @max_allowed : interval end def reset diff --git a/lib/splitclient-rb/sse/workers/segments_worker.rb b/lib/splitclient-rb/sse/workers/segments_worker.rb index 6d5768e8..d078a356 100644 --- a/lib/splitclient-rb/sse/workers/segments_worker.rb +++ b/lib/splitclient-rb/sse/workers/segments_worker.rb @@ -3,6 +3,7 @@ module SplitIoClient module SSE module Workers + MAX_RETRIES_ALLOWED = 10 class SegmentsWorker def initialize(synchronizer, config, segments_repository) @synchronizer = synchronizer @@ -52,7 +53,7 @@ def perform @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") attempt = 0 - while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED + while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= MAX_RETRIES_ALLOWED @synchronizer.fetch_segment(segment_name) attempt += 1 end diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index a05f63bb..03c780b1 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -60,7 +60,7 @@ def kill_split(change_number, split_name, default_treatment) def perform while (change_number = @queue.pop) @config.logger.debug("SplitsWorker change_number dequeue #{change_number}") - @synchronizer.fetch_splits + @synchronizer.fetch_splits(change_number) end end diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index 2ad3acfb..f79ee117 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -92,10 +92,43 @@ mock_segment_changes('segment1', segment1, '-1') mock_segment_changes('segment1', segment1, '1470947453877') - synchronizer.fetch_splits + synchronizer.fetch_splits(0) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once end + it 'fetch_splits - ' do + sync = subject.new(repositories, api_key, config, sdk_blocker, parameters) + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') + .to_return(status: 200, body: + '{ + "splits": [], + "since": -1, + "till": 1506703262918 + }') + + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918') + .to_return(status: 200, body: + '{ + "splits": [], + "since": 1506703262918, + "till": 1506703262918 + }') + + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918&till=1506703262920') + .to_return(status: 200, body: + '{ + "splits": [], + "since": 1506703262918, + "till": 1506703262921 + }') + + sync.fetch_splits(1_506_703_262_920) + + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(9) + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918&till=1506703262920')).to have_been_made.once + end + it 'fetch_segment' do mock_segment_changes('segment3', segment3, '-1') diff --git a/spec/splitclient/split_config_spec.rb b/spec/splitclient/split_config_spec.rb index 97b675f2..8c5542fa 100644 --- a/spec/splitclient/split_config_spec.rb +++ b/spec/splitclient/split_config_spec.rb @@ -31,7 +31,7 @@ expect(configs.ip_addresses_enabled).to eq default_ip expect(configs.machine_name).to eq SplitIoClient::SplitConfig.machine_hostname(default_ip, nil, :redis) expect(configs.machine_ip).to eq SplitIoClient::SplitConfig.machine_ip(default_ip, nil, :redis) - expect(configs.on_demand_fetch_retry_delay_ms).to eq SplitIoClient::SplitConfig.default_on_demand_fetch_retry_delay_ms + expect(configs.on_demand_fetch_retry_delay_seconds).to eq SplitIoClient::SplitConfig.default_on_demand_fetch_retry_delay_seconds expect(configs.on_demand_fetch_max_retries).to eq SplitIoClient::SplitConfig.default_on_demand_fetch_max_retries end diff --git a/spec/sse/event_source/back_off_spec.rb b/spec/sse/event_source/back_off_spec.rb index 303a0ac6..3bac2590 100644 --- a/spec/sse/event_source/back_off_spec.rb +++ b/spec/sse/event_source/back_off_spec.rb @@ -9,7 +9,7 @@ let(:log) { StringIO.new } it 'get intervals and reset attemps' do - back_off = subject.new(1) + back_off = subject.new(1, 0, 5) firts_interval = back_off.interval expect(firts_interval).to eq(0) @@ -27,7 +27,7 @@ it 'with custom config' do streaming_reconnect_back_off_base = 5 - back_off = subject.new(streaming_reconnect_back_off_base) + back_off = subject.new(streaming_reconnect_back_off_base, 0, 30) firts_interval = back_off.interval expect(firts_interval).to eq(0) @@ -42,4 +42,34 @@ reset_interval = back_off.interval expect(reset_interval).to eq(0) end + + it 'with max' do + streaming_reconnect_back_off_base = 5 + back_off = subject.new(streaming_reconnect_back_off_base, 0, 30) + + firts_interval = back_off.interval + expect(firts_interval).to eq(0) + + second_interval = back_off.interval + expect(second_interval).to eq(10) + + third_interval = back_off.interval + expect(third_interval).to eq(20) + + third_interval = back_off.interval + expect(third_interval).to eq(30) + + third_interval = back_off.interval + expect(third_interval).to eq(30) + + third_interval = back_off.interval + expect(third_interval).to eq(30) + + third_interval = back_off.interval + expect(third_interval).to eq(30) + + back_off.reset + reset_interval = back_off.interval + expect(reset_interval).to eq(0) + end end diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 75dac878..41fd4a5d 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -62,7 +62,7 @@ mock_segment_changes('segment2', segment2, '1470947453878') mock_segment_changes('segment3', segment3, '-1') - synchronizer.fetch_splits + synchronizer.fetch_splits(0) end context 'SPLIT UPDATE event' do diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 3274fc41..bd499166 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -49,7 +49,8 @@ sleep(1) - expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(10) + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.times(1) + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.at_least_times(2) end context 'add change number to queue' do