Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Metrics/LineLength:
- spec/engine/sync_manager_spec.rb
- spec/engine/auth_api_client_spec.rb
- spec/telemetry/synchronizer_spec.rb
- spec/splitclient/split_config_spec.rb

Style/BracesAroundHashParameters:
Exclude:
Expand Down Expand Up @@ -62,3 +63,4 @@ AllCops:
- lib/splitclient-rb/engine/models/**/*
- lib/splitclient-rb/engine/parser/**/*
- spec/telemetry/synchronizer_spec.rb
- lib/splitclient-rb/engine/synchronizer.rb
63 changes: 60 additions & 3 deletions lib/splitclient-rb/engine/synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ class Synchronizer
include SplitIoClient::Cache::Fetchers
include SplitIoClient::Cache::Senders

ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS = 10
ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS = 60
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10

def initialize(
repositories,
api_key,
Expand Down Expand Up @@ -52,10 +56,42 @@ def stop_periodic_fetch
@segment_fetcher.stop_segments_thread
end

def fetch_splits
def fetch_splits(target_change_number)
return if target_change_number <= @splits_repository.get_change_number.to_i

fetch_options = { cache_control_headers: true, till: nil }
segment_names = @split_fetcher.fetch_splits(fetch_options)
@segment_fetcher.fetch_segments_if_not_exists(segment_names, true) unless segment_names.empty?

result = attempt_splits_sync(target_change_number,
fetch_options,
@config.on_demand_fetch_max_retries,
@config.on_demand_fetch_retry_delay_seconds,
false)

attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
if result[:success]
@segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty?
@config.logger.debug("Refresh completed in #{attempts} attempts.") if @config.debug_enabled

return
end

fetch_options[:till] = target_change_number
result = attempt_splits_sync(target_change_number,
fetch_options,
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
nil,
true)

attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]

if result[:success]
@segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty?
@config.logger.debug("Refresh completed bypassing the CDN in #{attempts} attempts.")
else
@config.logger.debug("No changes fetched after #{attempts} attempts with CDN bypassed.")
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def fetch_segment(name)
Expand All @@ -65,6 +101,23 @@ def fetch_segment(name)

private

def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
remaining_attempts = max_retries
backoff = SSE::EventSource::BackOff.new(ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS, 0, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS) if with_backoff

loop do
remaining_attempts -= 1

segment_names = @split_fetcher.fetch_splits(fetch_options)

return split_sync_result(true, remaining_attempts, segment_names) if target_cn <= @splits_repository.get_change_number
return split_sync_result(false, remaining_attempts, segment_names) if remaining_attempts <= 0

delay = with_backoff ? backoff.interval : retry_delay_seconds
sleep(delay)
end
end

def fetch_segments
@segment_fetcher.fetch_segments
end
Expand All @@ -87,6 +140,10 @@ def impressions_count_sender
def start_telemetry_sync_task
Telemetry::SyncTask.new(@config, @telemetry_synchronizer).call
end

def split_sync_result(success, remaining_attempts, segment_names)
{ success: success, remaining_attempts: remaining_attempts, segment_names: segment_names }
end
end
end
end
8 changes: 4 additions & 4 deletions lib/splitclient-rb/split_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def initialize(opts = {})

@sdk_start_time = Time.now

@on_demand_fetch_retry_delay_ms = SplitConfig.default_on_demand_fetch_retry_delay_ms
@on_demand_fetch_retry_delay_seconds = SplitConfig.default_on_demand_fetch_retry_delay_seconds
@on_demand_fetch_max_retries = SplitConfig.default_on_demand_fetch_max_retries

startup_log
Expand Down Expand Up @@ -281,11 +281,11 @@ def initialize(opts = {})

attr_accessor :sdk_start_time

attr_accessor :on_demand_fetch_retry_delay_ms
attr_accessor :on_demand_fetch_retry_delay_seconds
attr_accessor :on_demand_fetch_max_retries

def self.default_on_demand_fetch_retry_delay_ms
50
def self.default_on_demand_fetch_retry_delay_seconds
0.05
end

def self.default_on_demand_fetch_max_retries
Expand Down
7 changes: 5 additions & 2 deletions lib/splitclient-rb/sse/event_source/back_off.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
module SplitIoClient
module SSE
module EventSource
BACKOFF_MAX_ALLOWED = 1.8
class BackOff
def initialize(back_off_base, attempt = 0)
def initialize(back_off_base, attempt = 0, max_allowed = BACKOFF_MAX_ALLOWED)
@attempt = attempt
@back_off_base = back_off_base
@max_allowed = max_allowed
end

def interval
interval = 0
interval = (@back_off_base * (2**@attempt)) if @attempt.positive?
@attempt += 1

interval || 0
interval >= @max_allowed ? @max_allowed : interval
end

def reset
Expand Down
3 changes: 2 additions & 1 deletion lib/splitclient-rb/sse/workers/segments_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module SplitIoClient
module SSE
module Workers
MAX_RETRIES_ALLOWED = 10
class SegmentsWorker
def initialize(synchronizer, config, segments_repository)
@synchronizer = synchronizer
Expand Down Expand Up @@ -52,7 +53,7 @@ def perform
@config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}")

attempt = 0
while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED
while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= MAX_RETRIES_ALLOWED
@synchronizer.fetch_segment(segment_name)
attempt += 1
end
Expand Down
9 changes: 1 addition & 8 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
module SplitIoClient
module SSE
module Workers
MAX_RETRIES_ALLOWED = 10

class SplitsWorker
def initialize(synchronizer, config, splits_repository)
@synchronizer = synchronizer
Expand Down Expand Up @@ -62,12 +60,7 @@ def kill_split(change_number, split_name, default_treatment)
def perform
while (change_number = @queue.pop)
@config.logger.debug("SplitsWorker change_number dequeue #{change_number}")

attempt = 0
while change_number > @splits_repository.get_change_number.to_i && attempt <= Workers::MAX_RETRIES_ALLOWED
@synchronizer.fetch_splits
attempt += 1
end
@synchronizer.fetch_splits(change_number)
end
end

Expand Down
35 changes: 34 additions & 1 deletion spec/engine/synchronizer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,43 @@
mock_segment_changes('segment1', segment1, '-1')
mock_segment_changes('segment1', segment1, '1470947453877')

synchronizer.fetch_splits
synchronizer.fetch_splits(0)
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
end

it 'fetch_splits - ' do
sync = subject.new(repositories, api_key, config, sdk_blocker, parameters)
stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')
.to_return(status: 200, body:
'{
"splits": [],
"since": -1,
"till": 1506703262918
}')

stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')
.to_return(status: 200, body:
'{
"splits": [],
"since": 1506703262918,
"till": 1506703262918
}')

stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918&till=1506703262920')
.to_return(status: 200, body:
'{
"splits": [],
"since": 1506703262918,
"till": 1506703262921
}')

sync.fetch_splits(1_506_703_262_920)

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(9)
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918&till=1506703262920')).to have_been_made.once
end

it 'fetch_segment' do
mock_segment_changes('segment3', segment3, '-1')

Expand Down
2 changes: 1 addition & 1 deletion spec/splitclient/split_config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
expect(configs.ip_addresses_enabled).to eq default_ip
expect(configs.machine_name).to eq SplitIoClient::SplitConfig.machine_hostname(default_ip, nil, :redis)
expect(configs.machine_ip).to eq SplitIoClient::SplitConfig.machine_ip(default_ip, nil, :redis)
expect(configs.on_demand_fetch_retry_delay_ms).to eq SplitIoClient::SplitConfig.default_on_demand_fetch_retry_delay_ms
expect(configs.on_demand_fetch_retry_delay_seconds).to eq SplitIoClient::SplitConfig.default_on_demand_fetch_retry_delay_seconds
expect(configs.on_demand_fetch_max_retries).to eq SplitIoClient::SplitConfig.default_on_demand_fetch_max_retries
end

Expand Down
34 changes: 32 additions & 2 deletions spec/sse/event_source/back_off_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
let(:log) { StringIO.new }

it 'get intervals and reset attemps' do
back_off = subject.new(1)
back_off = subject.new(1, 0, 5)

firts_interval = back_off.interval
expect(firts_interval).to eq(0)
Expand All @@ -27,7 +27,7 @@

it 'with custom config' do
streaming_reconnect_back_off_base = 5
back_off = subject.new(streaming_reconnect_back_off_base)
back_off = subject.new(streaming_reconnect_back_off_base, 0, 30)

firts_interval = back_off.interval
expect(firts_interval).to eq(0)
Expand All @@ -42,4 +42,34 @@
reset_interval = back_off.interval
expect(reset_interval).to eq(0)
end

it 'with max' do
streaming_reconnect_back_off_base = 5
back_off = subject.new(streaming_reconnect_back_off_base, 0, 30)

firts_interval = back_off.interval
expect(firts_interval).to eq(0)

second_interval = back_off.interval
expect(second_interval).to eq(10)

third_interval = back_off.interval
expect(third_interval).to eq(20)

third_interval = back_off.interval
expect(third_interval).to eq(30)

third_interval = back_off.interval
expect(third_interval).to eq(30)

third_interval = back_off.interval
expect(third_interval).to eq(30)

third_interval = back_off.interval
expect(third_interval).to eq(30)

back_off.reset
reset_interval = back_off.interval
expect(reset_interval).to eq(0)
end
end
2 changes: 1 addition & 1 deletion spec/sse/sse_handler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
mock_segment_changes('segment2', segment2, '1470947453878')
mock_segment_changes('segment3', segment3, '-1')

synchronizer.fetch_splits
synchronizer.fetch_splits(0)
end

context 'SPLIT UPDATE event' do
Expand Down
3 changes: 2 additions & 1 deletion spec/sse/workers/splits_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@

sleep(1)

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(10)
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.times(1)
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.at_least_times(2)
end

context 'add change number to queue' do
Expand Down