diff --git a/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb b/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb index 6cd7b2952..19dab74ae 100644 --- a/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb +++ b/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb @@ -27,14 +27,22 @@ def call end end + def fetch_segments_if_not_exists(names) + names.each do |name| + change_number = @segments_repository.get_change_number(name) + + fetch_segment(name) if change_number == -1 + end + rescue StandardError => error + @config.log_found_exception(__method__.to_s, error) + end + def fetch_segment(name) @semaphore.synchronize do segments_api.fetch_segments_by_names([name]) - true end rescue StandardError => error @config.log_found_exception(__method__.to_s, error) - false end def fetch_segments diff --git a/lib/splitclient-rb/cache/fetchers/split_fetcher.rb b/lib/splitclient-rb/cache/fetchers/split_fetcher.rb index 104a839c8..8510ae438 100644 --- a/lib/splitclient-rb/cache/fetchers/split_fetcher.rb +++ b/lib/splitclient-rb/cache/fetchers/split_fetcher.rb @@ -41,11 +41,11 @@ def fetch_splits @config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled @sdk_blocker.splits_ready! - true + + data[:segment_names] end rescue StandardError => error @config.log_found_exception(__method__.to_s, error) - false end def stop_splits_thread diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index baf4f9b76..b9ea30e6e 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -30,7 +30,7 @@ def initialize( def sync_all @config.threads[:sync_all_thread] = Thread.new do @config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled - fetch_splits + @split_fetcher.fetch_splits fetch_segments end end @@ -53,21 +53,12 @@ def stop_periodic_fetch end def fetch_splits - back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1) - loop do - break if @split_fetcher.fetch_splits - - sleep(back_off.interval) - end + segment_names = @split_fetcher.fetch_splits + @segment_fetcher.fetch_segments_if_not_exists(segment_names) unless segment_names.empty? end def fetch_segment(name) - back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1) - loop do - break if @segment_fetcher.fetch_segment(name) - - sleep(back_off.interval) - end + @segment_fetcher.fetch_segment(name) end private diff --git a/lib/splitclient-rb/sse/workers/segments_worker.rb b/lib/splitclient-rb/sse/workers/segments_worker.rb index 3b364c7b0..6d5768e85 100644 --- a/lib/splitclient-rb/sse/workers/segments_worker.rb +++ b/lib/splitclient-rb/sse/workers/segments_worker.rb @@ -48,12 +48,13 @@ def stop def perform while (item = @queue.pop) segment_name = item[:segment_name] - change_number = item[:change_number] - since = @segments_repository.get_change_number(segment_name) + cn = item[:change_number] + @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") - unless since >= change_number - @config.logger.debug("SegmentsWorker fetch_segment with #{since}") + attempt = 0 + while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_segment(segment_name) + attempt += 1 end end end diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index a347c8ff7..ba4fd5273 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -3,6 +3,8 @@ module SplitIoClient module SSE module Workers + MAX_RETRIES_ALLOWED = 10 + class SplitsWorker def initialize(synchronizer, config, splits_repository) @synchronizer = synchronizer @@ -59,11 +61,12 @@ def kill_split(change_number, split_name, default_treatment) def perform while (change_number = @queue.pop) - since = @splits_repository.get_change_number + @config.logger.debug("SplitsWorker change_number dequeue #{change_number}") - unless since.to_i >= change_number - @config.logger.debug("SplitsWorker fetch_splits with #{since}") + attempt = 0 + while change_number > @splits_repository.get_change_number.to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_splits + attempt += 1 end end end diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index 0958d77bb..c3354e308 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -87,34 +87,22 @@ it 'fetch_splits' do mock_split_changes(splits) + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') synchronizer.fetch_splits expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once end - it 'fetch_splits with retries' do - stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') - .to_return({ status: 500 }, { status: 200, body: splits }) - - synchronizer.fetch_splits - - expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.times(2) - end - it 'fetch_segment' do mock_segment_changes('segment3', segment3, '-1') synchronizer.fetch_segment('segment3') expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.once end - - it 'fetch_segment with retries' do - stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1') - .to_return({ status: 500 }, { status: 200, body: segment3 }) - - synchronizer.fetch_segment('segment3') - expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.times(2) - end end private diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 798fa7918..75ab80f70 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -183,7 +183,7 @@ end context 'SEGMENT UPDATE event' do - it 'must trigger fetch' do + it 'must trigger fetch - with retries' do mock_server do |server| server.setup_response('/') do |_, res| send_content(res, event_segment_update_must_fetch) @@ -203,7 +203,7 @@ expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) - expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(1) + expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12) sse_handler.sse_client.close diff --git a/spec/sse/workers/segments_worker_spec.rb b/spec/sse/workers/segments_worker_spec.rb index 85aaecd24..131d731df 100644 --- a/spec/sse/workers/segments_worker_spec.rb +++ b/spec/sse/workers/segments_worker_spec.rb @@ -41,6 +41,16 @@ context 'add segment name to queue' do it 'must trigger fetch' do + stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877') + .to_return(status: 200, body: + '{ + "name": "segment1", + "added": [], + "removed": [], + "since": 1470947453878, + "till": 1470947453878 + }') + worker = subject.new(synchronizer, config, segments_repository) worker.start @@ -51,6 +61,17 @@ expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(2) end + it 'must trigger fetch - with retries' do + worker = subject.new(synchronizer, config, segments_repository) + + worker.start + worker.add_to_queue(1_506_703_262_918, 'segment1') + + sleep(1) + + expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12) + end + it 'must not trigger fetch' do worker = subject.new(synchronizer, config, segments_repository) diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 2ecfddeaf..2bc4fb397 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -27,18 +27,44 @@ let(:params) { { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: impression_counter } } let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, sdk_blocker, params) } - before do - mock_split_changes(splits) - mock_segment_changes('segment1', segment1, '-1') - mock_segment_changes('segment1', segment1, '1470947453877') - mock_segment_changes('segment2', segment2, '-1') - mock_segment_changes('segment2', segment2, '1470947453878') - mock_segment_changes('segment3', segment3, '-1') - - split_fetcher.fetch_splits + it 'add change number - must tigger fetcch - with retries' do + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') + .to_return(status: 200, body: + '{ + "splits": [], + "since": -1, + "till": 1506703262918 + }') + + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918') + .to_return(status: 200, body: + '{ + "splits": [], + "since": 1506703262918, + "till": 1506703262918 + }') + + worker = subject.new(synchronizer, config, splits_repository) + worker.start + worker.add_to_queue(1_506_703_262_919) + + sleep(1) + + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(10) end context 'add change number to queue' do + before do + mock_split_changes(splits) + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + + split_fetcher.fetch_splits + end + it 'must trigger fetch' do worker = subject.new(synchronizer, config, splits_repository) worker.start @@ -71,6 +97,17 @@ end context 'kill split notification' do + before do + mock_split_changes(splits) + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + + split_fetcher.fetch_splits + end + it 'must kill split and trigger fetch' do worker = subject.new(synchronizer, config, splits_repository)