From 63cd8896fe3f9923461f6a7e1cef508dc62e30f5 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Fri, 15 Jan 2021 10:47:37 -0300 Subject: [PATCH 01/10] sec region support --- lib/splitclient-rb/sse/event_source/client.rb | 2 +- .../sse/notification_manager_keeper.rb | 36 +++++++++----- spec/sse/notification_manager_keeper_spec.rb | 48 +++++++++++++++++++ 3 files changed, 74 insertions(+), 12 deletions(-) diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 920aef4a3..3478c6ee9 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -74,7 +74,7 @@ def connect_thread(latch) def connect_stream(latch) socket_write(latch) - while @connected.value + while connected? begin partial_data = @socket.readpartial(10_000, timeout: @read_timeout) diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index c014c603c..449de54af 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -1,13 +1,15 @@ # frozen_string_literal: true -require 'concurrent/atomics' +require 'concurrent' module SplitIoClient module SSE class NotificationManagerKeeper def initialize(config) @config = config - @publisher_available = Concurrent::AtomicBoolean.new(true) + @streaming_available = Concurrent::AtomicBoolean.new(true) + @publishers_pri = Concurrent::AtomicFixnum.new + @publishers_sec = Concurrent::AtomicFixnum.new @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } yield self if block_given? @@ -16,8 +18,8 @@ def initialize(config) def handle_incoming_occupancy_event(event) if event.data['type'] == 'CONTROL' process_event_control(event.data['controlType']) - elsif event.channel == SplitIoClient::Constants::CONTROL_PRI - process_event_occupancy(event.data['metrics']['publishers']) + else + process_event_occupancy(event.channel, event.data['metrics']['publishers']) end rescue StandardError => e @config.logger.error(e) @@ -38,7 +40,7 @@ def process_event_control(type) when 'STREAMING_PAUSED' dispatch_occupancy_event(false) when 'STREAMING_RESUMED' - dispatch_occupancy_event(true) if @publisher_available.value + dispatch_occupancy_event(true) if @streaming_available.value when 'STREAMING_DISABLED' dispatch_push_shutdown else @@ -46,17 +48,29 @@ def process_event_control(type) end end - def process_event_occupancy(publishers) - @config.logger.debug("Occupancy process event with #{publishers} publishers") if @config.debug_enabled - if publishers <= 0 && @publisher_available.value - @publisher_available.make_false + def process_event_occupancy(channel, publishers) + @config.logger.debug("Occupancy process event with #{publishers} publishers. Channel: #{channel}") + + update_publishers(channel, publishers) + + if !are_publishers_avaliable? && @streaming_available.value + @streaming_available.make_false dispatch_occupancy_event(false) - elsif publishers >= 1 && !@publisher_available.value - @publisher_available.make_true + elsif are_publishers_avaliable? && !@streaming_available.value + @streaming_available.make_true dispatch_occupancy_event(true) end end + def update_publishers(channel, publishers) + @publishers_pri.compare_and_set(@publishers_pri.value, publishers) if channel == SplitIoClient::Constants::CONTROL_PRI + @publishers_sec.compare_and_set(@publishers_sec.value, publishers) if channel == SplitIoClient::Constants::CONTROL_SEC + end + + def are_publishers_avaliable? + @publishers_pri.value.positive? || @publishers_sec.value.positive? + end + def dispatch_occupancy_event(push_enable) @config.logger.debug("Dispatching occupancy event with publisher avaliable: #{push_enable}") @on[:occupancy].call(push_enable) diff --git a/spec/sse/notification_manager_keeper_spec.rb b/spec/sse/notification_manager_keeper_spec.rb index abc4da910..8c95b4924 100644 --- a/spec/sse/notification_manager_keeper_spec.rb +++ b/spec/sse/notification_manager_keeper_spec.rb @@ -140,6 +140,54 @@ event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) expect(result).to eq(true) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(false) + + result = nil + data = { 'metrics' => { 'publishers' => 1 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(true) + + result = nil + data = { 'metrics' => { 'publishers' => 2 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 3 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(false) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) end end end From e1c82653482a15e371503f86e8e43b5fb07e0882 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Fri, 15 Jan 2021 20:09:34 -0300 Subject: [PATCH 02/10] pr feedback --- .../sse/notification_manager_keeper.rb | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index 449de54af..b90a4dc7a 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -7,7 +7,7 @@ module SSE class NotificationManagerKeeper def initialize(config) @config = config - @streaming_available = Concurrent::AtomicBoolean.new(true) + @publisher_available = Concurrent::AtomicBoolean.new(true) @publishers_pri = Concurrent::AtomicFixnum.new @publishers_sec = Concurrent::AtomicFixnum.new @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } @@ -40,7 +40,7 @@ def process_event_control(type) when 'STREAMING_PAUSED' dispatch_occupancy_event(false) when 'STREAMING_RESUMED' - dispatch_occupancy_event(true) if @streaming_available.value + dispatch_occupancy_event(true) if @publisher_available.value when 'STREAMING_DISABLED' dispatch_push_shutdown else @@ -49,25 +49,25 @@ def process_event_control(type) end def process_event_occupancy(channel, publishers) - @config.logger.debug("Occupancy process event with #{publishers} publishers. Channel: #{channel}") + @config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") update_publishers(channel, publishers) - if !are_publishers_avaliable? && @streaming_available.value - @streaming_available.make_false + if !are_publishers_available? && @publisher_available.value + @publisher_available.make_false dispatch_occupancy_event(false) - elsif are_publishers_avaliable? && !@streaming_available.value - @streaming_available.make_true + elsif are_publishers_available? && !@publisher_available.value + @publisher_available.make_true dispatch_occupancy_event(true) end end def update_publishers(channel, publishers) - @publishers_pri.compare_and_set(@publishers_pri.value, publishers) if channel == SplitIoClient::Constants::CONTROL_PRI - @publishers_sec.compare_and_set(@publishers_sec.value, publishers) if channel == SplitIoClient::Constants::CONTROL_SEC + @publishers_pri.value = publishers if channel == SplitIoClient::Constants::CONTROL_PRI + @publishers_sec.value = publishers if channel == SplitIoClient::Constants::CONTROL_SEC end - def are_publishers_avaliable? + def are_publishers_available? @publishers_pri.value.positive? || @publishers_sec.value.positive? end From 9050d89a24e49de1808802db615ca1aab267d584 Mon Sep 17 00:00:00 2001 From: Mauro Sanz <51236193+sanzmauro@users.noreply.github.com> Date: Fri, 22 Jan 2021 13:20:14 -0300 Subject: [PATCH 03/10] Connection confirmation - first event (#350) * Connection confirmation - first event --- lib/splitclient-rb/engine/push_manager.rb | 3 +- lib/splitclient-rb/engine/sync_manager.rb | 15 +++-- lib/splitclient-rb/sse/event_source/client.rb | 27 ++++++-- .../sse/event_source/event_parser.rb | 11 +++- spec/engine/push_manager_spec.rb | 13 ++-- spec/sse/event_source/client_spec.rb | 63 +++++++++++++----- spec/sse/sse_handler_spec.rb | 64 +++++++++++++------ 7 files changed, 140 insertions(+), 56 deletions(-) diff --git a/lib/splitclient-rb/engine/push_manager.rb b/lib/splitclient-rb/engine/push_manager.rb index cc10edb80..a219f47ab 100644 --- a/lib/splitclient-rb/engine/push_manager.rb +++ b/lib/splitclient-rb/engine/push_manager.rb @@ -19,12 +19,13 @@ def start_sse if response[:push_enabled] && @sse_handler.start(response[:token], response[:channels]) schedule_next_token_refresh(response[:exp]) @back_off.reset - return + return true end stop_sse schedule_next_token_refresh(@back_off.interval) if response[:retry] + false rescue StandardError => e @config.logger.error("start_sse: #{e.inspect}") end diff --git a/lib/splitclient-rb/engine/sync_manager.rb b/lib/splitclient-rb/engine/sync_manager.rb index 4b22ee3f2..6f733d67b 100644 --- a/lib/splitclient-rb/engine/sync_manager.rb +++ b/lib/splitclient-rb/engine/sync_manager.rb @@ -44,10 +44,10 @@ def start # Starts tasks if stream is enabled. def start_stream @config.logger.debug('Starting push mode ...') - stream_start_thread + sync_all_thread @synchronizer.start_periodic_data_recording - stream_start_sse_thread + start_sse_connection_thread end def start_poll @@ -59,23 +59,24 @@ def start_poll end # Starts thread which fetch splits and segments once and trigger task to periodic data recording. - def stream_start_thread + def sync_all_thread @config.threads[:sync_manager_start_stream] = Thread.new do begin @synchronizer.sync_all rescue StandardError => e - @config.logger.error("stream_start_thread error : #{e.inspect}") + @config.logger.error("sync_all_thread error : #{e.inspect}") end end end # Starts thread which connect to sse and after that fetch splits and segments once. - def stream_start_sse_thread + def start_sse_connection_thread @config.threads[:sync_manager_start_sse] = Thread.new do begin - @push_manager.start_sse + connected = @push_manager.start_sse + @synchronizer.start_periodic_fetch unless connected rescue StandardError => e - @config.logger.error("stream_start_sse_thread error : #{e.inspect}") + @config.logger.error("start_sse_connection_thread error : #{e.inspect}") end end end diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 3478c6ee9..ef3e22c59 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -9,6 +9,7 @@ module EventSource class Client DEFAULT_READ_TIMEOUT = 70 CONNECT_TIMEOUT = 30_000 + OK_CODE = 200 KEEP_ALIVE_RESPONSE = "c\r\n:keepalive\n\n\r\n".freeze ERROR_EVENT_TYPE = 'error'.freeze @@ -16,6 +17,7 @@ def initialize(config, read_timeout: DEFAULT_READ_TIMEOUT) @config = config @read_timeout = read_timeout @connected = Concurrent::AtomicBoolean.new(false) + @first_event = Concurrent::AtomicBoolean.new(true) @socket = nil @event_parser = SSE::EventSource::EventParser.new(config) @on = { event: ->(_) {}, connected: ->(_) {}, disconnect: ->(_) {} } @@ -72,12 +74,14 @@ def connect_thread(latch) end def connect_stream(latch) - socket_write(latch) + socket_write - while connected? + while connected? || @first_event.value begin partial_data = @socket.readpartial(10_000, timeout: @read_timeout) + read_first_event(partial_data, latch) + raise 'eof exception' if partial_data == :eof rescue StandardError => e @config.logger.error('Error reading partial data: ' + e.inspect) if @config.debug_enabled @@ -89,14 +93,27 @@ def connect_stream(latch) end end - def socket_write(latch) + def socket_write + @first_event.make_true @socket = socket_connect @socket.write(build_request(@uri)) - dispatch_connected rescue StandardError => e @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}") close - ensure + end + + def read_first_event(data, latch) + return unless @first_event.value + + response_code = @event_parser.first_event(data) + @config.logger.debug("SSE client first event code: #{response_code}") + + error_event = false + events = @event_parser.parse(data) + events.each { |e| error_event = true if e.event_type == ERROR_EVENT_TYPE } + @first_event.make_false + + dispatch_connected if response_code == OK_CODE && !error_event latch.count_down end diff --git a/lib/splitclient-rb/sse/event_source/event_parser.rb b/lib/splitclient-rb/sse/event_source/event_parser.rb index 961d89bcc..2005b5226 100644 --- a/lib/splitclient-rb/sse/event_source/event_parser.rb +++ b/lib/splitclient-rb/sse/event_source/event_parser.rb @@ -4,6 +4,8 @@ module SplitIoClient module SSE module EventSource class EventParser + BAD_REQUEST_CODE = 400 + def initialize(config) @config = config end @@ -27,10 +29,17 @@ def parse(raw_event) events rescue StandardError => e - @config.logger.error("Error during parsing a event: #{e.inspect}") + @config.logger.debug("Error during parsing a event: #{e.inspect}") [] end + def first_event(raw_data) + raw_data.split("\n")[0].split(' ')[1].to_i + rescue StandardError => e + @config.logger.debug("Error parsing first event: #{e.inspect}") + BAD_REQUEST_CODE + end + private def parse_event_data(data, type) diff --git a/spec/engine/push_manager_spec.rb b/spec/engine/push_manager_spec.rb index 2f2d607d0..a5ad835c4 100644 --- a/spec/engine/push_manager_spec.rb +++ b/spec/engine/push_manager_spec.rb @@ -56,12 +56,12 @@ end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) - + expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) expect(connected_event).to eq(true) expect(disconnect_event).to eq(false) @@ -85,12 +85,13 @@ end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) + expect(connected).to eq(false) expect(sse_handler.connected?).to eq(false) expect(connected_event).to eq(false) expect(disconnect_event).to eq(true) @@ -113,12 +114,13 @@ end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) + expect(connected).to eq(false) expect(sse_handler.connected?).to eq(false) expect(connected_event).to eq(false) expect(disconnect_event).to eq(true) @@ -149,12 +151,13 @@ end push_manager = subject.new(config, sse_handler, api_key) - push_manager.start_sse + connected = push_manager.start_sse expect(a_request(:get, config.auth_service_url)).to have_been_made.times(1) sleep(1.5) + expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) expect(connected_event).to eq(true) expect(disconnect_event).to eq(false) diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 4ed19b250..590e758f7 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -22,7 +22,6 @@ server.setup_response('/') do |_, res| send_stream_content(res, event_split_update) end - event_queue = Queue.new connected_event = false disconnect_event = false @@ -31,7 +30,9 @@ client.on_connected { connected_event = true } client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::SPLIT_UPDATE) @@ -64,7 +65,9 @@ client.on_connected { connected_event = true } client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::SPLIT_KILL) @@ -99,7 +102,9 @@ client.on_connected { connected_event = true } client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::SEGMENT_UPDATE) @@ -133,7 +138,9 @@ client.on_connected { connected_event = true } client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['type']).to eq(SplitIoClient::SSE::EventSource::EventTypes::CONTROL) @@ -166,7 +173,9 @@ client.on_connected { connected_event = true } client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) sleep 0.5 expect(event_queue.empty?).to be_truthy @@ -195,7 +204,9 @@ client.on_connected { connected_event = true } client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(true) event_result = event_queue.pop expect(event_result.data['metrics']['publishers']).to eq(2) @@ -216,33 +227,53 @@ it 'receive error event' do mock_server do |server| server.setup_response('/') do |_, res| - send_stream_content(res, event_error) + send_stream_content(res, event_error, 400) end event_queue = Queue.new connected_event = false - disconnect_queue = Queue.new + disconnect_event = false sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } client.on_connected { connected_event = true } - client.on_disconnect { disconnect_queue << true } + client.on_disconnect { disconnect_event = true } end - sse_client.start(server.base_uri) - result = disconnect_queue.pop - expect(result).to eq(true) + connected = sse_client.start(server.base_uri) + + expect(connected).to eq(false) expect(sse_client.connected?).to eq(false) - expect(connected_event).to eq(true) + expect(connected_event).to eq(false) expect(event_queue.empty?).to eq(true) end end + + it 'first event - when server return 400' do + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_error, 400) + end + + event_queue = Queue.new + connected_event = false + disconnect_event = false + sse_client = subject.new(config) do |client| + client.on_event { |event| event_queue << event } + client.on_connected { connected_event = true } + client.on_disconnect { disconnect_event = true } + end + + connected = sse_client.start(server.base_uri) + expect(connected).to eq(false) + end + end end private -def send_stream_content(res, content) +def send_stream_content(res, content, status = 200) res.content_type = 'text/event-stream' - res.status = 200 + res.status = status res.chunked = true rd, wr = IO.pipe wr.write(content) diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 6bf8bc2e3..11c7c40b0 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -67,12 +67,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) @@ -100,12 +103,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) @@ -133,12 +139,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) @@ -168,12 +177,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) @@ -205,12 +217,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) @@ -236,11 +251,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) @@ -268,12 +287,15 @@ connected_event = false disconnect_event = false sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected { connected_event = true } + handler.on_connected do + sse_handler.start_workers + connected_event = true + end handler.on_disconnect { disconnect_event = true } end - sse_handler.start('token-test', 'channel-test') - sse_handler.start_workers + connected = sse_handler.start('token-test', 'channel-test') + expect(connected).to eq(true) sleep(2) From e6b3903ca21b9540779d691377e510e69e93ff09 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 20 Jan 2021 10:31:12 -0300 Subject: [PATCH 04/10] failires scenarios implementation --- .rubocop.yml | 3 + lib/splitclient-rb/constants.rb | 6 + lib/splitclient-rb/engine/sync_manager.rb | 68 ++++++---- lib/splitclient-rb/sse/event_source/client.rb | 40 +++--- .../sse/notification_manager_keeper.rb | 35 ++---- lib/splitclient-rb/sse/sse_handler.rb | 21 +--- spec/engine/push_manager_spec.rb | 37 ++---- spec/engine/sync_manager_spec.rb | 2 +- spec/sse/event_source/client_spec.rb | 73 ++++------- spec/sse/notification_manager_keeper_spec.rb | 118 ++++++++---------- spec/sse/sse_handler_spec.rb | 98 +++++---------- 11 files changed, 204 insertions(+), 297 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 1662ec861..ef65af325 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -10,6 +10,9 @@ Metrics/MethodLength: Metrics/ClassLength: Max: 150 +Metrics/CyclomaticComplexity: + Max: 8 + Metrics/LineLength: Max: 130 Exclude: diff --git a/lib/splitclient-rb/constants.rb b/lib/splitclient-rb/constants.rb index 6c3cf64f9..3ad047b5c 100644 --- a/lib/splitclient-rb/constants.rb +++ b/lib/splitclient-rb/constants.rb @@ -6,5 +6,11 @@ class SplitIoClient::Constants CONTROL_SEC = 'control_sec' OCCUPANCY_CHANNEL_PREFIX = '[?occupancy=metrics.publishers]' FETCH_BACK_OFF_BASE_RETRIES = 1 + PUSH_CONNECTED = 'PUSH_CONNECTED' + PUSH_RETRYABLE_ERROR = 'PUSH_RETRYABLE_ERROR' + PUSH_NONRETRYABLE_ERROR = 'PUSH_NONRETRYABLE_ERROR' + PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN' + PUSH_SUBSYSTEM_READY = 'PUSH_SUBSYSTEM_READY' + PUSH_SUBSYSTEM_OFF = 'PUSH_SUBSYSTEM_OFF' end \ No newline at end of file diff --git a/lib/splitclient-rb/engine/sync_manager.rb b/lib/splitclient-rb/engine/sync_manager.rb index 6f733d67b..2ca3dccac 100644 --- a/lib/splitclient-rb/engine/sync_manager.rb +++ b/lib/splitclient-rb/engine/sync_manager.rb @@ -11,8 +11,7 @@ def initialize( ) @synchronizer = synchronizer notification_manager_keeper = SplitIoClient::SSE::NotificationManagerKeeper.new(config) do |manager| - manager.on_occupancy { |publisher_available| process_occupancy(publisher_available) } - manager.on_push_shutdown { process_push_shutdown } + manager.on_action { |action| process_action(action) } end @sse_handler = SplitIoClient::SSE::SSEHandler.new( config, @@ -21,8 +20,7 @@ def initialize( repositories[:segments], notification_manager_keeper ) do |handler| - handler.on_connected { process_connected } - handler.on_disconnect { |reconnect| process_disconnect(reconnect) } + handler.on_action { |action| process_action(action) } end @push_manager = PushManager.new(config, @sse_handler, api_key) @@ -85,6 +83,46 @@ def start_stream_forked PhusionPassenger.on_event(:starting_worker_process) { |forked| start_stream if forked } end + def process_action(action) + case action + when Constants::PUSH_CONNECTED + process_connected + when Constants::PUSH_RETRYABLE_ERROR + process_disconnect(true) + when Constants::PUSH_NONRETRYABLE_ERROR + process_disconnect(false) + when Constants::PUSH_SUBSYSTEM_DOWN + process_subsystem_down + when Constants::PUSH_SUBSYSTEM_READY + process_subsystem_ready + when Constants::PUSH_SUBSYSTEM_OFF + process_push_shutdown + else + @config.logger.debug('Incorrect action type.') + end + rescue StandardError => e + @config.logger.error("process_action error: #{e.inspect}") + end + + def process_subsystem_ready + @synchronizer.stop_periodic_fetch + @synchronizer.sync_all + @sse_handler.start_workers + end + + def process_subsystem_down + @sse_handler.stop_workers + @synchronizer.start_periodic_fetch + end + + def process_push_shutdown + @push_manager.stop_sse + @sse_handler.stop_workers + @synchronizer.start_periodic_fetch + rescue StandardError => e + @config.logger.error("process_push_shutdown error: #{e.inspect}") + end + def process_connected if @sse_connected.value @config.logger.debug('Streaming already connected.') @@ -116,28 +154,6 @@ def process_disconnect(reconnect) rescue StandardError => e @config.logger.error("process_disconnect error: #{e.inspect}") end - - def process_occupancy(push_enable) - if push_enable - @synchronizer.stop_periodic_fetch - @synchronizer.sync_all - @sse_handler.start_workers - return - end - - @sse_handler.stop_workers - @synchronizer.start_periodic_fetch - rescue StandardError => e - @config.logger.error("process_occupancy error: #{e.inspect}") - end - - def process_push_shutdown - @push_manager.stop_sse - @sse_handler.stop_workers - @synchronizer.start_periodic_fetch - rescue StandardError => e - @config.logger.error("process_push_shutdown error: #{e.inspect}") - end end end end diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index ef3e22c59..a0bcf2732 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -20,7 +20,7 @@ def initialize(config, read_timeout: DEFAULT_READ_TIMEOUT) @first_event = Concurrent::AtomicBoolean.new(true) @socket = nil @event_parser = SSE::EventSource::EventParser.new(config) - @on = { event: ->(_) {}, connected: ->(_) {}, disconnect: ->(_) {} } + @on = { event: ->(_) {}, action: ->(_) {} } yield self if block_given? end @@ -29,16 +29,12 @@ def on_event(&action) @on[:event] = action end - def on_connected(&action) - @on[:connected] = action + def on_action(&action) + @on[:action] = action end - def on_disconnect(&action) - @on[:disconnect] = action - end - - def close(reconnect = false) - dispatch_disconnect(reconnect) + def close(action = Constants::PUSH_NONRETRYABLE_ERROR) + dispatch_action(action) @connected.make_false SplitIoClient::Helpers::ThreadHelper.stop(:connect_stream, @config) @socket&.close @@ -85,7 +81,7 @@ def connect_stream(latch) raise 'eof exception' if partial_data == :eof rescue StandardError => e @config.logger.error('Error reading partial data: ' + e.inspect) if @config.debug_enabled - close(true) # close conexion & reconnect + close(Constants::PUSH_RETRYABLE_ERROR) return end @@ -99,7 +95,7 @@ def socket_write @socket.write(build_request(@uri)) rescue StandardError => e @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}") - close + close(Constants::PUSH_NONRETRYABLE_ERROR) end def read_first_event(data, latch) @@ -113,7 +109,11 @@ def read_first_event(data, latch) events.each { |e| error_event = true if e.event_type == ERROR_EVENT_TYPE } @first_event.make_false - dispatch_connected if response_code == OK_CODE && !error_event + if response_code == OK_CODE && !error_event + @connected.make_true + dispatch_action(Constants::PUSH_CONNECTED) + end + latch.count_down end @@ -154,9 +154,9 @@ def process_event(event) def dispatch_error(event) @config.logger.error("Event error: #{event.event_type}, #{event.data}") if event.data['code'] >= 40_140 && event.data['code'] <= 40_149 - close(true) # close conexion & reconnect + close(Constants::PUSH_RETRYABLE_ERROR) elsif event.data['code'] >= 40_000 && event.data['code'] <= 49_999 - close # close conexion + close(Constants::PUSH_NONRETRYABLE_ERROR) end end @@ -165,15 +165,9 @@ def dispatch_event(event) @on[:event].call(event) end - def dispatch_connected - @connected.make_true - @config.logger.debug('Dispatching connected') if @config.debug_enabled - @on[:connected].call - end - - def dispatch_disconnect(reconnect) - @config.logger.debug('Dispatching disconnect') if @config.debug_enabled - @on[:disconnect].call(reconnect) + def dispatch_action(action) + @config.logger.debug("Dispatching action: #{action}") if @config.debug_enabled + @on[:action].call(action) end end end diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index b90a4dc7a..8a86bf8eb 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -10,7 +10,7 @@ def initialize(config) @publisher_available = Concurrent::AtomicBoolean.new(true) @publishers_pri = Concurrent::AtomicFixnum.new @publishers_sec = Concurrent::AtomicFixnum.new - @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } + @on = { action: ->(_) {} } yield self if block_given? end @@ -25,12 +25,8 @@ def handle_incoming_occupancy_event(event) @config.logger.error(e) end - def on_occupancy(&action) - @on[:occupancy] = action - end - - def on_push_shutdown(&action) - @on[:push_shutdown] = action + def on_action(&action) + @on[:action] = action end private @@ -38,11 +34,11 @@ def on_push_shutdown(&action) def process_event_control(type) case type when 'STREAMING_PAUSED' - dispatch_occupancy_event(false) + dispatch_action(Constants::PUSH_SUBSYSTEM_DOWN) when 'STREAMING_RESUMED' - dispatch_occupancy_event(true) if @publisher_available.value + dispatch_action(Constants::PUSH_SUBSYSTEM_READY) if @publisher_available.value when 'STREAMING_DISABLED' - dispatch_push_shutdown + dispatch_action(Constants::PUSH_SUBSYSTEM_OFF) else @config.logger.error("Incorrect event type: #{incoming_notification}") end @@ -55,30 +51,25 @@ def process_event_occupancy(channel, publishers) if !are_publishers_available? && @publisher_available.value @publisher_available.make_false - dispatch_occupancy_event(false) + dispatch_action(Constants::PUSH_SUBSYSTEM_DOWN) elsif are_publishers_available? && !@publisher_available.value @publisher_available.make_true - dispatch_occupancy_event(true) + dispatch_action(Constants::PUSH_SUBSYSTEM_READY) end end def update_publishers(channel, publishers) - @publishers_pri.value = publishers if channel == SplitIoClient::Constants::CONTROL_PRI - @publishers_sec.value = publishers if channel == SplitIoClient::Constants::CONTROL_SEC + @publishers_pri.value = publishers if channel == Constants::CONTROL_PRI + @publishers_sec.value = publishers if channel == Constants::CONTROL_SEC end def are_publishers_available? @publishers_pri.value.positive? || @publishers_sec.value.positive? end - def dispatch_occupancy_event(push_enable) - @config.logger.debug("Dispatching occupancy event with publisher avaliable: #{push_enable}") - @on[:occupancy].call(push_enable) - end - - def dispatch_push_shutdown - @config.logger.debug('Dispatching push shutdown') - @on[:push_shutdown].call + def dispatch_action(action) + @config.logger.debug("Dispatching action: #{action}") + @on[:action].call(action) end end end diff --git a/lib/splitclient-rb/sse/sse_handler.rb b/lib/splitclient-rb/sse/sse_handler.rb index 6da566c7a..cd0a2953e 100644 --- a/lib/splitclient-rb/sse/sse_handler.rb +++ b/lib/splitclient-rb/sse/sse_handler.rb @@ -13,11 +13,10 @@ def initialize(config, synchronizer, splits_repository, segments_repository, not @notification_processor = SplitIoClient::SSE::NotificationProcessor.new(config, @splits_worker, @segments_worker) @sse_client = SSE::EventSource::Client.new(@config) do |client| client.on_event { |event| handle_incoming_message(event) } - client.on_connected { process_connected } - client.on_disconnect { |reconnect| process_disconnect(reconnect) } + client.on_action { |action| process_action(action) } end - @on = { connected: ->(_) {}, disconnect: ->(_) {} } + @on = { action: ->(_) {} } yield self if block_given? end @@ -48,22 +47,14 @@ def stop_workers @segments_worker.stop end - def on_connected(&action) - @on[:connected] = action - end - - def on_disconnect(&action) - @on[:disconnect] = action - end - - def process_disconnect(reconnect) - @on[:disconnect].call(reconnect) + def on_action(&action) + @on[:action] = action end private - def process_connected - @on[:connected].call + def process_action(action) + @on[:action].call(action) end def handle_incoming_message(notification) diff --git a/spec/engine/push_manager_spec.rb b/spec/engine/push_manager_spec.rb index a5ad835c4..4231353ca 100644 --- a/spec/engine/push_manager_spec.rb +++ b/spec/engine/push_manager_spec.rb @@ -42,8 +42,7 @@ stub_request(:get, config.auth_service_url).to_return(status: 200, body: body_response) config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -51,8 +50,7 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) @@ -63,16 +61,14 @@ sleep(1.5) expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) end end it 'must not connect to server. Auth server return 500' do stub_request(:get, config.auth_service_url).to_return(status: 500) - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -80,8 +76,7 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) @@ -93,15 +88,13 @@ expect(connected).to eq(false) expect(sse_handler.connected?).to eq(false) - expect(connected_event).to eq(false) - expect(disconnect_event).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_NONRETRYABLE_ERROR) end it 'must not connect to server. Auth server return 401' do stub_request(:get, config.auth_service_url).to_return(status: 401) - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -109,8 +102,7 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) @@ -122,8 +114,7 @@ expect(connected).to eq(false) expect(sse_handler.connected?).to eq(false) - expect(connected_event).to eq(false) - expect(disconnect_event).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_NONRETRYABLE_ERROR) end end @@ -137,8 +128,7 @@ stub_request(:get, config.auth_service_url).to_return(status: 200, body: body_response) config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = nil sse_handler = SplitIoClient::SSE::SSEHandler.new( config, synchronizer, @@ -146,8 +136,7 @@ segments_repository, notification_manager_keeper ) do |handler| - handler.on_connected { connected_event = true } - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end push_manager = subject.new(config, sse_handler, api_key) @@ -159,13 +148,11 @@ expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) push_manager.stop_sse expect(sse_handler.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end diff --git a/spec/engine/sync_manager_spec.rb b/spec/engine/sync_manager_spec.rb index 5a0df6c97..4c9a3e028 100644 --- a/spec/engine/sync_manager_spec.rb +++ b/spec/engine/sync_manager_spec.rb @@ -93,7 +93,7 @@ end end - it 'start sync manager receiving control message, must switch to pollingbundl' do + it 'start sync manager receiving control message, must switch to polling' do mock_server do |server| server.setup_response('/') do |_, res| send_content(res, event_control) diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 590e758f7..5afa193ea 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -23,12 +23,10 @@ send_stream_content(res, event_split_update) end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) @@ -41,13 +39,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -58,12 +54,10 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) @@ -78,13 +72,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -95,12 +87,10 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) @@ -114,13 +104,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -131,12 +119,10 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) @@ -149,13 +135,11 @@ expect(event_result.client_id).to eq('emptyClientId') expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -166,12 +150,10 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) @@ -180,13 +162,11 @@ sleep 0.5 expect(event_queue.empty?).to be_truthy expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -197,12 +177,10 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) @@ -214,13 +192,11 @@ expect(event_result.client_id).to eq(nil) expect(event_result.event_type).to eq('message') expect(sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) sse_client.close expect(sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -231,19 +207,16 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) expect(connected).to eq(false) expect(sse_client.connected?).to eq(false) - expect(connected_event).to eq(false) expect(event_queue.empty?).to eq(true) end end @@ -255,12 +228,10 @@ end event_queue = Queue.new - connected_event = false - disconnect_event = false + action_event = '' sse_client = subject.new(config) do |client| client.on_event { |event| event_queue << event } - client.on_connected { connected_event = true } - client.on_disconnect { disconnect_event = true } + client.on_action { |action| action_event = action } end connected = sse_client.start(server.base_uri) diff --git a/spec/sse/notification_manager_keeper_spec.rb b/spec/sse/notification_manager_keeper_spec.rb index 8c95b4924..fa86e7076 100644 --- a/spec/sse/notification_manager_keeper_spec.rb +++ b/spec/sse/notification_manager_keeper_spec.rb @@ -10,184 +10,174 @@ context 'CONTROL EVENT' do it 'STREAMING_PAUSED' do - result = nil - shutdown = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end + data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_PAUSED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) - expect(shutdown).to eq(nil) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) end it 'STREAMING_RESUMED with publishers enabled' do - result = nil - shutdown = nil + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end + data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_RESUMED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) - expect(shutdown).to eq(nil) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) end it 'STREAMING_RESUMED without publishers enabled' do - result = nil - shutdown = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) + + action_event = nil data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_RESUMED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') - result = nil noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) - expect(shutdown).to eq(nil) + expect(action_event).to eq(nil) end it 'STREAMING_DISABLED' do - result = nil - shutdown = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } - manager.on_push_shutdown { shutdown = true } + manager.on_action { |action| action_event = action } end + data = { 'type' => 'CONTROL', 'controlType' => 'STREAMING_DISABLED' } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) - expect(shutdown).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_OFF) end end context 'OCCUPANCY EVENT' do it 'first time without publishers available' do - result = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } + manager.on_action { |action| action_event = action } end + data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) end it 'first time with publishers available' do - result = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } + manager.on_action { |action| action_event = action } end + data = { 'metrics' => { 'publishers' => 2 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) end it 'handle many events' do - result = nil - + action_event = nil noti_manager_keeper = subject.new(config) do |manager| - manager.on_occupancy { |push_enable| result = push_enable } + manager.on_action { |action| action_event = action } end + data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 1 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-clienrubot-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 2 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 5 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 1 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(true) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_READY) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 2 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 3 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(false) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_SUBSYSTEM_DOWN) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) - result = nil + action_event = nil data = { 'metrics' => { 'publishers' => 0 } } event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) - expect(result).to eq(nil) + expect(action_event).to eq(nil) end end end diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 11c7c40b0..798fa7918 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -64,30 +64,24 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -100,30 +94,24 @@ splits_repository.set_change_number(1_506_703_262_916) config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end @@ -136,34 +124,28 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) split = splits_repository.get_split('FACUNDO_TEST') expect(split[:killed]).to be_truthy expect(split[:defaultTreatment]).to eq('on') expect(split[:changeNumber]).to eq(1_506_703_262_918) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -174,34 +156,28 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) split = splits_repository.get_split('FACUNDO_TEST') expect(split[:killed]).to be_truthy expect(split[:defaultTreatment]).to eq('on') expect(split[:changeNumber]).to eq(1_506_703_262_916) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end @@ -214,30 +190,24 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(1) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end @@ -248,30 +218,24 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.once - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end @@ -284,29 +248,23 @@ end config.streaming_service_url = server.base_uri - connected_event = false - disconnect_event = false + action_event = '' sse_handler = subject.new(config, synchronizer, splits_repository, segments_repository, notification_manager_keeper) do |handler| - handler.on_connected do - sse_handler.start_workers - connected_event = true - end - handler.on_disconnect { disconnect_event = true } + handler.on_action { |action| action_event = action } end + sse_handler.start_workers connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) sleep(2) + expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) - expect(connected_event).to eq(true) - expect(disconnect_event).to eq(false) sse_handler.sse_client.close expect(sse_handler.sse_client.connected?).to eq(false) - expect(disconnect_event).to eq(true) end end end From ea103a8f7c71532154cbfb410ba0a63dab636078 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 20 Jan 2021 14:46:07 -0300 Subject: [PATCH 05/10] polishing --- lib/splitclient-rb/sse/event_source/client.rb | 5 +++++ lib/splitclient-rb/version.rb | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index a0bcf2732..abdf484b2 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -43,6 +43,11 @@ def close(action = Constants::PUSH_NONRETRYABLE_ERROR) end def start(url) + if connected? + @config.logger.debug('SSEClient already running.') + return true + end + @uri = URI(url) latch = Concurrent::CountDownLatch.new(1) diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index a4b550053..ebe637308 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '7.2.2' + VERSION = '7.2.3.pre.rc1' end From b33144698c956b86db5b8c3a03e7f87338112721 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Fri, 22 Jan 2021 13:37:18 -0300 Subject: [PATCH 06/10] removed empty space --- lib/splitclient-rb/constants.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/splitclient-rb/constants.rb b/lib/splitclient-rb/constants.rb index 3ad047b5c..c01d8fc8f 100644 --- a/lib/splitclient-rb/constants.rb +++ b/lib/splitclient-rb/constants.rb @@ -13,4 +13,3 @@ class SplitIoClient::Constants PUSH_SUBSYSTEM_READY = 'PUSH_SUBSYSTEM_READY' PUSH_SUBSYSTEM_OFF = 'PUSH_SUBSYSTEM_OFF' end - \ No newline at end of file From 8fc5557dd86da536f5c1e7af8fce07520fa54fe9 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 17 Feb 2021 10:45:46 -0300 Subject: [PATCH 07/10] fix and tests --- .../cache/fetchers/segment_fetcher.rb | 12 +++- .../cache/fetchers/split_fetcher.rb | 4 +- lib/splitclient-rb/engine/synchronizer.rb | 17 ++---- .../sse/workers/segments_worker.rb | 7 ++- .../sse/workers/splits_worker.rb | 9 ++- spec/engine/synchronizer_spec.rb | 22 ++------ spec/sse/sse_handler_spec.rb | 4 +- spec/sse/workers/segments_worker_spec.rb | 21 +++++++ spec/sse/workers/splits_worker_spec.rb | 55 ++++++++++++++++--- 9 files changed, 100 insertions(+), 51 deletions(-) diff --git a/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb b/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb index 6cd7b2952..19dab74ae 100644 --- a/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb +++ b/lib/splitclient-rb/cache/fetchers/segment_fetcher.rb @@ -27,14 +27,22 @@ def call end end + def fetch_segments_if_not_exists(names) + names.each do |name| + change_number = @segments_repository.get_change_number(name) + + fetch_segment(name) if change_number == -1 + end + rescue StandardError => error + @config.log_found_exception(__method__.to_s, error) + end + def fetch_segment(name) @semaphore.synchronize do segments_api.fetch_segments_by_names([name]) - true end rescue StandardError => error @config.log_found_exception(__method__.to_s, error) - false end def fetch_segments diff --git a/lib/splitclient-rb/cache/fetchers/split_fetcher.rb b/lib/splitclient-rb/cache/fetchers/split_fetcher.rb index 104a839c8..8510ae438 100644 --- a/lib/splitclient-rb/cache/fetchers/split_fetcher.rb +++ b/lib/splitclient-rb/cache/fetchers/split_fetcher.rb @@ -41,11 +41,11 @@ def fetch_splits @config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled @sdk_blocker.splits_ready! - true + + data[:segment_names] end rescue StandardError => error @config.log_found_exception(__method__.to_s, error) - false end def stop_splits_thread diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index baf4f9b76..b9ea30e6e 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -30,7 +30,7 @@ def initialize( def sync_all @config.threads[:sync_all_thread] = Thread.new do @config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled - fetch_splits + @split_fetcher.fetch_splits fetch_segments end end @@ -53,21 +53,12 @@ def stop_periodic_fetch end def fetch_splits - back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1) - loop do - break if @split_fetcher.fetch_splits - - sleep(back_off.interval) - end + segment_names = @split_fetcher.fetch_splits + @segment_fetcher.fetch_segments_if_not_exists(segment_names) unless segment_names.empty? end def fetch_segment(name) - back_off = SplitIoClient::SSE::EventSource::BackOff.new(SplitIoClient::Constants::FETCH_BACK_OFF_BASE_RETRIES, 1) - loop do - break if @segment_fetcher.fetch_segment(name) - - sleep(back_off.interval) - end + @segment_fetcher.fetch_segment(name) end private diff --git a/lib/splitclient-rb/sse/workers/segments_worker.rb b/lib/splitclient-rb/sse/workers/segments_worker.rb index 3b364c7b0..361081194 100644 --- a/lib/splitclient-rb/sse/workers/segments_worker.rb +++ b/lib/splitclient-rb/sse/workers/segments_worker.rb @@ -49,11 +49,12 @@ def perform while (item = @queue.pop) segment_name = item[:segment_name] change_number = item[:change_number] - since = @segments_repository.get_change_number(segment_name) + @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{change_number}") - unless since >= change_number - @config.logger.debug("SegmentsWorker fetch_segment with #{since}") + attempt = 0 + while change_number > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_segment(segment_name) + attempt += 1 end end end diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index a347c8ff7..ba4fd5273 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -3,6 +3,8 @@ module SplitIoClient module SSE module Workers + MAX_RETRIES_ALLOWED = 10 + class SplitsWorker def initialize(synchronizer, config, splits_repository) @synchronizer = synchronizer @@ -59,11 +61,12 @@ def kill_split(change_number, split_name, default_treatment) def perform while (change_number = @queue.pop) - since = @splits_repository.get_change_number + @config.logger.debug("SplitsWorker change_number dequeue #{change_number}") - unless since.to_i >= change_number - @config.logger.debug("SplitsWorker fetch_splits with #{since}") + attempt = 0 + while change_number > @splits_repository.get_change_number.to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_splits + attempt += 1 end end end diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index 0958d77bb..c3354e308 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -87,34 +87,22 @@ it 'fetch_splits' do mock_split_changes(splits) + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') synchronizer.fetch_splits expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once end - it 'fetch_splits with retries' do - stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') - .to_return({ status: 500 }, { status: 200, body: splits }) - - synchronizer.fetch_splits - - expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.times(2) - end - it 'fetch_segment' do mock_segment_changes('segment3', segment3, '-1') synchronizer.fetch_segment('segment3') expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.once end - - it 'fetch_segment with retries' do - stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1') - .to_return({ status: 500 }, { status: 200, body: segment3 }) - - synchronizer.fetch_segment('segment3') - expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment3?since=-1')).to have_been_made.times(2) - end end private diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 798fa7918..75ab80f70 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -183,7 +183,7 @@ end context 'SEGMENT UPDATE event' do - it 'must trigger fetch' do + it 'must trigger fetch - with retries' do mock_server do |server| server.setup_response('/') do |_, res| send_content(res, event_segment_update_must_fetch) @@ -203,7 +203,7 @@ expect(action_event).to eq(SplitIoClient::Constants::PUSH_CONNECTED) expect(sse_handler.sse_client.connected?).to eq(true) - expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(1) + expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12) sse_handler.sse_client.close diff --git a/spec/sse/workers/segments_worker_spec.rb b/spec/sse/workers/segments_worker_spec.rb index 85aaecd24..131d731df 100644 --- a/spec/sse/workers/segments_worker_spec.rb +++ b/spec/sse/workers/segments_worker_spec.rb @@ -41,6 +41,16 @@ context 'add segment name to queue' do it 'must trigger fetch' do + stub_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877') + .to_return(status: 200, body: + '{ + "name": "segment1", + "added": [], + "removed": [], + "since": 1470947453878, + "till": 1470947453878 + }') + worker = subject.new(synchronizer, config, segments_repository) worker.start @@ -51,6 +61,17 @@ expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(2) end + it 'must trigger fetch - with retries' do + worker = subject.new(synchronizer, config, segments_repository) + + worker.start + worker.add_to_queue(1_506_703_262_918, 'segment1') + + sleep(1) + + expect(a_request(:get, 'https://sdk.split.io/api/segmentChanges/segment1?since=1470947453877')).to have_been_made.times(12) + end + it 'must not trigger fetch' do worker = subject.new(synchronizer, config, segments_repository) diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 2ecfddeaf..2bc4fb397 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -27,18 +27,44 @@ let(:params) { { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: impression_counter } } let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, sdk_blocker, params) } - before do - mock_split_changes(splits) - mock_segment_changes('segment1', segment1, '-1') - mock_segment_changes('segment1', segment1, '1470947453877') - mock_segment_changes('segment2', segment2, '-1') - mock_segment_changes('segment2', segment2, '1470947453878') - mock_segment_changes('segment3', segment3, '-1') - - split_fetcher.fetch_splits + it 'add change number - must tigger fetcch - with retries' do + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1') + .to_return(status: 200, body: + '{ + "splits": [], + "since": -1, + "till": 1506703262918 + }') + + stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918') + .to_return(status: 200, body: + '{ + "splits": [], + "since": 1506703262918, + "till": 1506703262918 + }') + + worker = subject.new(synchronizer, config, splits_repository) + worker.start + worker.add_to_queue(1_506_703_262_919) + + sleep(1) + + expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262918')).to have_been_made.times(10) end context 'add change number to queue' do + before do + mock_split_changes(splits) + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + + split_fetcher.fetch_splits + end + it 'must trigger fetch' do worker = subject.new(synchronizer, config, splits_repository) worker.start @@ -71,6 +97,17 @@ end context 'kill split notification' do + before do + mock_split_changes(splits) + mock_segment_changes('segment1', segment1, '-1') + mock_segment_changes('segment1', segment1, '1470947453877') + mock_segment_changes('segment2', segment2, '-1') + mock_segment_changes('segment2', segment2, '1470947453878') + mock_segment_changes('segment3', segment3, '-1') + + split_fetcher.fetch_splits + end + it 'must kill split and trigger fetch' do worker = subject.new(synchronizer, config, splits_repository) From 72e12b00c45299b974a31605bca0819bd9ad757a Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 17 Feb 2021 11:01:11 -0300 Subject: [PATCH 08/10] fix build --- lib/splitclient-rb/sse/workers/segments_worker.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/splitclient-rb/sse/workers/segments_worker.rb b/lib/splitclient-rb/sse/workers/segments_worker.rb index 361081194..6d5768e85 100644 --- a/lib/splitclient-rb/sse/workers/segments_worker.rb +++ b/lib/splitclient-rb/sse/workers/segments_worker.rb @@ -48,11 +48,11 @@ def stop def perform while (item = @queue.pop) segment_name = item[:segment_name] - change_number = item[:change_number] - @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{change_number}") + cn = item[:change_number] + @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") attempt = 0 - while change_number > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED + while cn > @segments_repository.get_change_number(segment_name).to_i && attempt <= Workers::MAX_RETRIES_ALLOWED @synchronizer.fetch_segment(segment_name) attempt += 1 end From 657d1cc059e9f6693476bb9cd4edb7a1d51452a1 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 24 Feb 2021 17:24:33 -0300 Subject: [PATCH 09/10] 7.2.3 release --- CHANGES.txt | 6 ++++++ lib/splitclient-rb/version.rb | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index aab4cd97c..43f419f0d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,11 @@ CHANGES +7.2.3 (Feb 24, 2021) +- Fixed missing segment fetch after an SPLIT_UPDATE. +- Updated occupancy logic to process secondary region. +- Updated sse client connection confirmation. +- Updated naming retryable erros. + 7.2.2 (Dec 18, 2020) - Fixed issue: undefined local variable or method post_impressions_count diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index ebe637308..e446988e9 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '7.2.3.pre.rc1' + VERSION = '7.2.3' end From db2316296bb13d8418d2dfcf95737c5989925b81 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 24 Feb 2021 17:30:19 -0300 Subject: [PATCH 10/10] pr feedback --- CHANGES.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 43f419f0d..cc86e6051 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,9 +2,9 @@ CHANGES 7.2.3 (Feb 24, 2021) - Fixed missing segment fetch after an SPLIT_UPDATE. -- Updated occupancy logic to process secondary region. -- Updated sse client connection confirmation. -- Updated naming retryable erros. +- Updated streaming logic to support multiregion. +- Updated sse client connection logic to read confirmation event. +- Updated naming of retryable erros. 7.2.2 (Dec 18, 2020) - Fixed issue: undefined local variable or method post_impressions_count