Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ Metrics/MethodLength:
Metrics/ClassLength:
Max: 150

Metrics/CyclomaticComplexity:
Max: 8

Metrics/LineLength:
Max: 130
Exclude:
Expand Down
6 changes: 6 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
CHANGES

7.2.3 (Feb 24, 2021)
- Fixed missing segment fetch after an SPLIT_UPDATE.
- Updated streaming logic to support multiregion.
- Updated sse client connection logic to read confirmation event.
- Updated naming of retryable erros.

7.2.2 (Dec 18, 2020)
- Fixed issue: undefined local variable or method post_impressions_count

Expand Down
12 changes: 10 additions & 2 deletions lib/splitclient-rb/cache/fetchers/segment_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,22 @@ def call
end
end

def fetch_segments_if_not_exists(names)
names.each do |name|
change_number = @segments_repository.get_change_number(name)

fetch_segment(name) if change_number == -1
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def fetch_segment(name)
@semaphore.synchronize do
segments_api.fetch_segments_by_names([name])
true
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
false
end

def fetch_segments
Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb/cache/fetchers/split_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def fetch_splits
@config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled

@sdk_blocker.splits_ready!
true

data[:segment_names]
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
false
end

def stop_splits_thread
Expand Down
7 changes: 6 additions & 1 deletion lib/splitclient-rb/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,10 @@ class SplitIoClient::Constants
CONTROL_SEC = 'control_sec'
OCCUPANCY_CHANNEL_PREFIX = '[?occupancy=metrics.publishers]'
FETCH_BACK_OFF_BASE_RETRIES = 1
PUSH_CONNECTED = 'PUSH_CONNECTED'
PUSH_RETRYABLE_ERROR = 'PUSH_RETRYABLE_ERROR'
PUSH_NONRETRYABLE_ERROR = 'PUSH_NONRETRYABLE_ERROR'
PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN'
PUSH_SUBSYSTEM_READY = 'PUSH_SUBSYSTEM_READY'
PUSH_SUBSYSTEM_OFF = 'PUSH_SUBSYSTEM_OFF'
end

3 changes: 2 additions & 1 deletion lib/splitclient-rb/engine/push_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ def start_sse
if response[:push_enabled] && @sse_handler.start(response[:token], response[:channels])
schedule_next_token_refresh(response[:exp])
@back_off.reset
return
return true
end

stop_sse

schedule_next_token_refresh(@back_off.interval) if response[:retry]
false
rescue StandardError => e
@config.logger.error("start_sse: #{e.inspect}")
end
Expand Down
83 changes: 50 additions & 33 deletions lib/splitclient-rb/engine/sync_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ def initialize(
)
@synchronizer = synchronizer
notification_manager_keeper = SplitIoClient::SSE::NotificationManagerKeeper.new(config) do |manager|
manager.on_occupancy { |publisher_available| process_occupancy(publisher_available) }
manager.on_push_shutdown { process_push_shutdown }
manager.on_action { |action| process_action(action) }
end
@sse_handler = SplitIoClient::SSE::SSEHandler.new(
config,
Expand All @@ -21,8 +20,7 @@ def initialize(
repositories[:segments],
notification_manager_keeper
) do |handler|
handler.on_connected { process_connected }
handler.on_disconnect { |reconnect| process_disconnect(reconnect) }
handler.on_action { |action| process_action(action) }
end

@push_manager = PushManager.new(config, @sse_handler, api_key)
Expand All @@ -44,10 +42,10 @@ def start
# Starts tasks if stream is enabled.
def start_stream
@config.logger.debug('Starting push mode ...')
stream_start_thread
sync_all_thread
@synchronizer.start_periodic_data_recording

stream_start_sse_thread
start_sse_connection_thread
end

def start_poll
Expand All @@ -59,23 +57,24 @@ def start_poll
end

# Starts thread which fetch splits and segments once and trigger task to periodic data recording.
def stream_start_thread
def sync_all_thread
@config.threads[:sync_manager_start_stream] = Thread.new do
begin
@synchronizer.sync_all
rescue StandardError => e
@config.logger.error("stream_start_thread error : #{e.inspect}")
@config.logger.error("sync_all_thread error : #{e.inspect}")
end
end
end

# Starts thread which connect to sse and after that fetch splits and segments once.
def stream_start_sse_thread
def start_sse_connection_thread
@config.threads[:sync_manager_start_sse] = Thread.new do
begin
@push_manager.start_sse
connected = @push_manager.start_sse
@synchronizer.start_periodic_fetch unless connected
rescue StandardError => e
@config.logger.error("stream_start_sse_thread error : #{e.inspect}")
@config.logger.error("start_sse_connection_thread error : #{e.inspect}")
end
end
end
Expand All @@ -84,6 +83,46 @@ def start_stream_forked
PhusionPassenger.on_event(:starting_worker_process) { |forked| start_stream if forked }
end

def process_action(action)
case action
when Constants::PUSH_CONNECTED
process_connected
when Constants::PUSH_RETRYABLE_ERROR
process_disconnect(true)
when Constants::PUSH_NONRETRYABLE_ERROR
process_disconnect(false)
when Constants::PUSH_SUBSYSTEM_DOWN
process_subsystem_down
when Constants::PUSH_SUBSYSTEM_READY
process_subsystem_ready
when Constants::PUSH_SUBSYSTEM_OFF
process_push_shutdown
else
@config.logger.debug('Incorrect action type.')
end
rescue StandardError => e
@config.logger.error("process_action error: #{e.inspect}")
end

def process_subsystem_ready
@synchronizer.stop_periodic_fetch
@synchronizer.sync_all
@sse_handler.start_workers
end

def process_subsystem_down
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
end

def process_push_shutdown
@push_manager.stop_sse
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
rescue StandardError => e
@config.logger.error("process_push_shutdown error: #{e.inspect}")
end

def process_connected
if @sse_connected.value
@config.logger.debug('Streaming already connected.')
Expand Down Expand Up @@ -115,28 +154,6 @@ def process_disconnect(reconnect)
rescue StandardError => e
@config.logger.error("process_disconnect error: #{e.inspect}")
end

def process_occupancy(push_enable)
if push_enable
@synchronizer.stop_periodic_fetch
@synchronizer.sync_all
@sse_handler.start_workers
return
end

@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
rescue StandardError => e
@config.logger.error("process_occupancy error: #{e.inspect}")
end

def process_push_shutdown
@push_manager.stop_sse
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
rescue StandardError => e
@config.logger.error("process_push_shutdown error: #{e.inspect}")
end
end
end
end
17 changes: 4 additions & 13 deletions lib/splitclient-rb/engine/synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def initialize(
def sync_all
@config.threads[:sync_all_thread] = Thread.new do
@config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled
fetch_splits
@split_fetcher.fetch_splits
fetch_segments
end
end
Expand All @@ -53,21 +53,12 @@ def stop_periodic_fetch
end

def fetch_splits
back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1)
loop do
break if @split_fetcher.fetch_splits

sleep(back_off.interval)
end
segment_names = @split_fetcher.fetch_splits
@segment_fetcher.fetch_segments_if_not_exists(segment_names) unless segment_names.empty?
end

def fetch_segment(name)
back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1)
loop do
break if @segment_fetcher.fetch_segment(name)

sleep(back_off.interval)
end
@segment_fetcher.fetch_segment(name)
end

private
Expand Down
Loading