diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 920aef4a3..3478c6ee9 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -74,7 +74,7 @@ def connect_thread(latch) def connect_stream(latch) socket_write(latch) - while @connected.value + while connected? begin partial_data = @socket.readpartial(10_000, timeout: @read_timeout) diff --git a/lib/splitclient-rb/sse/notification_manager_keeper.rb b/lib/splitclient-rb/sse/notification_manager_keeper.rb index c014c603c..b90a4dc7a 100644 --- a/lib/splitclient-rb/sse/notification_manager_keeper.rb +++ b/lib/splitclient-rb/sse/notification_manager_keeper.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require 'concurrent/atomics' +require 'concurrent' module SplitIoClient module SSE @@ -8,6 +8,8 @@ class NotificationManagerKeeper def initialize(config) @config = config @publisher_available = Concurrent::AtomicBoolean.new(true) + @publishers_pri = Concurrent::AtomicFixnum.new + @publishers_sec = Concurrent::AtomicFixnum.new @on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} } yield self if block_given? @@ -16,8 +18,8 @@ def initialize(config) def handle_incoming_occupancy_event(event) if event.data['type'] == 'CONTROL' process_event_control(event.data['controlType']) - elsif event.channel == SplitIoClient::Constants::CONTROL_PRI - process_event_occupancy(event.data['metrics']['publishers']) + else + process_event_occupancy(event.channel, event.data['metrics']['publishers']) end rescue StandardError => e @config.logger.error(e) @@ -46,17 +48,29 @@ def process_event_control(type) end end - def process_event_occupancy(publishers) - @config.logger.debug("Occupancy process event with #{publishers} publishers") if @config.debug_enabled - if publishers <= 0 && @publisher_available.value + def process_event_occupancy(channel, publishers) + @config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}") + + update_publishers(channel, publishers) + + if !are_publishers_available? && @publisher_available.value @publisher_available.make_false dispatch_occupancy_event(false) - elsif publishers >= 1 && !@publisher_available.value + elsif are_publishers_available? && !@publisher_available.value @publisher_available.make_true dispatch_occupancy_event(true) end end + def update_publishers(channel, publishers) + @publishers_pri.value = publishers if channel == SplitIoClient::Constants::CONTROL_PRI + @publishers_sec.value = publishers if channel == SplitIoClient::Constants::CONTROL_SEC + end + + def are_publishers_available? + @publishers_pri.value.positive? || @publishers_sec.value.positive? + end + def dispatch_occupancy_event(push_enable) @config.logger.debug("Dispatching occupancy event with publisher avaliable: #{push_enable}") @on[:occupancy].call(push_enable) diff --git a/spec/sse/notification_manager_keeper_spec.rb b/spec/sse/notification_manager_keeper_spec.rb index abc4da910..8c95b4924 100644 --- a/spec/sse/notification_manager_keeper_spec.rb +++ b/spec/sse/notification_manager_keeper_spec.rb @@ -140,6 +140,54 @@ event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') noti_manager_keeper.handle_incoming_occupancy_event(event) expect(result).to eq(true) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(false) + + result = nil + data = { 'metrics' => { 'publishers' => 1 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(true) + + result = nil + data = { 'metrics' => { 'publishers' => 2 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 3 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(false) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) + + result = nil + data = { 'metrics' => { 'publishers' => 0 } } + event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri') + noti_manager_keeper.handle_incoming_occupancy_event(event) + expect(result).to eq(nil) end end end