From 63cd8896fe3f9923461f6a7e1cef508dc62e30f5 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Fri, 15 Jan 2021 10:47:37 -0300 Subject: [PATCH 1/2] sec region support --- lib/splitclient-rb/sse/event_source/client.rb | 2 +- .../sse/notification_manager_keeper.rb | 36 +++++++++----- spec/sse/notification_manager_keeper_spec.rb | 48 +++++++++++++++++++ 3 files changed, 74 insertions(+), 12 deletions(-) diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 920aef4a3..3478c6ee9 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -74,7 +74,7 @@ def connect_thread(latch) def connect_stream(latch) socket_write(latch) - while @connected.value + while connected? begin partial_data = @socket.readpartial(10_000, timeout: @read_timeout) diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index c014c603c..449de54af 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -1,13 +1,15 @@ # frozen_string_literal: true -require 'concurrent/atomics' +require 'concurrent' module SplitIoClient module SSE class NotificationManagerKeeper def initialize(config) @config = config - @publisher_available = Concurrent::AtomicBoolean.new(true) + @streaming_available = Concurrent::AtomicBoolean.new(true) + @publishers_pri = Concurrent::AtomicFixnum.new + @publishers_sec = Concurrent::AtomicFixnum.new @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } yield self if block_given? @@ -16,8 +18,8 @@ def initialize(config) def handle_incoming_occupancy_event(event) if event.data['type'] == 'CONTROL' process_event_control(event.data['controlType']) - elsif event.channel == SplitIoClient::Constants::CONTROL_PRI - process_event_occupancy(event.data['metrics']['publishers']) + else + process_event_occupancy(event.channel, event.data['metrics']['publishers']) end rescue StandardError => e @config.logger.error(e) @@ -38,7 +40,7 @@ def process_event_control(type) when 'STREAMING_PAUSED' dispatch_occupancy_event(false) when 'STREAMING_RESUMED' - dispatch_occupancy_event(true) if @publisher_available.value + dispatch_occupancy_event(true) if @streaming_available.value when 'STREAMING_DISABLED' dispatch_push_shutdown else @@ -46,17 +48,29 @@ def process_event_control(type) end end - def process_event_occupancy(publishers) - @config.logger.debug("Occupancy process event with #{publishers} publishers") if @config.debug_enabled - if publishers <= 0 && @publisher_available.value - @publisher_available.make_false + def process_event_occupancy(channel, publishers) + @config.logger.debug("Occupancy process event with #{publishers} publishers. Channel: #{channel}") + + update_publishers(channel, publishers) + + if !are_publishers_avaliable? && @streaming_available.value + @streaming_available.make_false dispatch_occupancy_event(false) - elsif publishers >= 1 && !@publisher_available.value - @publisher_available.make_true + elsif are_publishers_avaliable? && !@streaming_available.value + @streaming_available.make_true dispatch_occupancy_event(true) end end + def update_publishers(channel, publishers) + @publishers_pri.compare_and_set(@publishers_pri.value, publishers) if channel == SplitIoClient::Constants::CONTROL_PRI + @publishers_sec.compare_and_set(@publishers_sec.value, publishers) if channel == SplitIoClient::Constants::CONTROL_SEC + end + + def are_publishers_avaliable? + @publishers_pri.value.positive? || @publishers_sec.value.positive? + end + def dispatch_occupancy_event(push_enable) @config.logger.debug("Dispatching occupancy event with publisher avaliable: #{push_enable}") @on[:occupancy].call(push_enable) diff --git a/spec/sse/notification_manager_keeper_spec.rb b/spec/sse/notification_manager_keeper_spec.rb index abc4da910..8c95b4924 100644 --- a/spec/sse/notification_manager_keeper_spec.rb +++ b/spec/sse/notification_manager_keeper_spec.rb @@ -140,6 +140,54 @@ event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) expect(result).to eq(true) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(false) + + result = nil + data = { 'metrics' => { 'publishers' => 1 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(true) + + result = nil + data = { 'metrics' => { 'publishers' => 2 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 3 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(false) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) end end end From e1c82653482a15e371503f86e8e43b5fb07e0882 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Fri, 15 Jan 2021 20:09:34 -0300 Subject: [PATCH 2/2] pr feedback --- .../sse/notification_manager_keeper.rb | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index 449de54af..b90a4dc7a 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -7,7 +7,7 @@ module SSE class NotificationManagerKeeper def initialize(config) @config = config - @streaming_available = Concurrent::AtomicBoolean.new(true) + @publisher_available = Concurrent::AtomicBoolean.new(true) @publishers_pri = Concurrent::AtomicFixnum.new @publishers_sec = Concurrent::AtomicFixnum.new @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } @@ -40,7 +40,7 @@ def process_event_control(type) when 'STREAMING_PAUSED' dispatch_occupancy_event(false) when 'STREAMING_RESUMED' - dispatch_occupancy_event(true) if @streaming_available.value + dispatch_occupancy_event(true) if @publisher_available.value when 'STREAMING_DISABLED' dispatch_push_shutdown else @@ -49,25 +49,25 @@ def process_event_control(type) end def process_event_occupancy(channel, publishers) - @config.logger.debug("Occupancy process event with #{publishers} publishers. Channel: #{channel}") + @config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") update_publishers(channel, publishers) - if !are_publishers_avaliable? && @streaming_available.value - @streaming_available.make_false + if !are_publishers_available? && @publisher_available.value + @publisher_available.make_false dispatch_occupancy_event(false) - elsif are_publishers_avaliable? && !@streaming_available.value - @streaming_available.make_true + elsif are_publishers_available? && !@publisher_available.value + @publisher_available.make_true dispatch_occupancy_event(true) end end def update_publishers(channel, publishers) - @publishers_pri.compare_and_set(@publishers_pri.value, publishers) if channel == SplitIoClient::Constants::CONTROL_PRI - @publishers_sec.compare_and_set(@publishers_sec.value, publishers) if channel == SplitIoClient::Constants::CONTROL_SEC + @publishers_pri.value = publishers if channel == SplitIoClient::Constants::CONTROL_PRI + @publishers_sec.value = publishers if channel == SplitIoClient::Constants::CONTROL_SEC end - def are_publishers_avaliable? + def are_publishers_available? @publishers_pri.value.positive? || @publishers_sec.value.positive? end