Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/splitclient-rb/sse/event_source/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def connect_thread(latch)
def connect_stream(latch)
socket_write(latch)

while @connected.value
while connected?
begin
partial_data = @socket.readpartial(10_000, timeout: @read_timeout)

Expand Down
28 changes: 21 additions & 7 deletions lib/splitclient-rb/sse/notification_manager_keeper.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# frozen_string_literal: true

require 'concurrent/atomics'
require 'concurrent'

module SplitIoClient
module SSE
class NotificationManagerKeeper
def initialize(config)
@config = config
@publisher_available = Concurrent::AtomicBoolean.new(true)
@publishers_pri = Concurrent::AtomicFixnum.new
@publishers_sec = Concurrent::AtomicFixnum.new
@on = { occupancy: ->(_) {}, push_shutdown: ->(_) {} }

yield self if block_given?
Expand All @@ -16,8 +18,8 @@ def initialize(config)
def handle_incoming_occupancy_event(event)
if event.data['type'] == 'CONTROL'
process_event_control(event.data['controlType'])
elsif event.channel == SplitIoClient::Constants::CONTROL_PRI
process_event_occupancy(event.data['metrics']['publishers'])
else
process_event_occupancy(event.channel, event.data['metrics']['publishers'])
end
rescue StandardError => e
@config.logger.error(e)
Expand Down Expand Up @@ -46,17 +48,29 @@ def process_event_control(type)
end
end

def process_event_occupancy(publishers)
@config.logger.debug("Occupancy process event with #{publishers} publishers") if @config.debug_enabled
if publishers <= 0 && @publisher_available.value
def process_event_occupancy(channel, publishers)
@config.logger.debug("Processed occupancy event with #{publishers} publishers. Channel: #{channel}")

update_publishers(channel, publishers)

if !are_publishers_available? && @publisher_available.value
@publisher_available.make_false
dispatch_occupancy_event(false)
elsif publishers >= 1 && !@publisher_available.value
elsif are_publishers_available? && !@publisher_available.value
@publisher_available.make_true
dispatch_occupancy_event(true)
end
end

def update_publishers(channel, publishers)
@publishers_pri.value = publishers if channel == SplitIoClient::Constants::CONTROL_PRI
@publishers_sec.value = publishers if channel == SplitIoClient::Constants::CONTROL_SEC
end

def are_publishers_available?
@publishers_pri.value.positive? || @publishers_sec.value.positive?
end

def dispatch_occupancy_event(push_enable)
@config.logger.debug("Dispatching occupancy event with publisher avaliable: #{push_enable}")
@on[:occupancy].call(push_enable)
Expand Down
48 changes: 48 additions & 0 deletions spec/sse/notification_manager_keeper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,54 @@
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(true)

result = nil
data = { 'metrics' => { 'publishers' => 0 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(false)

result = nil
data = { 'metrics' => { 'publishers' => 1 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(true)

result = nil
data = { 'metrics' => { 'publishers' => 2 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(nil)

result = nil
data = { 'metrics' => { 'publishers' => 3 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(nil)

result = nil
data = { 'metrics' => { 'publishers' => 0 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(nil)

result = nil
data = { 'metrics' => { 'publishers' => 0 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(false)

result = nil
data = { 'metrics' => { 'publishers' => 0 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_sec')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(nil)

result = nil
data = { 'metrics' => { 'publishers' => 0 } }
event = SplitIoClient::SSE::EventSource::StreamData.new('message', 'test-client-id', data, 'control_pri')
noti_manager_keeper.handle_incoming_occupancy_event(event)
expect(result).to eq(nil)
end
end
end