Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ Metrics/ClassLength:
Metrics/CyclomaticComplexity:
Max: 8

Metrics/ParameterLists:
Exclude:
- lib/splitclient-rb/engine/sync_manager.rb

Metrics/LineLength:
Max: 130
Exclude:
Expand Down
1 change: 1 addition & 0 deletions lib/splitclient-rb/cache/fetchers/segment_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def fetch_segments
segments_api.fetch_segments_by_names(@segments_repository.used_segment_names)

@sdk_blocker.segments_ready!
@sdk_blocker.sdk_internal_ready
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
Expand Down
36 changes: 29 additions & 7 deletions lib/splitclient-rb/engine/sync_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ def initialize(
api_key,
config,
synchronizer,
telemetry_runtime_producer
telemetry_runtime_producer,
sdk_blocker,
telemetry_synchronizer
)
@synchronizer = synchronizer
notification_manager_keeper = SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer) do |manager|
Expand All @@ -31,6 +33,8 @@ def initialize(
@sse_connected = Concurrent::AtomicBoolean.new(false)
@config = config
@telemetry_runtime_producer = telemetry_runtime_producer
@sdk_blocker = sdk_blocker
@telemetry_synchronizer = telemetry_synchronizer
end

def start
Expand All @@ -40,6 +44,8 @@ def start
elsif @config.standalone?
start_poll
end

synchronize_telemetry_config
end

private
Expand All @@ -57,7 +63,7 @@ def start_poll
@config.logger.debug('Starting polling mode ...')
@synchronizer.start_periodic_fetch
@synchronizer.start_periodic_data_recording
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
rescue StandardError => e
@config.logger.error("start_poll error : #{e.inspect}")
end
Expand Down Expand Up @@ -103,20 +109,20 @@ def process_subsystem_ready
@synchronizer.stop_periodic_fetch
@synchronizer.sync_all
@sse_handler.start_workers
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_STREAMING)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_STREAMING)
end

def process_subsystem_down
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
end

def process_push_shutdown
@push_manager.stop_sse
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
rescue StandardError => e
@config.logger.error("process_push_shutdown error: #{e.inspect}")
end
Expand All @@ -131,7 +137,7 @@ def process_connected
@synchronizer.stop_periodic_fetch
@synchronizer.sync_all
@sse_handler.start_workers
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_STREAMING)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_STREAMING)
rescue StandardError => e
@config.logger.error("process_connected error: #{e.inspect}")
end
Expand All @@ -145,7 +151,7 @@ def process_disconnect(reconnect)
@sse_connected.make_false
@sse_handler.stop_workers
@synchronizer.start_periodic_fetch
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)

if reconnect
@synchronizer.sync_all
Expand All @@ -154,6 +160,22 @@ def process_disconnect(reconnect)
rescue StandardError => e
@config.logger.error("process_disconnect error: #{e.inspect}")
end

def record_telemetry(type, data)
@telemetry_runtime_producer.record_streaming_event(type, data)
end

def synchronize_telemetry_config
@config.threads[:telemetry_config_sender] = Thread.new do
begin
@sdk_blocker.wait_unitil_internal_ready
@telemetry_synchronizer.synchronize_config
rescue SplitIoClient::SDKShutdownException
@telemetry_synchronizer.synchronize_config
@config.logger.info('Posting Telemetry config due to shutdown')
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/splitclient-rb/engine/synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def impressions_count_sender
end

def start_telemetry_sync_task
Telemetry::SyncTask.new(@config, @telemetry_synchronizer)
Telemetry::SyncTask.new(@config, @telemetry_synchronizer).call
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/splitclient-rb/split_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def start!
}

synchronizer = SplitIoClient::Engine::Synchronizer.new(repositories, @api_key, @config, @sdk_blocker, params)
SplitIoClient::Engine::SyncManager.new(repositories, @api_key, @config, synchronizer, @runtime_producer).start
SplitIoClient::Engine::SyncManager.new(repositories, @api_key, @config, synchronizer, @runtime_producer, @sdk_blocker, @telemetry_synchronizer).start
end
end

Expand Down Expand Up @@ -162,6 +162,6 @@ def build_telemetry_components
@runtime_producer = Telemetry::RuntimeProducer.new(@config)

@telemetry_consumers = { init: @init_consumer, evaluation: @evaluation_consumer, runtime: @runtime_consumer }
end
end
end
end
8 changes: 4 additions & 4 deletions lib/splitclient-rb/telemetry/sync_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ def initialize(config, telemetry_synchronizer)
end

def call
telemetry_thread
stats_thread

PhusionPassenger.on_event(:starting_worker_process) { |forked| telemetry_thread if forked } if defined?(PhusionPassenger)
PhusionPassenger.on_event(:starting_worker_process) { |forked| stats_thread if forked } if defined?(PhusionPassenger)
end

private

def telemetry_thread
@config.threads[:telemetry_sender] = Thread.new do
def stats_thread
@config.threads[:telemetry_stats_sender] = Thread.new do
begin
@config.logger.info('Starting Telemetry Sync Task')

Expand Down
7 changes: 5 additions & 2 deletions spec/cache/fetchers/segment_fetch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@
let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) }
let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) }
let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:segment_fetcher) { described_class.new(segments_repository, '', config, nil, telemetry_runtime_producer) }
let(:sdk_blocker) { SplitIoClient::Cache::Stores::SDKBlocker.new(splits_repository, segments_repository, config) }
let(:segment_fetcher) { described_class.new(segments_repository, '', config, sdk_blocker, telemetry_runtime_producer) }
let(:split_fetcher) do
SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, '', config, nil, telemetry_runtime_producer)
SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, '', config, sdk_blocker, telemetry_runtime_producer)
end

it 'fetch segments' do
split_fetcher.send(:fetch_splits)
segment_fetcher.send(:fetch_segments)
segment_fetcher.send(:fetch_segments)
segment_fetcher.send(:fetch_segments)

expect(segment_fetcher.segments_repository.used_segment_names).to eq(['employees'])
end
Expand Down
18 changes: 13 additions & 5 deletions spec/engine/sync_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@
end
let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, sdk_blocker, sync_params) }

let(:init_producer) { SplitIoClient::Telemetry::InitProducer.new(config) }
let(:init_consumer) { SplitIoClient::Telemetry::InitConsumer.new(config) }
let(:runtime_consumer) { SplitIoClient::Telemetry::RuntimeConsumer.new(config) }
let(:evaluation_consumer) { SplitIoClient::Telemetry::EvaluationConsumer.new(config) }
let(:telemetry_consumers) { { init: init_consumer, runtime: runtime_consumer, evaluation: evaluation_consumer } }
let(:telemetry_api) { SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) }
let(:telemetry_synchronizer) { SplitIoClient::Telemetry::Synchronizer.new(config, telemetry_consumers, init_producer, repositories, telemetry_api) }

before do
mock_split_changes_with_since(splits, '-1')
mock_split_changes_with_since(splits, '1506703262916')
Expand All @@ -63,13 +71,13 @@

config.streaming_service_url = server.base_uri

sync_manager = subject.new(repositories, api_key, config, synchronizer, telemetry_runtime_producer)
sync_manager = subject.new(repositories, api_key, config, synchronizer, telemetry_runtime_producer, sdk_blocker, telemetry_synchronizer)
sync_manager.start

sleep(2)
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.once
expect(config.threads.size).to eq(9)
expect(config.threads.size).to eq(11)
end
end

Expand All @@ -82,13 +90,13 @@
config.streaming_service_url = 'https://fake-sse.io'
config.connection_timeout = 1

sync_manager = subject.new(repositories, api_key, config, synchronizer, telemetry_runtime_producer)
sync_manager = subject.new(repositories, api_key, config, synchronizer, telemetry_runtime_producer, sdk_blocker, telemetry_synchronizer)
sync_manager.start

sleep(2)
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.at_least_times(1)
expect(config.threads.size).to eq(6)
expect(config.threads.size).to eq(8)
end
end

Expand All @@ -100,7 +108,7 @@

config.streaming_service_url = server.base_uri

sync_manager = subject.new(repositories, api_key, config, synchronizer, telemetry_runtime_producer)
sync_manager = subject.new(repositories, api_key, config, synchronizer, telemetry_runtime_producer, sdk_blocker, telemetry_synchronizer)
sync_manager.start

sleep(2)
Expand Down
2 changes: 1 addition & 1 deletion spec/engine/synchronizer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
it 'start_periodic_data_recording' do
synchronizer.start_periodic_data_recording

expect(config.threads.size).to eq(3)
expect(config.threads.size).to eq(4)
end

it 'start_periodic_fetch' do
Expand Down
6 changes: 6 additions & 0 deletions spec/engine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@

before do
@mode = cache_adapter.equal?(:memory) ? :standalone : :consumer

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/usage')
.to_return(status: 200, body: 'ok')

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/config')
.to_return(status: 200, body: 'ok')
end

before :each do
Expand Down
12 changes: 12 additions & 0 deletions spec/splitclient/split_factory_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@
stub_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')
.to_return(status: 200, body: [])

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/usage')
.to_return(status: 200, body: 'ok')

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/config')
.to_return(status: 200, body: 'ok')

factory = described_class.new('browser_key', options)
factory.client.destroy
factory.client.get_treatment('key', 'split')
Expand All @@ -150,6 +156,12 @@

before :each do
SplitIoClient.split_factory_registry = SplitIoClient::SplitFactoryRegistry.new

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/usage')
.to_return(status: 200, body: 'ok')

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/config')
.to_return(status: 200, body: 'ok')
end

it 'logs warnings stating number of factories' do
Expand Down
6 changes: 6 additions & 0 deletions spec/splitclient/split_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

stub_request(:get, 'https://sdk.split.io/api/segmentChanges/employees?since=-1')
.to_return(status: 200, body: segments)

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/usage')
.to_return(status: 200, body: 'ok')

stub_request(:post, 'https://telemetry.split.io/api/v1/metrics/config')
.to_return(status: 200, body: 'ok')
end

context '#split' do
Expand Down
6 changes: 2 additions & 4 deletions spec/telemetry/synchronizer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
let(:adapter) { config.telemetry_adapter }
let(:init_producer) { SplitIoClient::Telemetry::InitProducer.new(config) }
let(:synchronizer) { SplitIoClient::Telemetry::Synchronizer.new(config, nil, init_producer, nil, nil) }
let(:config_key) { 'SPLITIO.telemetry.config' }
let(:config_key) { 'SPLITIO.telemetry.config' }

it 'synchronize_config with data' do
adapter.redis.del(config_key)
Expand Down Expand Up @@ -42,9 +42,7 @@
let(:evaluation_producer) { SplitIoClient::Telemetry::EvaluationProducer.new(config) }
let(:init_producer) { SplitIoClient::Telemetry::InitProducer.new(config) }
let(:telemetry_api) { SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) }
let(:telemetry_consumers) do
{ init: init_consumer, runtime: runtime_consumer, evaluation: evaluation_consumer }
end
let(:telemetry_consumers) { { init: init_consumer, runtime: runtime_consumer, evaluation: evaluation_consumer } }
let(:body_usage) { "{\"lS\":{\"sp\":111111222,\"se\":111111222,\"im\":111111222,\"ic\":111111222,\"ev\":111111222,\"te\":111111222,\"to\":111111222},\"mL\":{\"t\":[1,3,2,1],\"ts\":[2,3],\"tc\":[],\"tcs\":[],\"tr\":[3]},\"mE\":{\"t\":2,\"ts\":1,\"tc\":1,\"tcs\":0,\"tr\":1},\"hE\":{\"sp\":[],\"se\":[{\"400\":1}],\"im\":[],\"ic\":[],\"ev\":[{\"500\":2},{\"501\":1}],\"te\":[],\"to\":[]},\"hL\":{\"sp\":[6],\"se\":[],\"im\":[],\"ic\":[],\"ev\":[2,1],\"te\":[],\"to\":[]},\"tR\":1,\"aR\":1,\"iQ\":3,\"iDe\":1,\"iDr\":2,\"spC\":3,\"seC\":3,\"skC\":7,\"sL\":444555,\"eQ\":4,\"eD\":1,\"sE\":[{\"e\":\"token_refresh\",\"d\":222222333,\"t\":222222333},{\"e\":\"sync_mode\",\"d\":0,\"t\":222222333},{\"e\":\"sync_mode\",\"d\":1,\"t\":222222333}],\"t\":[\"tag-1\",\"tag-2\"]}" }
let(:empty_body_usage) { "{\"lS\":{\"sp\":0,\"se\":0,\"im\":0,\"ic\":0,\"ev\":0,\"te\":0,\"to\":0},\"mL\":{\"t\":[],\"ts\":[],\"tc\":[],\"tcs\":[],\"tr\":[]},\"mE\":{\"t\":0,\"ts\":0,\"tc\":0,\"tcs\":0,\"tr\":0},\"hE\":{\"sp\":[],\"se\":[],\"im\":[],\"ic\":[],\"ev\":[],\"te\":[],\"to\":[]},\"hL\":{\"sp\":[],\"se\":[],\"im\":[],\"ic\":[],\"ev\":[],\"te\":[],\"to\":[]},\"tR\":0,\"aR\":0,\"iQ\":0,\"iDe\":0,\"iDr\":0,\"spC\":0,\"seC\":0,\"skC\":0,\"sL\":0,\"eQ\":0,\"eD\":0,\"sE\":[],\"t\":[]}" }
let(:body_custom_config) { "{\"oM\":\"standalone\",\"sE\":true,\"st\":\"memory\",\"rR\":{\"sp\":100,\"se\":110,\"im\":120,\"ev\":130,\"te\":140},\"iQ\":5000,\"eQ\":500,\"iM\":\"optimized\",\"uO\":{\"s\":true,\"e\":true,\"a\":true,\"st\":false,\"t\":false},\"iL\":false,\"hP\":false,\"aF\":1,\"rF\":1,\"tR\":100,\"bT\":2,\"nR\":1,\"t\":[],\"i\":null}" }
Expand Down