Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
CHANGES

7.3.2 (Dec 10, 2021)
- Updated the readiness flow to be more consistent with the other sdks and improve the readiness time.
- Updated the name of telemety key latencies in Redis.

7.3.1 (Jul 26, 2021)
- Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced.

Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
require 'splitclient-rb/cache/senders/events_sender'
require 'splitclient-rb/cache/senders/impressions_count_sender'
require 'splitclient-rb/cache/senders/localhost_repo_cleaner'
require 'splitclient-rb/cache/stores/store_utils'
require 'splitclient-rb/cache/stores/localhost_split_builder'
require 'splitclient-rb/cache/stores/sdk_blocker'
require 'splitclient-rb/cache/stores/localhost_split_store'
require 'splitclient-rb/cache/stores/store_utils'

require 'splitclient-rb/clients/split_client'
require 'splitclient-rb/managers/split_manager'
Expand Down Expand Up @@ -87,6 +86,7 @@
require 'splitclient-rb/engine/auth_api_client'
require 'splitclient-rb/engine/back_off'
require 'splitclient-rb/engine/push_manager'
require 'splitclient-rb/engine/status_manager'
require 'splitclient-rb/engine/sync_manager'
require 'splitclient-rb/engine/synchronizer'
require 'splitclient-rb/utilitites'
Expand Down
12 changes: 3 additions & 9 deletions lib/splitclient-rb/cache/fetchers/segment_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ module Fetchers
class SegmentFetcher
attr_reader :segments_repository

def initialize(segments_repository, api_key, config, sdk_blocker, telemetry_runtime_producer)
def initialize(segments_repository, api_key, config, telemetry_runtime_producer)
@segments_repository = segments_repository
@api_key = api_key
@config = config
@sdk_blocker = sdk_blocker
@semaphore = Mutex.new
@telemetry_runtime_producer = telemetry_runtime_producer
end
Expand Down Expand Up @@ -52,11 +51,11 @@ def fetch_segments
@semaphore.synchronize do
segments_api.fetch_segments_by_names(@segments_repository.used_segment_names)

@sdk_blocker.segments_ready!
@sdk_blocker.sdk_internal_ready
true
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
false
end

def stop_segments_thread
Expand All @@ -70,11 +69,6 @@ def segments_thread
@config.logger.info('Starting segments fetcher service') if @config.debug_enabled

loop do
unless @sdk_blocker.splits_repository.ready?
sleep 0.2
next
end

fetch_segments
@config.logger.debug("Segment names: #{@segments_repository.used_segment_names.to_a}") if @config.debug_enabled

Expand Down
9 changes: 3 additions & 6 deletions lib/splitclient-rb/cache/fetchers/split_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ module Fetchers
class SplitFetcher
attr_reader :splits_repository

def initialize(splits_repository, api_key, config, sdk_blocker, telemetry_runtime_producer)
def initialize(splits_repository, api_key, config, telemetry_runtime_producer)
@splits_repository = splits_repository
@api_key = api_key
@config = config
@sdk_blocker = sdk_blocker
@semaphore = Mutex.new
@telemetry_runtime_producer = telemetry_runtime_producer
end
Expand Down Expand Up @@ -40,13 +39,11 @@ def fetch_splits(fetch_options = { cache_control_headers: false, till: nil })

@config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled

@sdk_blocker.splits_ready!

data[:segment_names]
{ segment_names: data[:segment_names], success: true }
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
[]
{ segment_names: [], success: false }
end

def stop_splits_thread
Expand Down
9 changes: 3 additions & 6 deletions lib/splitclient-rb/cache/stores/localhost_split_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ class LocalhostSplitStore
require 'yaml'
attr_reader :splits_repository

def initialize(splits_repository, config, sdk_blocker = nil)
def initialize(splits_repository, config, status_manager = nil)
@splits_repository = splits_repository
@config = config
@sdk_blocker = sdk_blocker
@status_manager = status_manager
end

def call
Expand Down Expand Up @@ -45,10 +45,7 @@ def store_splits
store_split(split)
end

if @sdk_blocker
@sdk_blocker.splits_ready!
@sdk_blocker.segments_ready!
end
@status_manager.ready! if @status_manager
rescue StandardError => error
@config.logger.error('Error while parsing the split file. ' \
'Check that the input file matches the expected format')
Expand Down
64 changes: 0 additions & 64 deletions lib/splitclient-rb/cache/stores/sdk_blocker.rb

This file was deleted.

10 changes: 5 additions & 5 deletions lib/splitclient-rb/clients/split_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ class SplitClient
# @param api_key [String] the API key for your split account
#
# @return [SplitIoClient] split.io client instance
def initialize(api_key, repositories, sdk_blocker, config, impressions_manager, telemetry_evaluation_producer)
def initialize(api_key, repositories, status_manager, config, impressions_manager, telemetry_evaluation_producer)
@api_key = api_key
@splits_repository = repositories[:splits]
@segments_repository = repositories[:segments]
@impressions_repository = repositories[:impressions]
@events_repository = repositories[:events]
@sdk_blocker = sdk_blocker
@status_manager = status_manager
@destroyed = false
@config = config
@impressions_manager = impressions_manager
Expand Down Expand Up @@ -137,7 +137,7 @@ def parsed_treatment(multiple, treatment_data)
else
{
treatment: treatment_data[:treatment],
config: treatment_data[:config]
config: treatment_data[:config],
}
end
end
Expand All @@ -157,7 +157,7 @@ def sanitize_split_names(calling_method, split_names)
end

def block_until_ready(time = nil)
@sdk_blocker.block(time) if @sdk_blocker && !@sdk_blocker.ready?
@status_manager.wait_until_ready(time) if @status_manager
end

private
Expand Down Expand Up @@ -310,7 +310,7 @@ def variable_size(value)
end

def ready?
return @sdk_blocker.ready? if @sdk_blocker
return @status_manager.ready? if @status_manager
true
end

Expand Down
8 changes: 4 additions & 4 deletions lib/splitclient-rb/engine/common/impressions_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def build_impression(matching_key, bucketing_key, split_name, treatment, params
@impression_counter.inc(split_name, impression_data[:m]) if optimized? && !redis?

impression(impression_data, params[:attributes])
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

def track(impressions)
Expand All @@ -48,8 +48,8 @@ def track(impressions)
end

record_stats(queued, dropped, dedupe)
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

private
Expand Down
33 changes: 33 additions & 0 deletions lib/splitclient-rb/engine/status_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module SplitIoClient
module Engine
class StatusManager
def initialize(config)
@config = config
@sdk_ready = Concurrent::CountDownLatch.new(1)
end

def ready?
return true if @config.consumer?

@sdk_ready.wait(0)
end

def ready!
return if ready?

@sdk_ready.count_down
@config.logger.info('SplitIO SDK is ready')
end

def wait_until_ready(seconds = nil)
return if @config.consumer?

timeout = seconds || @config.block_until_ready

raise SDKBlockerTimeoutExpiredException, 'SDK start up timeout expired' unless @sdk_ready.wait(timeout)
end
end
end
end
75 changes: 27 additions & 48 deletions lib/splitclient-rb/engine/sync_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ def initialize(
config,
synchronizer,
telemetry_runtime_producer,
sdk_blocker,
telemetry_synchronizer
telemetry_synchronizer,
status_manager
)
@synchronizer = synchronizer
notification_manager_keeper = SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer) do |manager|
Expand All @@ -33,55 +33,41 @@ def initialize(
@sse_connected = Concurrent::AtomicBoolean.new(false)
@config = config
@telemetry_runtime_producer = telemetry_runtime_producer
@sdk_blocker = sdk_blocker
@telemetry_synchronizer = telemetry_synchronizer
@status_manager = status_manager
end

def start
if @config.streaming_enabled
start_stream
start_stream_forked if defined?(PhusionPassenger)
elsif @config.standalone?
start_poll
end

synchronize_telemetry_config
start_thread
PhusionPassenger.on_event(:starting_worker_process) { |forked| start_thread if forked } if defined?(PhusionPassenger)
end

private

# Starts tasks if stream is enabled.
def start_stream
@config.logger.debug('Starting push mode ...')
@synchronizer.sync_all
@synchronizer.start_periodic_data_recording

start_sse_connection_thread
end
def start_thread
@config.threads[:start_sdk] = Thread.new do
sleep(0.5) until @synchronizer.sync_all(false)

def start_poll
@config.logger.debug('Starting polling mode ...')
@synchronizer.start_periodic_fetch
@synchronizer.start_periodic_data_recording
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
rescue StandardError => e
@config.logger.error("start_poll error : #{e.inspect}")
end
@status_manager.ready!
@telemetry_synchronizer.synchronize_config
@synchronizer.start_periodic_data_recording
connected = false

# Starts thread which connect to sse and after that fetch splits and segments once.
def start_sse_connection_thread
@config.threads[:sync_manager_start_sse] = Thread.new do
begin
if @config.streaming_enabled
@config.logger.debug('Starting Straming mode ...')
connected = @push_manager.start_sse
@synchronizer.start_periodic_fetch unless connected
rescue StandardError => e
@config.logger.error("start_sse_connection_thread error : #{e.inspect}")

if defined?(PhusionPassenger)
PhusionPassenger.on_event(:starting_worker_process) { |forked| sse_thread_forked if forked }
end
end
end
end

def start_stream_forked
PhusionPassenger.on_event(:starting_worker_process) { |forked| start_stream if forked }
unless connected
@config.logger.debug('Starting polling mode ...')
@synchronizer.start_periodic_fetch
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
end
end
end

def process_action(action)
Expand Down Expand Up @@ -165,16 +151,9 @@ def record_telemetry(type, data)
@telemetry_runtime_producer.record_streaming_event(type, data)
end

def synchronize_telemetry_config
@config.threads[:telemetry_config_sender] = Thread.new do
begin
@sdk_blocker.wait_unitil_internal_ready unless @config.consumer?
@telemetry_synchronizer.synchronize_config
rescue SplitIoClient::SDKShutdownException
@telemetry_synchronizer.synchronize_config
@config.logger.info('Posting Telemetry config due to shutdown')
end
end
def sse_thread_forked
connected = @push_manager.start_sse
@synchronizer.start_periodic_fetch unless connected
end
end
end
Expand Down
Loading