Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions lib/splitclient-rb/cache/fetchers/segment_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,22 @@ def call
end
end

def fetch_segments_if_not_exists(names)
names.each do |name|
change_number = @segments_repository.get_change_number(name)

fetch_segment(name) if change_number == -1
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def fetch_segment(name)
@semaphore.synchronize do
segments_api.fetch_segments_by_names([name])
true
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
false
end

def fetch_segments
Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb/cache/fetchers/split_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def fetch_splits
@config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled

@sdk_blocker.splits_ready!
true

data[:segment_names]
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
false
end

def stop_splits_thread
Expand Down
17 changes: 4 additions & 13 deletions lib/splitclient-rb/engine/synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def initialize(
def sync_all
@config.threads[:sync_all_thread] = Thread.new do
@config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled
fetch_splits
@split_fetcher.fetch_splits
fetch_segments
end
end
Expand All @@ -53,21 +53,12 @@ def stop_periodic_fetch
end

def fetch_splits
back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1)
loop do
break if @split_fetcher.fetch_splits

sleep(back_off.interval)
end
segment_names = @split_fetcher.fetch_splits
@segment_fetcher.fetch_segments_if_not_exists(segment_names) unless segment_names.empty?
end

def fetch_segment(name)
back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1)
loop do
break if @segment_fetcher.fetch_segment(name)

sleep(back_off.interval)
end
@segment_fetcher.fetch_segment(name)
end

private
Expand Down
9 changes: 5 additions & 4 deletions lib/splitclient-rb/sse/workers/segments_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ def stop
def perform
while (item = @queue.pop)
segment_name = item[:segment_name]
change_number = item[:change_number]
since = @segments_repository.get_change_number(segment_name)
cn = item[:change_number]
@config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}")

unless since >= change_number
@config.logger.debug("SegmentsWorker fetch_segment with #{since}")
attempt = 0
while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED
@synchronizer.fetch_segment(segment_name)
attempt += 1
end
end
end
Expand Down
9 changes: 6 additions & 3 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module SplitIoClient
module SSE
module Workers
MAX_RETRIES_ALLOWED = 10

class SplitsWorker
def initialize(synchronizer, config, splits_repository)
@synchronizer = synchronizer
Expand Down Expand Up @@ -59,11 +61,12 @@ def kill_split(change_number, split_name, default_treatment)

def perform
while (change_number = @queue.pop)
since = @splits_repository.get_change_number
@config.logger.debug("SplitsWorker change_number dequeue #{change_number}")

unless since.to_i >= change_number
@config.logger.debug("SplitsWorker fetch_splits with #{since}")
attempt = 0
while change_number > @splits_repository.get_change_number.to_i && attempt <= Workers::MAX_RETRIES_ALLOWED
@synchronizer.fetch_splits
attempt += 1
end
end
end
Expand Down
22 changes: 5 additions & 17 deletions spec/engine/synchronizer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,34 +87,22 @@

it 'fetch_splits' do
mock_split_changes(splits)
mock_segment_changes('segment2', segment2, '-1')
mock_segment_changes('segment2', segment2, '1470947453878')
mock_segment_changes('segment3', segment3, '-1')
mock_segment_changes('segment1', segment1, '-1')
mock_segment_changes('segment1', segment1, '1470947453877')

synchronizer.fetch_splits
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
end

it 'fetch_splits with retries' do
stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')
.to_return({ status: 500 }, { status: 200, body: splits })

synchronizer.fetch_splits

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.times(2)
end

it 'fetch_segment' do
mock_segment_changes('segment3', segment3, '-1')

synchronizer.fetch_segment('segment3')
expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.once
end

it 'fetch_segment with retries' do
stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')
.to_return({ status: 500 }, { status: 200, body: segment3 })

synchronizer.fetch_segment('segment3')
expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.times(2)
end
end

private
Expand Down
4 changes: 2 additions & 2 deletions spec/sse/sse_handler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@
end

context 'SEGMENT UPDATE event' do
it 'must trigger fetch' do
it 'must trigger fetch - with retries' do
mock_server do |server|
server.setup_response('/') do |_, res|
send_content(res, event_segment_update_must_fetch)
Expand All @@ -203,7 +203,7 @@

expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED)
expect(sse_handler.sse_client.connected?).to eq(true)
expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(1)
expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12)

sse_handler.sse_client.close

Expand Down
21 changes: 21 additions & 0 deletions spec/sse/workers/segments_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@

context 'add segment name to queue' do
it 'must trigger fetch' do
stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')
.to_return(status: 200, body:
'{
"name": "segment1",
"added": [],
"removed": [],
"since": 1470947453878,
"till": 1470947453878
}')

worker = subject.new(synchronizer, config, segments_repository)

worker.start
Expand All @@ -51,6 +61,17 @@
expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(2)
end

it 'must trigger fetch - with retries' do
worker = subject.new(synchronizer, config, segments_repository)

worker.start
worker.add_to_queue(1_506_703_262_918, 'segment1')

sleep(1)

expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12)
end

it 'must not trigger fetch' do
worker = subject.new(synchronizer, config, segments_repository)

Expand Down
55 changes: 46 additions & 9 deletions spec/sse/workers/splits_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,44 @@
let(:params) { { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: impression_counter } }
let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, sdk_blocker, params) }

before do
mock_split_changes(splits)
mock_segment_changes('segment1', segment1, '-1')
mock_segment_changes('segment1', segment1, '1470947453877')
mock_segment_changes('segment2', segment2, '-1')
mock_segment_changes('segment2', segment2, '1470947453878')
mock_segment_changes('segment3', segment3, '-1')

split_fetcher.fetch_splits
it 'add change number - must tigger fetcch - with retries' do
stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')
.to_return(status: 200, body:
'{
"splits": [],
"since": -1,
"till": 1506703262918
}')

stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')
.to_return(status: 200, body:
'{
"splits": [],
"since": 1506703262918,
"till": 1506703262918
}')

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_919)

sleep(1)

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(10)
end

context 'add change number to queue' do
before do
mock_split_changes(splits)
mock_segment_changes('segment1', segment1, '-1')
mock_segment_changes('segment1', segment1, '1470947453877')
mock_segment_changes('segment2', segment2, '-1')
mock_segment_changes('segment2', segment2, '1470947453878')
mock_segment_changes('segment3', segment3, '-1')

split_fetcher.fetch_splits
end

it 'must trigger fetch' do
worker = subject.new(synchronizer, config, splits_repository)
worker.start
Expand Down Expand Up @@ -71,6 +97,17 @@
end

context 'kill split notification' do
before do
mock_split_changes(splits)
mock_segment_changes('segment1', segment1, '-1')
mock_segment_changes('segment1', segment1, '1470947453877')
mock_segment_changes('segment2', segment2, '-1')
mock_segment_changes('segment2', segment2, '1470947453878')
mock_segment_changes('segment3', segment3, '-1')

split_fetcher.fetch_splits
end

it 'must kill split and trigger fetch' do
worker = subject.new(synchronizer, config, splits_repository)

Expand Down