Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
CHANGES

7.2.1 (Oct 23, 2020)
- Updated redis dependency to >= 4.2.2.
- Updated ably error handling.

7.2.0 (Sep 25, 2020)
- Added deduplication logic for impressions data.
- Now there are two modes for Impressions when the SDK is in standalone mode, OPTIMIZED (default) that only ships unique impressions and DEBUG for times where you need to send ALL impressions to debug an integration.
Expand Down
1 change: 1 addition & 0 deletions lib/splitclient-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
# SSE
require 'splitclient-rb/sse/event_source/back_off'
require 'splitclient-rb/sse/event_source/client'
require 'splitclient-rb/sse/event_source/event_parser'
require 'splitclient-rb/sse/event_source/event_types'
require 'splitclient-rb/sse/event_source/stream_data'
require 'splitclient-rb/sse/workers/segments_worker'
Expand Down
2 changes: 1 addition & 1 deletion lib/splitclient-rb/cache/adapters/redis_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def get_from_queue(key, count)

# General
def exists?(key)
@redis.exists(key)
@redis.exists?(key)
end

def delete(key)
Expand Down
12 changes: 7 additions & 5 deletions lib/splitclient-rb/engine/push_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ def start_sse
response = @auth_api_client.authenticate(@api_key)

@config.logger.debug("Auth service response push_enabled: #{response[:push_enabled]}") if @config.debug_enabled
if response[:push_enabled]
@sse_handler.start(response[:token], response[:channels])

if response[:push_enabled] && @sse_handler.start(response[:token], response[:channels])
schedule_next_token_refresh(response[:exp])
@back_off.reset
else
stop_sse
return
end

stop_sse

schedule_next_token_refresh(@back_off.interval) if response[:retry]
rescue StandardError => e
@config.logger.error("start_sse: #{e.inspect}")
Expand All @@ -31,6 +32,7 @@ def start_sse
def stop_sse
@sse_handler.process_disconnect if @sse_handler.sse_client.nil?
@sse_handler.stop
SplitIoClient::Helpers::ThreadHelper.stop(:schedule_next_token_refresh, @config)
end

private
Expand All @@ -41,7 +43,7 @@ def schedule_next_token_refresh(time)
@config.logger.debug("schedule_next_token_refresh refresh in #{time} seconds.") if @config.debug_enabled
sleep(time)
@config.logger.debug('schedule_next_token_refresh starting ...') if @config.debug_enabled
stop_sse
@sse_handler.stop
start_sse
rescue StandardError => e
@config.logger.debug("schedule_next_token_refresh error: #{e.inspect}") if @config.debug_enabled
Expand Down
46 changes: 33 additions & 13 deletions lib/splitclient-rb/engine/sync_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@
module SplitIoClient
module Engine
class SyncManager
include SplitIoClient::Cache::Fetchers

def initialize(
repositories,
api_key,
config,
params
synchronizer
)
split_fetcher = SplitFetcher.new(repositories[:splits], api_key, params[:metrics], config, params[:sdk_blocker])
segment_fetcher = SegmentFetcher.new(repositories[:segments], api_key, params[:metrics], config, params[:sdk_blocker])
sync_params = { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: params[:impression_counter] }

@synchronizer = Synchronizer.new(repositories, api_key, config, params[:sdk_blocker], sync_params)
@synchronizer = synchronizer
notification_manager_keeper = SplitIoClient::SSE::NotificationManagerKeeper.new(config) do |manager|
manager.on_occupancy { |publisher_available| process_occupancy(publisher_available) }
manager.on_push_shutdown { process_push_shutdown }
Expand All @@ -28,10 +22,11 @@ def initialize(
notification_manager_keeper
) do |handler|
handler.on_connected { process_connected }
handler.on_disconnect { process_disconnect }
handler.on_disconnect { |reconnect| process_disconnect(reconnect) }
end

@push_manager = PushManager.new(config, @sse_handler, api_key)
@sse_connected = Concurrent::AtomicBoolean.new(false)
@config = config
end

Expand Down Expand Up @@ -90,30 +85,55 @@ def start_stream_forked
end

def process_connected
if @sse_connected.value
@config.logger.debug('Streaming already connected.')
return
end

@sse_connected.make_true
@synchronizer.stop_periodic_fetch
@synchronizer.sync_all
@sse_handler.start_workers
rescue StandardError => e
@config.logger.error("process_connected error: #{e.inspect}")
end

def process_disconnect
def process_disconnect(reconnect)
unless @sse_connected.value
@config.logger.debug('Streaming already disconnected.')
return
end

@sse_connected.make_false
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch

if reconnect
@synchronizer.sync_all
@push_manager.start_sse
end
rescue StandardError => e
@config.logger.error("process_disconnect error: #{e.inspect}")
end

def process_occupancy(push_enable)
process_disconnect unless push_enable
process_connected if push_enable
if push_enable
@synchronizer.stop_periodic_fetch
@synchronizer.sync_all
@sse_handler.start_workers
return
end

@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
rescue StandardError => e
@config.logger.error("process_occupancy error: #{e.inspect}")
end

def process_push_shutdown
@push_manager.stop_sse
process_disconnect
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
rescue StandardError => e
@config.logger.error("process_push_shutdown error: #{e.inspect}")
end
Expand Down
8 changes: 5 additions & 3 deletions lib/splitclient-rb/engine/synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ def initialize(
end

def sync_all
@config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled
fetch_splits
fetch_segments
@config.threads[:sync_all_thread] = Thread.new do
@config.logger.debug('Synchronizing Splits and Segments ...') if @config.debug_enabled
fetch_splits
fetch_segments
end
end

def start_periodic_data_recording
Expand Down
24 changes: 14 additions & 10 deletions lib/splitclient-rb/split_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class SplitFactory
include SplitIoClient::Cache::Repositories
include SplitIoClient::Cache::Stores
include SplitIoClient::Cache::Senders
include SplitIoClient::Cache::Fetchers

attr_reader :adapter, :client, :manager, :config

Expand Down Expand Up @@ -53,8 +54,12 @@ def start!
if @config.localhost_mode
start_localhost_components
else
params = { sdk_blocker: @sdk_blocker, metrics: @metrics, impression_counter: @impression_counter }
SplitIoClient::Engine::SyncManager.new(repositories, @api_key, @config, params).start
split_fetcher = SplitFetcher.new(@splits_repository, @api_key, @metrics, config, @sdk_blocker)
segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, @metrics, config, @sdk_blocker)
params = { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: @impression_counter }

synchronizer = SplitIoClient::Engine::Synchronizer.new(repositories, @api_key, @config, @sdk_blocker, params)
SplitIoClient::Engine::SyncManager.new(repositories, @api_key, @config, synchronizer).start
end
end

Expand Down Expand Up @@ -125,14 +130,13 @@ def validate_api_key
end

def repositories
repos = {}
repos[:splits] = @splits_repository
repos[:segments] = @segments_repository
repos[:impressions] = @impressions_repository
repos[:events] = @events_repository
repos[:metrics] = @metrics_repository

repos
{
splits: @splits_repository,
segments: @segments_repository,
impressions: @impressions_repository,
events: @events_repository,
metrics: @metrics_repository
}
end

def start_localhost_components
Expand Down
Loading