diff --git a/.rubocop.yml b/.rubocop.yml index 1662ec861..ef65af325 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -10,6 +10,9 @@ Metrics/MethodLength: Metrics/ClassLength: Max: 150 +Metrics/CyclomaticComplexity: + Max: 8 + Metrics/LineLength: Max: 130 Exclude: diff --git a/CHANGES.txt b/CHANGES.txt index aab4cd97c..cc86e6051 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,11 @@ CHANGES +7.2.3 (Feb 24, 2021) +- Fixed missing segment fetch after an SPLIT_UPDATE. +- Updated streaming logic to support multiregion. +- Updated sse client connection logic to read confirmation event. +- Updated naming of retryable erros. + 7.2.2 (Dec 18, 2020) - Fixed issue: undefined local variable or method post_impressions_count diff --git a/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb b/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb index 6cd7b2952..19dab74ae 100644 --- a/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb +++ b/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb @@ -27,14 +27,22 @@ def call end end + def fetch_segments_if_not_exists(names) + names.each do |name| + change_number = @segments_repository.get_change_number(name) + + fetch_segment(name) if change_number == -1 + end + rescue StandardError => error + @config.log_found_exception(__method__.to_s, error) + end + def fetch_segment(name) @semaphore.synchronize do segments_api.fetch_segments_by_names([name]) - true end rescue StandardError => error @config.log_found_exception(__method__.to_s, error) - false end def fetch_segments diff --git a/lib/splitclient-rb/cache/fetchers/split_fetcher.rb b/lib/splitclient-rb/cache/fetchers/split_fetcher.rb index 104a839c8..8510ae438 100644 --- a/lib/splitclient-rb/cache/fetchers/split_fetcher.rb +++ b/lib/splitclient-rb/cache/fetchers/split_fetcher.rb @@ -41,11 +41,11 @@ def fetch_splits @config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled @sdk_blocker.splits_ready! - true + + data[:segment_names] end rescue StandardError => error @config.log_found_exception(__method__.to_s, error) - false end def stop_splits_thread diff --git a/lib/splitclient-rb/constants.rb b/lib/splitclient-rb/constants.rb index 6c3cf64f9..c01d8fc8f 100644 --- a/lib/splitclient-rb/constants.rb +++ b/lib/splitclient-rb/constants.rb @@ -6,5 +6,10 @@ class SplitIoClient::Constants CONTROL_SEC = 'control_sec' OCCUPANCY_CHANNEL_PREFIX = '[?occupancy=metrics.publishers]' FETCH_BACK_OFF_BASE_RETRIES = 1 + PUSH_CONNECTED = 'PUSH_CONNECTED' + PUSH_RETRYABLE_ERROR = 'PUSH_RETRYABLE_ERROR' + PUSH_NONRETRYABLE_ERROR = 'PUSH_NONRETRYABLE_ERROR' + PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN' + PUSH_SUBSYSTEM_READY = 'PUSH_SUBSYSTEM_READY' + PUSH_SUBSYSTEM_OFF = 'PUSH_SUBSYSTEM_OFF' end - \ No newline at end of file diff --git a/lib/splitclient-rb/engine/push_manager.rb b/lib/splitclient-rb/engine/push_manager.rb index cc10edb80..a219f47ab 100644 --- a/lib/splitclient-rb/engine/push_manager.rb +++ b/lib/splitclient-rb/engine/push_manager.rb @@ -19,12 +19,13 @@ def start_sse if response[:push_enabled] && @sse_handler.start(response[:token], response[:channels]) schedule_next_token_refresh(response[:exp]) @back_off.reset - return + return true end stop_sse schedule_next_token_refresh(@back_off.interval) if response[:retry] + false rescue StandardError => e @config.logger.error("start_sse: #{e.inspect}") end diff --git a/lib/splitclient-rb/engine/sync_manager.rb b/lib/splitclient-rb/engine/sync_manager.rb index 4b22ee3f2..2ca3dccac 100644 --- a/lib/splitclient-rb/engine/sync_manager.rb +++ b/lib/splitclient-rb/engine/sync_manager.rb @@ -11,8 +11,7 @@ def initialize( ) @synchronizer = synchronizer notification_manager_keeper = SplitIoClient::SSE::NotificationManagerKeeper.new(config) do |manager| - manager.on_occupancy { |publisher_available| process_occupancy(publisher_available) } - manager.on_push_shutdown { process_push_shutdown } + manager.on_action { |action| process_action(action) } end @sse_handler = SplitIoClient::SSE::SSEHandler.new( config, @@ -21,8 +20,7 @@ def initialize( repositories[:segments], notification_manager_keeper ) do |handler| - handler.on_connected { process_connected } - handler.on_disconnect { |reconnect| process_disconnect(reconnect) } + handler.on_action { |action| process_action(action) } end @push_manager = PushManager.new(config, @sse_handler, api_key) @@ -44,10 +42,10 @@ def start # Starts tasks if stream is enabled. def start_stream @config.logger.debug('Starting push mode ...') - stream_start_thread + sync_all_thread @synchronizer.start_periodic_data_recording - stream_start_sse_thread + start_sse_connection_thread end def start_poll @@ -59,23 +57,24 @@ def start_poll end # Starts thread which fetch splits and segments once and trigger task to periodic data recording. - def stream_start_thread + def sync_all_thread @config.threads[:sync_manager_start_stream] = Thread.new do begin @synchronizer.sync_all rescue StandardError => e - @config.logger.error("stream_start_thread error : #{e.inspect}") + @config.logger.error("sync_all_thread error : #{e.inspect}") end end end # Starts thread which connect to sse and after that fetch splits and segments once. - def stream_start_sse_thread + def start_sse_connection_thread @config.threads[:sync_manager_start_sse] = Thread.new do begin - @push_manager.start_sse + connected = @push_manager.start_sse + @synchronizer.start_periodic_fetch unless connected rescue StandardError => e - @config.logger.error("stream_start_sse_thread error : #{e.inspect}") + @config.logger.error("start_sse_connection_thread error : #{e.inspect}") end end end @@ -84,6 +83,46 @@ def start_stream_forked PhusionPassenger.on_event(:starting_worker_process) { |forked| start_stream if forked } end + def process_action(action) + case action + when Constants::PUSH_CONNECTED + process_connected + when Constants::PUSH_RETRYABLE_ERROR + process_disconnect(true) + when Constants::PUSH_NONRETRYABLE_ERROR + process_disconnect(false) + when Constants::PUSH_SUBSYSTEM_DOWN + process_subsystem_down + when Constants::PUSH_SUBSYSTEM_READY + process_subsystem_ready + when Constants::PUSH_SUBSYSTEM_OFF + process_push_shutdown + else + @config.logger.debug('Incorrect action type.') + end + rescue StandardError => e + @config.logger.error("process_action error: #{e.inspect}") + end + + def process_subsystem_ready + @synchronizer.stop_periodic_fetch + @synchronizer.sync_all + @sse_handler.start_workers + end + + def process_subsystem_down + @sse_handler.stop_workers + @synchronizer.start_periodic_fetch + end + + def process_push_shutdown + @push_manager.stop_sse + @sse_handler.stop_workers + @synchronizer.start_periodic_fetch + rescue StandardError => e + @config.logger.error("process_push_shutdown error: #{e.inspect}") + end + def process_connected if @sse_connected.value @config.logger.debug('Streaming already connected.') @@ -115,28 +154,6 @@ def process_disconnect(reconnect) rescue StandardError => e @config.logger.error("process_disconnect error: #{e.inspect}") end - - def process_occupancy(push_enable) - if push_enable - @synchronizer.stop_periodic_fetch - @synchronizer.sync_all - @sse_handler.start_workers - return - end - - @sse_handler.stop_workers - @synchronizer.start_periodic_fetch - rescue StandardError => e - @config.logger.error("process_occupancy error: #{e.inspect}") - end - - def process_push_shutdown - @push_manager.stop_sse - @sse_handler.stop_workers - @synchronizer.start_periodic_fetch - rescue StandardError => e - @config.logger.error("process_push_shutdown error: #{e.inspect}") - end end end end diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index baf4f9b76..b9ea30e6e 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -30,7 +30,7 @@ def initialize( def sync_all @config.threads[:sync_all_thread] = Thread.new do @config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled - fetch_splits + @split_fetcher.fetch_splits fetch_segments end end @@ -53,21 +53,12 @@ def stop_periodic_fetch end def fetch_splits - back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1) - loop do - break if @split_fetcher.fetch_splits - - sleep(back_off.interval) - end + segment_names = @split_fetcher.fetch_splits + @segment_fetcher.fetch_segments_if_not_exists(segment_names) unless segment_names.empty? end def fetch_segment(name) - back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1) - loop do - break if @segment_fetcher.fetch_segment(name) - - sleep(back_off.interval) - end + @segment_fetcher.fetch_segment(name) end private diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 920aef4a3..abdf484b2 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -9,6 +9,7 @@ module EventSource class Client DEFAULT_READ_TIMEOUT = 70 CONNECT_TIMEOUT = 30_000 + OK_CODE = 200 KEEP_ALIVE_RESPONSE = "c\r\n:keepalive\n\n\r\n".freeze ERROR_EVENT_TYPE = 'error'.freeze @@ -16,9 +17,10 @@ def initialize(config, read_timeout: DEFAULT_READ_TIMEOUT) @config = config @read_timeout = read_timeout @connected = Concurrent::AtomicBoolean.new(false) + @first_event = Concurrent::AtomicBoolean.new(true) @socket = nil @event_parser = SSE::EventSource::EventParser.new(config) - @on = { event: ->(_) {}, connected: ->(_) {}, disconnect: ->(_) {} } + @on = { event: ->(_) {}, action: ->(_) {} } yield self if block_given? end @@ -27,16 +29,12 @@ def on_event(&action) @on[:event] = action end - def on_connected(&action) - @on[:connected] = action + def on_action(&action) + @on[:action] = action end - def on_disconnect(&action) - @on[:disconnect] = action - end - - def close(reconnect = false) - dispatch_disconnect(reconnect) + def close(action = Constants::PUSH_NONRETRYABLE_ERROR) + dispatch_action(action) @connected.make_false SplitIoClient::Helpers::ThreadHelper.stop(:connect_stream, @config) @socket&.close @@ -45,6 +43,11 @@ def close(reconnect = false) end def start(url) + if connected? + @config.logger.debug('SSEClient already running.') + return true + end + @uri = URI(url) latch = Concurrent::CountDownLatch.new(1) @@ -72,16 +75,18 @@ def connect_thread(latch) end def connect_stream(latch) - socket_write(latch) + socket_write - while @connected.value + while connected? || @first_event.value begin partial_data = @socket.readpartial(10_000, timeout: @read_timeout) + read_first_event(partial_data, latch) + raise 'eof exception' if partial_data == :eof rescue StandardError => e @config.logger.error('Error reading partial data: ' + e.inspect) if @config.debug_enabled - close(true) # close conexion & reconnect + close(Constants::PUSH_RETRYABLE_ERROR) return end @@ -89,14 +94,31 @@ def connect_stream(latch) end end - def socket_write(latch) + def socket_write + @first_event.make_true @socket = socket_connect @socket.write(build_request(@uri)) - dispatch_connected rescue StandardError => e @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}") - close - ensure + close(Constants::PUSH_NONRETRYABLE_ERROR) + end + + def read_first_event(data, latch) + return unless @first_event.value + + response_code = @event_parser.first_event(data) + @config.logger.debug("SSE client first event code: #{response_code}") + + error_event = false + events = @event_parser.parse(data) + events.each { |e| error_event = true if e.event_type == ERROR_EVENT_TYPE } + @first_event.make_false + + if response_code == OK_CODE && !error_event + @connected.make_true + dispatch_action(Constants::PUSH_CONNECTED) + end + latch.count_down end @@ -137,9 +159,9 @@ def process_event(event) def dispatch_error(event) @config.logger.error("Event error: #{event.event_type}, #{event.data}") if event.data['code'] >= 40_140 && event.data['code'] <= 40_149 - close(true) # close conexion & reconnect + close(Constants::PUSH_RETRYABLE_ERROR) elsif event.data['code'] >= 40_000 && event.data['code'] <= 49_999 - close # close conexion + close(Constants::PUSH_NONRETRYABLE_ERROR) end end @@ -148,15 +170,9 @@ def dispatch_event(event) @on[:event].call(event) end - def dispatch_connected - @connected.make_true - @config.logger.debug('Dispatching connected') if @config.debug_enabled - @on[:connected].call - end - - def dispatch_disconnect(reconnect) - @config.logger.debug('Dispatching disconnect') if @config.debug_enabled - @on[:disconnect].call(reconnect) + def dispatch_action(action) + @config.logger.debug("Dispatching action: #{action}") if @config.debug_enabled + @on[:action].call(action) end end end diff --git a/lib/splitclient-rb/sse/event_source/event_parser.rb b/lib/splitclient-rb/sse/event_source/event_parser.rb index 961d89bcc..2005b5226 100644 --- a/lib/splitclient-rb/sse/event_source/event_parser.rb +++ b/lib/splitclient-rb/sse/event_source/event_parser.rb @@ -4,6 +4,8 @@ module SplitIoClient module SSE module EventSource class EventParser + BAD_REQUEST_CODE = 400 + def initialize(config) @config = config end @@ -27,10 +29,17 @@ def parse(raw_event) events rescue StandardError => e - @config.logger.error("Error during parsing a event: #{e.inspect}") + @config.logger.debug("Error during parsing a event: #{e.inspect}") [] end + def first_event(raw_data) + raw_data.split("\n")[0].split(' ')[1].to_i + rescue StandardError => e + @config.logger.debug("Error parsing first event: #{e.inspect}") + BAD_REQUEST_CODE + end + private def parse_event_data(data, type) diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index c014c603c..8a86bf8eb 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'concurrent/atomics' +require 'concurrent' module SplitIoClient module SSE @@ -8,7 +8,9 @@ class NotificationManagerKeeper def initialize(config) @config = config @publisher_available = Concurrent::AtomicBoolean.new(true) - @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } + @publishers_pri = Concurrent::AtomicFixnum.new + @publishers_sec = Concurrent::AtomicFixnum.new + @on = { action: ->(_) {} } yield self if block_given? end @@ -16,19 +18,15 @@ def initialize(config) def handle_incoming_occupancy_event(event) if event.data['type'] == 'CONTROL' process_event_control(event.data['controlType']) - elsif event.channel == SplitIoClient::Constants::CONTROL_PRI - process_event_occupancy(event.data['metrics']['publishers']) + else + process_event_occupancy(event.channel, event.data['metrics']['publishers']) end rescue StandardError => e @config.logger.error(e) end - def on_occupancy(&action) - @on[:occupancy] = action - end - - def on_push_shutdown(&action) - @on[:push_shutdown] = action + def on_action(&action) + @on[:action] = action end private @@ -36,35 +34,42 @@ def on_push_shutdown(&action) def process_event_control(type) case type when 'STREAMING_PAUSED' - dispatch_occupancy_event(false) + dispatch_action(Constants::PUSH_SUBSYSTEM_DOWN) when 'STREAMING_RESUMED' - dispatch_occupancy_event(true) if @publisher_available.value + dispatch_action(Constants::PUSH_SUBSYSTEM_READY) if @publisher_available.value when 'STREAMING_DISABLED' - dispatch_push_shutdown + dispatch_action(Constants::PUSH_SUBSYSTEM_OFF) else @config.logger.error("Incorrect event type: #{incoming_notification}") end end - def process_event_occupancy(publishers) - @config.logger.debug("Occupancy process event with #{publishers} publishers") if @config.debug_enabled - if publishers <= 0 && @publisher_available.value + def process_event_occupancy(channel, publishers) + @config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") + + update_publishers(channel, publishers) + + if !are_publishers_available? && @publisher_available.value @publisher_available.make_false - dispatch_occupancy_event(false) - elsif publishers >= 1 && !@publisher_available.value + dispatch_action(Constants::PUSH_SUBSYSTEM_DOWN) + elsif are_publishers_available? && !@publisher_available.value @publisher_available.make_true - dispatch_occupancy_event(true) + dispatch_action(Constants::PUSH_SUBSYSTEM_READY) end end - def dispatch_occupancy_event(push_enable) - @config.logger.debug("Dispatching occupancy event with publisher avaliable: #{push_enable}") - @on[:occupancy].call(push_enable) + def update_publishers(channel, publishers) + @publishers_pri.value = publishers if channel == Constants::CONTROL_PRI + @publishers_sec.value = publishers if channel == Constants::CONTROL_SEC + end + + def are_publishers_available? + @publishers_pri.value.positive? || @publishers_sec.value.positive? end - def dispatch_push_shutdown - @config.logger.debug('Dispatching push shutdown') - @on[:push_shutdown].call + def dispatch_action(action) + @config.logger.debug("Dispatching action: #{action}") + @on[:action].call(action) end end end diff --git a/lib/splitclient-rb/sse/sse_handler.rb b/lib/splitclient-rb/sse/sse_handler.rb index 6da566c7a..cd0a2953e 100644 --- a/lib/splitclient-rb/sse/sse_handler.rb +++ b/lib/splitclient-rb/sse/sse_handler.rb @@ -13,11 +13,10 @@ def initialize(config, synchronizer, splits_repository, segments_repository, not @notification_processor = SplitIoClient::SSE::NotificationProcessor.new(config, @splits_worker, @segments_worker) @sse_client = SSE::EventSource::Client.new(@config) do |client| client.on_event { |event| handle_incoming_message(event) } - client.on_connected { process_connected } - client.on_disconnect { |reconnect| process_disconnect(reconnect) } + client.on_action { |action| process_action(action) } end - @on = { connected: ->(_) {}, disconnect: ->(_) {} } + @on = { action: ->(_) {} } yield self if block_given? end @@ -48,22 +47,14 @@ def stop_workers @segments_worker.stop end - def on_connected(&action) - @on[:connected] = action - end - - def on_disconnect(&action) - @on[:disconnect] = action - end - - def process_disconnect(reconnect) - @on[:disconnect].call(reconnect) + def on_action(&action) + @on[:action] = action end private - def process_connected - @on[:connected].call + def process_action(action) + @on[:action].call(action) end def handle_incoming_message(notification) diff --git a/lib/splitclient-rb/sse/workers/segments_worker.rb b/lib/splitclient-rb/sse/workers/segments_worker.rb index 3b364c7b0..6d5768e85 100644 --- a/lib/splitclient-rb/sse/workers/segments_worker.rb +++ b/lib/splitclient-rb/sse/workers/segments_worker.rb @@ -48,12 +48,13 @@ def stop def perform while (item = @queue.pop) segment_name = item[:segment_name] - change_number = item[:change_number] - since = @segments_repository.get_change_number(segment_name) + cn = item[:change_number] + @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") - unless since >= change_number - @config.logger.debug("SegmentsWorker fetch_segment with #{since}") + attempt = 0 + while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_segment(segment_name) + attempt += 1 end end end diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index a347c8ff7..ba4fd5273 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -3,6 +3,8 @@ module SplitIoClient module SSE module Workers + MAX_RETRIES_ALLOWED = 10 + class SplitsWorker def initialize(synchronizer, config, splits_repository) @synchronizer = synchronizer @@ -59,11 +61,12 @@ def kill_split(change_number, split_name, default_treatment) def perform while (change_number = @queue.pop) - since = @splits_repository.get_change_number + @config.logger.debug("SplitsWorker change_number dequeue #{change_number}") - unless since.to_i >= change_number - @config.logger.debug("SplitsWorker fetch_splits with #{since}") + attempt = 0 + while change_number > @splits_repository.get_change_number.to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_splits + attempt += 1 end end end diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index a4b550053..e446988e9 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '7.2.2' + VERSION = '7.2.3' end diff --git a/spec/engine/push_manager_spec.rb b/spec/engine/push_manager_spec.rb index 2f2d607d0..4231353ca 100644 --- a/spec/engine/push_manager_spec.rb +++ b/spec/engine/push_manager_spec.rb @@ -42,8 +42,7 @@ stub_request(:get, config.auth_service_url).to_return(status: 200, body: body_response) config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -51,28 +50,25 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) - + expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) end end it 'must not connect to server. Auth server return 500' do stub_request(:get, config.auth_service_url).to_return(status: 500) - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -80,27 +76,25 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) + expect(connected).to eq(false) expect(sse_handler.connected?).to eq(false) - expect(connected_event).to eq(false) - expect(disconnect_event).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_NONRETRYABLE_ERROR) end it 'must not connect to server. Auth server return 401' do stub_request(:get, config.auth_service_url).to_return(status: 401) - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -108,20 +102,19 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) + expect(connected).to eq(false) expect(sse_handler.connected?).to eq(false) - expect(connected_event).to eq(false) - expect(disconnect_event).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_NONRETRYABLE_ERROR) end end @@ -135,8 +128,7 @@ stub_request(:get, config.auth_service_url).to_return(status: 200, body: body_response) config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -144,25 +136,23 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) + expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) push_manager.stop_sse expect(sse_handler.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end diff --git a/spec/engine/sync_manager_spec.rb b/spec/engine/sync_manager_spec.rb index 5a0df6c97..4c9a3e028 100644 --- a/spec/engine/sync_manager_spec.rb +++ b/spec/engine/sync_manager_spec.rb @@ -93,7 +93,7 @@ end end - it 'start sync manager receiving control message, must switch to pollingbundl' do + it 'start sync manager receiving control message, must switch to polling' do mock_server do |server| server.setup_response('/') do |_, res| send_content(res, event_control) diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index 0958d77bb..c3354e308 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -87,34 +87,22 @@ it 'fetch_splits' do mock_split_changes(splits) + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') synchronizer.fetch_splits expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once end - it 'fetch_splits with retries' do - stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') - .to_return({ status: 500 }, { status: 200, body: splits }) - - synchronizer.fetch_splits - - expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.times(2) - end - it 'fetch_segment' do mock_segment_changes('segment3', segment3, '-1') synchronizer.fetch_segment('segment3') expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.once end - - it 'fetch_segment with retries' do - stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1') - .to_return({ status: 500 }, { status: 200, body: segment3 }) - - synchronizer.fetch_segment('segment3') - expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.times(2) - end end private diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 4ed19b250..5afa193ea 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -22,16 +22,15 @@ server.setup_response('/') do |_, res| send_stream_content(res, event_split_update) end - event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::SPLIT_UPDATE) @@ -40,13 +39,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -57,14 +54,14 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::SPLIT_KILL) @@ -75,13 +72,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -92,14 +87,14 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::SEGMENT_UPDATE) @@ -109,13 +104,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -126,14 +119,14 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::CONTROL) @@ -142,13 +135,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -159,25 +150,23 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) sleep 0.5 expect(event_queue.empty?).to be_truthy expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -188,14 +177,14 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['metrics']['publishers']).to eq(2) @@ -203,46 +192,59 @@ expect(event_result.client_id).to eq(nil) expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end it 'receive error event' do mock_server do |server| server.setup_response('/') do |_, res| - send_stream_content(res, event_error) + send_stream_content(res, event_error, 400) end event_queue = Queue.new - connected_event = false - disconnect_queue = Queue.new + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_queue << true } + client.on_action { |action| action_event = action } end - sse_client.start(server.base_uri) - result = disconnect_queue.pop - expect(result).to eq(true) + connected = sse_client.start(server.base_uri) + + expect(connected).to eq(false) expect(sse_client.connected?).to eq(false) - expect(connected_event).to eq(true) expect(event_queue.empty?).to eq(true) end end + + it 'first event - when server return 400' do + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_error, 400) + end + + event_queue = Queue.new + action_event = '' + sse_client = subject.new(config) do |client| + client.on_event { |event| event_queue << event } + client.on_action { |action| action_event = action } + end + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(false) + end + end end private -def send_stream_content(res, content) +def send_stream_content(res, content, status = 200) res.content_type = 'text/event-stream' - res.status = 200 + res.status = status res.chunked = true rd, wr = IO.pipe wr.write(content) diff --git a/spec/sse/notification_manager_keeper_spec.rb b/spec/sse/notification_manager_keeper_spec.rb index abc4da910..fa86e7076 100644 --- a/spec/sse/notification_manager_keeper_spec.rb +++ b/spec/sse/notification_manager_keeper_spec.rb @@ -10,136 +10,174 @@ context 'CONTROL EVENT' do it 'STREAMING_PAUSED' do - result = nil - shutdown = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end + data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_PAUSED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) - expect(shutdown).to eq(nil) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) end it 'STREAMING_RESUMED with publishers enabled' do - result = nil - shutdown = nil + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end + data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_RESUMED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) - expect(shutdown).to eq(nil) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) end it 'STREAMING_RESUMED without publishers enabled' do - result = nil - shutdown = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) + + action_event = nil data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_RESUMED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') - result = nil noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) - expect(shutdown).to eq(nil) + expect(action_event).to eq(nil) end it 'STREAMING_DISABLED' do - result = nil - shutdown = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end + data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_DISABLED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) - expect(shutdown).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_OFF) end end context 'OCCUPANCY EVENT' do it 'first time without publishers available' do - result = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } + manager.on_action { |action| action_event = action } end + data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) end it 'first time with publishers available' do - result = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } + manager.on_action { |action| action_event = action } end + data = { 'metrics' => { 'publishers' => 2 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) end it 'handle many events' do - result = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } + manager.on_action { |action| action_event = action } end + data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 1 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-clienrubot-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 2 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 5 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) + + action_event = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) + + action_event = nil + data = { 'metrics' => { 'publishers' => 1 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) + + action_event = nil + data = { 'metrics' => { 'publishers' => 2 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(nil) + + action_event = nil + data = { 'metrics' => { 'publishers' => 3 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(nil) + + action_event = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(nil) + + action_event = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) + + action_event = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(nil) + + action_event = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(nil) end end end diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 6bf8bc2e3..75ab80f70 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -64,27 +64,24 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -97,27 +94,24 @@ splits_repository.set_change_number(1_506_703_262_916) config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end @@ -130,31 +124,28 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) split = splits_repository.get_split('FACUNDO_TEST') expect(split[:killed]).to be_truthy expect(split[:defaultTreatment]).to eq('on') expect(split[:changeNumber]).to eq(1_506_703_262_918) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -165,64 +156,58 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) split = splits_repository.get_split('FACUNDO_TEST') expect(split[:killed]).to be_truthy expect(split[:defaultTreatment]).to eq('on') expect(split[:changeNumber]).to eq(1_506_703_262_916) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end context 'SEGMENT UPDATE event' do - it 'must trigger fetch' do + it 'must trigger fetch - with retries' do mock_server do |server| server.setup_response('/') do |_, res| send_content(res, event_segment_update_must_fetch) end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) - expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(1) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -233,26 +218,24 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') + sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.once - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end @@ -265,26 +248,23 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end - sse_handler.start('token-test', 'channel-test') sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end diff --git a/spec/sse/workers/segments_worker_spec.rb b/spec/sse/workers/segments_worker_spec.rb index 85aaecd24..131d731df 100644 --- a/spec/sse/workers/segments_worker_spec.rb +++ b/spec/sse/workers/segments_worker_spec.rb @@ -41,6 +41,16 @@ context 'add segment name to queue' do it 'must trigger fetch' do + stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877') + .to_return(status: 200, body: + '{ + "name": "segment1", + "added": [], + "removed": [], + "since": 1470947453878, + "till": 1470947453878 + }') + worker = subject.new(synchronizer, config, segments_repository) worker.start @@ -51,6 +61,17 @@ expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(2) end + it 'must trigger fetch - with retries' do + worker = subject.new(synchronizer, config, segments_repository) + + worker.start + worker.add_to_queue(1_506_703_262_918, 'segment1') + + sleep(1) + + expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12) + end + it 'must not trigger fetch' do worker = subject.new(synchronizer, config, segments_repository) diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 2ecfddeaf..2bc4fb397 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -27,18 +27,44 @@ let(:params) { { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: impression_counter } } let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, sdk_blocker, params) } - before do - mock_split_changes(splits) - mock_segment_changes('segment1', segment1, '-1') - mock_segment_changes('segment1', segment1, '1470947453877') - mock_segment_changes('segment2', segment2, '-1') - mock_segment_changes('segment2', segment2, '1470947453878') - mock_segment_changes('segment3', segment3, '-1') - - split_fetcher.fetch_splits + it 'add change number - must tigger fetcch - with retries' do + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') + .to_return(status: 200, body: + '{ + "splits": [], + "since": -1, + "till": 1506703262918 + }') + + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918') + .to_return(status: 200, body: + '{ + "splits": [], + "since": 1506703262918, + "till": 1506703262918 + }') + + worker = subject.new(synchronizer, config, splits_repository) + worker.start + worker.add_to_queue(1_506_703_262_919) + + sleep(1) + + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(10) end context 'add change number to queue' do + before do + mock_split_changes(splits) + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + + split_fetcher.fetch_splits + end + it 'must trigger fetch' do worker = subject.new(synchronizer, config, splits_repository) worker.start @@ -71,6 +97,17 @@ end context 'kill split notification' do + before do + mock_split_changes(splits) + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + + split_fetcher.fetch_splits + end + it 'must kill split and trigger fetch' do worker = subject.new(synchronizer, config, splits_repository)