From 53c9d2b726a6732f0376a29cd79469652415ae46 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 30 Mar 2022 16:29:36 -0500 Subject: [PATCH 1/3] Fixed and Added tests --- .../clients/split_client_spec.rb | 7 +- .../impressions_repository_spec.rb | 6 +- .../senders/impressions_count_sender_spec.rb | 126 +++++++++++------- .../senders/impressions_formatter_spec.rb | 3 +- spec/cache/senders/impressions_sender_spec.rb | 2 +- .../senders/localhost_repo_cleaner_spec.rb | 3 +- spec/engine/common/impression_manager_spec.rb | 3 +- .../impressions/unique_keys_tracker_spec.rb | 3 +- spec/engine/push_manager_spec.rb | 2 +- spec/engine/sync_manager_spec.rb | 2 +- spec/engine/synchronizer_spec.rb | 48 +++---- spec/sse/event_source/client_spec.rb | 2 +- spec/sse/sse_handler_spec.rb | 2 +- spec/sse/workers/segments_worker_spec.rb | 2 +- spec/sse/workers/splits_worker_spec.rb | 28 ++-- 15 files changed, 145 insertions(+), 94 deletions(-) diff --git a/spec/allocations/splitclient-rb/clients/split_client_spec.rb b/spec/allocations/splitclient-rb/clients/split_client_spec.rb index 87be1eab..861a08ba 100644 --- a/spec/allocations/splitclient-rb/clients/split_client_spec.rb +++ b/spec/allocations/splitclient-rb/clients/split_client_spec.rb @@ -17,7 +17,12 @@ let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:api_key) { 'SplitClient-key' } let(:telemetry_api) { SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) } - let(:sender_adapter) { SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) } + let(:impressions_api) { SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) } + let(:sender_adapter) do + SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, + telemetry_api, + impressions_api) + end let(:unique_keys_tracker) do SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, diff --git a/spec/cache/repositories/impressions_repository_spec.rb b/spec/cache/repositories/impressions_repository_spec.rb index 983064d9..35dbb292 100644 --- a/spec/cache/repositories/impressions_repository_spec.rb +++ b/spec/cache/repositories/impressions_repository_spec.rb @@ -112,7 +112,8 @@ filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) api_key = 'ImpressionsRepository-memory-key' telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) - sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) + sender_adapter = SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, @@ -170,7 +171,8 @@ filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) api_key = 'ImpressionsRepository-key' telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) - sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) + sender_adapter = SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, diff --git a/spec/cache/senders/impressions_count_sender_spec.rb b/spec/cache/senders/impressions_count_sender_spec.rb index ea2f320e..c684a489 100644 --- a/spec/cache/senders/impressions_count_sender_spec.rb +++ b/spec/cache/senders/impressions_count_sender_spec.rb @@ -5,14 +5,6 @@ describe SplitIoClient::Cache::Senders::ImpressionsCountSender do subject { SplitIoClient::Cache::Senders::ImpressionsCountSender } - let(:config) do - SplitIoClient::SplitConfig.new - end - let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new } - let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } - let(:impressions_api) { SplitIoClient::Api::Impressions.new('key-test', config, telemetry_runtime_producer) } - let(:impressions_count_sender) { described_class.new(config, impression_counter, impressions_api) } - before :each do impression_counter.inc('feature1', make_timestamp('2020-09-02 09:15:11')) impression_counter.inc('feature1', make_timestamp('2020-09-02 09:20:11')) @@ -22,49 +14,93 @@ impression_counter.inc('feature1', make_timestamp('2020-09-02 10:50:11')) end - it 'post impressions count with corresponding impressions count data' do - stub_request(:post, 'https://events.split.io/api/testImpressions/count').to_return(status: 200, body: 'ok') - - impressions_count_sender.call - - sleep 0.5 - - expect(a_request(:post, 'https://events.split.io/api/testImpressions/count') - .with( - body: - { - pf: - [ - { - f: 'feature1', - m: make_timestamp('2020-09-02 09:00:00'), - rc: 3 - }, - { - f: 'feature2', - m: make_timestamp('2020-09-02 09:00:00'), - rc: 2 - }, - { - f: 'feature1', - m: make_timestamp('2020-09-02 10:00:00'), - rc: 1 - } - ] - }.to_json - )).to have_been_made + context 'Redis Adapter' do + let(:config) do + SplitIoClient::SplitConfig.new(cache_adapter: :redis, redis_namespace: 'prefix-test') + end + let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new } + let(:impressions_sender_adapter) { SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, nil, nil) } + let(:impressions_count_sender) do + config.counter_refresh_rate = 0.5 + subject.new(config, impression_counter, impressions_sender_adapter) + end + + it 'posting impressions count' do + key = "#{config.redis_namespace}.impressions.count" + impressions_count_sender.call + + sleep 1 + + expect(config.cache_adapter.find_in_map(key, 'feature1::1599055200000').to_i).to eq(3) + expect(config.cache_adapter.find_in_map(key, 'feature2::1599055200000').to_i).to eq(2) + expect(config.cache_adapter.find_in_map(key, 'feature1::1599058800000').to_i).to eq(1) + + config.cache_adapter.delete(key) + end end - it 'calls #post_impressions upon destroy' do - expect(impressions_count_sender).to receive(:post_impressions_count).with(no_args) + context 'Memory Adapter' do + let(:config) do + SplitIoClient::SplitConfig.new + end + let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new } + let(:impressions_sender_adapter) do + telemetry_runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) + impressions_api = SplitIoClient::Api::Impressions.new('key-test', config, telemetry_runtime_producer) + telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, 'key-test', telemetry_runtime_producer) + + SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) + end + let(:impressions_count_sender) do + config.counter_refresh_rate = 0.5 + + subject.new(config, impression_counter, impressions_sender_adapter) + end + + it 'post impressions count with corresponding impressions count data' do + stub_request(:post, 'https://events.split.io/api/testImpressions/count').to_return(status: 200, body: 'ok') + + impressions_count_sender.call + + sleep 1 + + expect(a_request(:post, 'https://events.split.io/api/testImpressions/count') + .with( + body: + { + pf: + [ + { + f: 'feature1', + m: make_timestamp('2020-09-02 09:00:00'), + rc: 3 + }, + { + f: 'feature2', + m: make_timestamp('2020-09-02 09:00:00'), + rc: 2 + }, + { + f: 'feature1', + m: make_timestamp('2020-09-02 10:00:00'), + rc: 1 + } + ] + }.to_json + )).to have_been_made + end + + it 'calls #post_impressions upon destroy' do + expect(impressions_count_sender).to receive(:post_impressions_count).with(no_args) - impressions_count_sender.send(:impressions_count_thread) + impressions_count_sender.send(:impressions_count_thread) - sender_thread = config.threads[:impressions_count_sender] + sender_thread = config.threads[:impressions_count_sender] - sender_thread.raise(SplitIoClient::SDKShutdownException) + sender_thread.raise(SplitIoClient::SDKShutdownException) - sender_thread.join + sender_thread.join + end end def make_timestamp(time) diff --git a/spec/cache/senders/impressions_formatter_spec.rb b/spec/cache/senders/impressions_formatter_spec.rb index 77072cd9..9b6afc4c 100644 --- a/spec/cache/senders/impressions_formatter_spec.rb +++ b/spec/cache/senders/impressions_formatter_spec.rb @@ -20,7 +20,8 @@ filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) api_key = 'ImpressionsFormatter-key' telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) - sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) + sender_adapter = SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, diff --git a/spec/cache/senders/impressions_sender_spec.rb b/spec/cache/senders/impressions_sender_spec.rb index f486981b..a70d6cce 100644 --- a/spec/cache/senders/impressions_sender_spec.rb +++ b/spec/cache/senders/impressions_sender_spec.rb @@ -24,7 +24,7 @@ filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) api_key = 'ImpressionsSender-key' telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) - sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) + sender_adapter = SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impression_api) SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, diff --git a/spec/cache/senders/localhost_repo_cleaner_spec.rb b/spec/cache/senders/localhost_repo_cleaner_spec.rb index bc221ccf..c90e58dc 100644 --- a/spec/cache/senders/localhost_repo_cleaner_spec.rb +++ b/spec/cache/senders/localhost_repo_cleaner_spec.rb @@ -18,7 +18,8 @@ filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) api_key = 'LocalhostRepoCleaner-key' telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) - sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) + sender_adapter = SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, diff --git a/spec/engine/common/impression_manager_spec.rb b/spec/engine/common/impression_manager_spec.rb index 56394663..2be098ac 100644 --- a/spec/engine/common/impression_manager_spec.rb +++ b/spec/engine/common/impression_manager_spec.rb @@ -16,7 +16,8 @@ filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) api_key = 'ImpressionManager-key' telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) - sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) + sender_adapter = SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config, filter_adapter, diff --git a/spec/engine/impressions/unique_keys_tracker_spec.rb b/spec/engine/impressions/unique_keys_tracker_spec.rb index 1d490ce6..618add6e 100644 --- a/spec/engine/impressions/unique_keys_tracker_spec.rb +++ b/spec/engine/impressions/unique_keys_tracker_spec.rb @@ -16,7 +16,8 @@ let(:api_key) { 'UniqueKeysTracker-key' } let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:telemetry_api) { SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) } - let(:sender_adapter) { SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) } + let(:impressions_api) { SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) } + let(:sender_adapter) { SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) } it 'track - full cache and send bulk' do post_url = 'https://telemetry.split.io/api/v1/mtks/ss' diff --git a/spec/engine/push_manager_spec.rb b/spec/engine/push_manager_spec.rb index 8649f5f7..4904408c 100644 --- a/spec/engine/push_manager_spec.rb +++ b/spec/engine/push_manager_spec.rb @@ -29,7 +29,7 @@ telemetry_runtime_producer: runtime_producer } end - let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, params) } + let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, config, params) } let(:event_parser) { SplitIoClient::SSE::EventSource::EventParser.new(config) } let(:notification_processor) { SplitIoClient::SSE::NotificationProcessor.new(config, splits_worker, segments_worker) } let(:sse_client) { SplitIoClient::SSE::EventSource::Client.new(config, api_key, runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) } diff --git a/spec/engine/sync_manager_spec.rb b/spec/engine/sync_manager_spec.rb index 0b76779e..fea7baec 100644 --- a/spec/engine/sync_manager_spec.rb +++ b/spec/engine/sync_manager_spec.rb @@ -38,7 +38,7 @@ telemetry_runtime_producer: telemetry_runtime_producer } end - let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, sync_params) } + let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, config, sync_params) } let(:init_producer) { SplitIoClient::Telemetry::InitProducer.new(config) } let(:init_consumer) { SplitIoClient::Telemetry::InitConsumer.new(config) } let(:runtime_consumer) { SplitIoClient::Telemetry::RuntimeConsumer.new(config) } diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index 0c283909..4dc3d2d6 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -9,37 +9,29 @@ let(:segment1) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment1.json')) } let(:segment2) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment2.json')) } let(:segment3) { File.read(File.join(SplitIoClient.root, 'spec/test_data/integrations/segment3.json')) } - let(:api_key) { 'Synchronizer-key' } let(:log) { StringIO.new } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } - let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:impressions_repository) { SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) } - let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } - let(:events_repository) { SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, runtime_producer) } - let(:split_fetcher) do - SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, runtime_producer) + let(:synchronizer) do + api_key = 'Synchronizer-key' + runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) + splits_repository = SplitIoClient::Cache::Repositories::SplitsRepository.new(config) + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + + repositories = { + splits: splits_repository, + segments: segments_repository, + impressions: SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config), + events: SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, runtime_producer) + } + + parameters = { + split_fetcher: SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, runtime_producer), + segment_fetcher: SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, runtime_producer), + telemetry_runtime_producer: runtime_producer + } + + subject.new(repositories, config, parameters) end - let(:segment_fetcher) do - SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, runtime_producer) - end - let(:repositories) do - repos = {} - repos[:splits] = splits_repository - repos[:segments] = segments_repository - repos[:impressions] = impressions_repository - repos[:events] = events_repository - repos - end - let(:parameters) do - params = {} - params[:split_fetcher] = split_fetcher - params[:segment_fetcher] = segment_fetcher - params[:telemetry_runtime_producer] = runtime_producer - - params - end - let(:synchronizer) { subject.new(repositories, api_key, config, parameters) } context 'tests with mock data' do before do diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 9c331f22..ae65f5a4 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -29,7 +29,7 @@ telemetry_runtime_producer: telemetry_runtime_producer } end - let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, parameters) } + let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, config, parameters) } let(:splits_worker) { SplitIoClient::SSE::Workers::SplitsWorker.new(synchronizer, config, repositories[:splits]) } let(:segments_worker) { SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, config, repositories[:segments]) } let(:push_status_queue) { Queue.new } diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 74d593f8..231fb0be 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -30,7 +30,7 @@ telemetry_runtime_producer: telemetry_runtime_producer } end - let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, parameters) } + let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, config, parameters) } let(:splits_worker) { SplitIoClient::SSE::Workers::SplitsWorker.new(synchronizer, config, splits_repository) } let(:segments_worker) { SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, config, segments_repository) } let(:notification_processor) { SplitIoClient::SSE::NotificationProcessor.new(config, splits_worker, segments_worker) } diff --git a/spec/sse/workers/segments_worker_spec.rb b/spec/sse/workers/segments_worker_spec.rb index 308b7a1e..642ebd1a 100644 --- a/spec/sse/workers/segments_worker_spec.rb +++ b/spec/sse/workers/segments_worker_spec.rb @@ -23,7 +23,7 @@ let(:repositories) { { splits: splits_repository, segments: segments_repository } } let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new } let(:params) { { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: impression_counter, telemetry_runtime_producer: telemetry_runtime_producer } } - let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, params) } + let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, config, params) } before do mock_split_changes(splits) diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 81fce8fe..4c4e6530 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -14,16 +14,28 @@ let(:log) { StringIO.new } let(:config) { SplitIoClient::SplitConfig.new(logger: Logger.new(log)) } let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) } - let(:segments_repository) { SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) } - let(:impressions_repository) { SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } - let(:events_repository) { SplitIoClient::Cache::Repositories::EventsRepository.new(config, api_key, telemetry_runtime_producer) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer) } - let(:segment_fetcher) { SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer) } - let(:repositories) { { splits: splits_repository, segments: segments_repository } } - let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new } - let(:params) { { split_fetcher: split_fetcher, segment_fetcher: segment_fetcher, imp_counter: impression_counter, telemetry_runtime_producer: telemetry_runtime_producer } } - let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, params) } + let(:synchronizer) do + segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) + telemetry_api = SplitIoClient::Api::TelemetryApi.new(@config, @api_key, telemetry_runtime_producer) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) + + repositories = { + splits: splits_repository, + segments: segments_repository + } + + params = { + split_fetcher: SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer), + segment_fetcher: SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer), + imp_counter: SplitIoClient::Engine::Common::ImpressionCounter.new, + impressions_sender_adapter: SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api), + impressions_api: SplitIoClient::Api::Impressions.new(api_key, config, telemetry_runtime_producer) + } + + SplitIoClient::Engine::Synchronizer.new(repositories, config, params) + end context 'add change number to queue' do it 'add change number - must tigger fetch - with retries' do From 1416bd7eabc3499417841180a2c47ac4e68601a1 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Mon, 4 Apr 2022 11:14:56 -0500 Subject: [PATCH 2/3] Implemented component to write the bulk of mtks in redis --- lib/splitclient-rb.rb | 1 + .../observers/noop_impression_observer.rb | 10 +++ .../impressions_adapter/redis_sender.rb | 27 +++++++- .../cache/senders/impressions_count_sender.rb | 1 - .../engine/common/noop_impressions_counter.rb | 27 ++++++++ .../impressions/noop_unique_keys_tracker.rb | 17 +++++ lib/splitclient-rb/engine/synchronizer.rb | 6 ++ lib/splitclient-rb/split_config.rb | 9 +-- lib/splitclient-rb/split_factory.rb | 42 ++++++++++--- ....rb => memory_unique_keys_tracker_spec.rb} | 0 .../redis_unique_keys_tracker_spec.rb | 63 +++++++++++++++++++ spec/engine/sync_manager_spec.rb | 3 +- spec/engine/synchronizer_spec.rb | 3 +- spec/integrations/dedupe_impression_spec.rb | 3 +- 14 files changed, 193 insertions(+), 19 deletions(-) create mode 100644 lib/splitclient-rb/cache/observers/noop_impression_observer.rb create mode 100644 lib/splitclient-rb/engine/common/noop_impressions_counter.rb create mode 100644 lib/splitclient-rb/engine/impressions/noop_unique_keys_tracker.rb rename spec/engine/impressions/{unique_keys_tracker_spec.rb => memory_unique_keys_tracker_spec.rb} (100%) create mode 100644 spec/engine/impressions/redis_unique_keys_tracker_spec.rb diff --git a/lib/splitclient-rb.rb b/lib/splitclient-rb.rb index 87ee73d3..478cb88e 100644 --- a/lib/splitclient-rb.rb +++ b/lib/splitclient-rb.rb @@ -83,6 +83,7 @@ require 'splitclient-rb/engine/matchers/equal_to_matcher' require 'splitclient-rb/engine/matchers/matches_string_matcher' require 'splitclient-rb/engine/evaluator/splitter' +require 'splitclient-rb/engine/impressions/noop_unique_keys_tracker' require 'splitclient-rb/engine/impressions/unique_keys_tracker' require 'splitclient-rb/engine/metrics/binary_search_latency_tracker' require 'splitclient-rb/engine/models/split' diff --git a/lib/splitclient-rb/cache/observers/noop_impression_observer.rb b/lib/splitclient-rb/cache/observers/noop_impression_observer.rb new file mode 100644 index 00000000..d9b7d104 --- /dev/null +++ b/lib/splitclient-rb/cache/observers/noop_impression_observer.rb @@ -0,0 +1,10 @@ +module SplitIoClient + module Observers + class NoopImpressionObserver + def test_and_set(impression) + # no-op + end + end + end +end + \ No newline at end of file diff --git a/lib/splitclient-rb/cache/senders/impressions_adapter/redis_sender.rb b/lib/splitclient-rb/cache/senders/impressions_adapter/redis_sender.rb index d7598146..c79e6026 100644 --- a/lib/splitclient-rb/cache/senders/impressions_adapter/redis_sender.rb +++ b/lib/splitclient-rb/cache/senders/impressions_adapter/redis_sender.rb @@ -10,7 +10,11 @@ def initialize(config) end def record_uniques_key(uniques) - # TODO: implementation + formatted = uniques_formatter(uniques) + + @adapter.add_to_queue(unique_keys_key, formatted) unless formatted.nil? + rescue StandardError => e + @config.log_found_exception(__method__.to_s, e) end def record_impressions_count(impressions_count) @@ -28,6 +32,27 @@ def record_impressions_count(impressions_count) def impressions_count_key "#{@config.redis_namespace}.impressions.count" end + + def unique_keys_key + "#{@config.redis_namespace}.uniquekeys" + end + + def uniques_formatter(uniques) + return if uniques.empty? + + to_return = [] + uniques.each do |key, value| + to_return << { + f: key, + k: value.to_a + } + end + + to_return + rescue StandardError => error + @config.log_found_exception(__method__.to_s, error) + nil + end end end end diff --git a/lib/splitclient-rb/cache/senders/impressions_count_sender.rb b/lib/splitclient-rb/cache/senders/impressions_count_sender.rb index c1464516..a6e898ea 100644 --- a/lib/splitclient-rb/cache/senders/impressions_count_sender.rb +++ b/lib/splitclient-rb/cache/senders/impressions_count_sender.rb @@ -20,7 +20,6 @@ def impressions_count_thread @config.threads[:impressions_count_sender] = Thread.new do begin @config.logger.info('Starting impressions count service') - loop do sleep(@config.counter_refresh_rate) diff --git a/lib/splitclient-rb/engine/common/noop_impressions_counter.rb b/lib/splitclient-rb/engine/common/noop_impressions_counter.rb new file mode 100644 index 00000000..c560d4b4 --- /dev/null +++ b/lib/splitclient-rb/engine/common/noop_impressions_counter.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +require 'concurrent' + +module SplitIoClient + module Engine + module Common + class NoopmpressionCounter + def inc(split_name, time_frame) + # no-op + end + + def pop_all + # no-op + end + + def make_key(split_name, time_frame) + # no-op + end + + def self.truncate_time_frame(timestamp_ms) + # no-op + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/impressions/noop_unique_keys_tracker.rb b/lib/splitclient-rb/engine/impressions/noop_unique_keys_tracker.rb new file mode 100644 index 00000000..4b679d01 --- /dev/null +++ b/lib/splitclient-rb/engine/impressions/noop_unique_keys_tracker.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module SplitIoClient + module Engine + module Impressions + class NoopUniqueKeysTracker + def call + # no-op + end + + def track(feature_name, key) + # no-op + end + end + end + end +end diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index 6cd09893..34a1aecf 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -26,6 +26,7 @@ def initialize( @impression_counter = params[:imp_counter] @telemetry_synchronizer = params[:telemetry_synchronizer] @impressions_sender_adapter = params[:impressions_sender_adapter] + @unique_keys_tracker = params[:unique_keys_tracker] end def sync_all(asynchronous = true) @@ -45,6 +46,7 @@ def start_periodic_data_recording events_sender impressions_count_sender start_telemetry_sync_task + start_unique_keys_tracker_task end def start_periodic_fetch @@ -186,6 +188,10 @@ def start_telemetry_sync_task Telemetry::SyncTask.new(@config, @telemetry_synchronizer).call end + def start_unique_keys_tracker_task + @unique_keys_tracker.call + end + def sync_result(success, remaining_attempts, segment_names = nil) { success: success, remaining_attempts: remaining_attempts, segment_names: segment_names } end diff --git a/lib/splitclient-rb/split_config.rb b/lib/splitclient-rb/split_config.rb index 860f265d..6c3573ca 100644 --- a/lib/splitclient-rb/split_config.rb +++ b/lib/splitclient-rb/split_config.rb @@ -297,12 +297,9 @@ def initialize(opts = {}) attr_accessor :counter_refresh_rate def self.default_counter_refresh_rate(adapter) - case adapter - when :redis - 300 # Send bulk impressions count - Refresh rate: 5 min. - else - 1800 # Send bulk impressions count - Refresh rate: 30 min. - end + return 300 if adapter == :redis # Send bulk impressions count - Refresh rate: 5 min. + + 1800 # Send bulk impressions count - Refresh rate: 30 min. end def self.default_on_demand_fetch_retry_delay_seconds diff --git a/lib/splitclient-rb/split_factory.rb b/lib/splitclient-rb/split_factory.rb index 5f2ef773..38460858 100644 --- a/lib/splitclient-rb/split_factory.rb +++ b/lib/splitclient-rb/split_factory.rb @@ -38,6 +38,7 @@ def initialize(api_key, config_hash = {}) build_telemetry_components build_repositories + build_impressions_sender_adapter build_unique_keys_tracker build_impressions_components build_telemetry_synchronizer @@ -174,7 +175,8 @@ def build_synchronizer imp_counter: @impression_counter, telemetry_synchronizer: @telemetry_synchronizer, impressions_sender_adapter: @impressions_sender_adapter, - impressions_api: @impressions_api + impressions_api: @impressions_api, + unique_keys_tracker: @unique_keys_tracker } @synchronizer = Engine::Synchronizer.new(repositories, @config, params) @@ -209,20 +211,46 @@ def build_telemetry_synchronizer end def build_unique_keys_tracker + if @config.impressions_mode != :none + @unique_keys_tracker = Engine::Impressions::NoopUniqueKeysTracker.new + return + end + bf = BloomFilter::Native.new(size: 95_850_584, hashes: 2) filter_adapter = Cache::Filter::FilterAdapter.new(@config, bf) - @impressions_api = Api::Impressions.new(@api_key, @config, @runtime_producer) - @impressions_sender_adapter = Cache::Senders::ImpressionsSenderAdapter.new(config, @telemetry_api, @impressions_api) cache = Concurrent::Hash.new @unique_keys_tracker = Engine::Impressions::UniqueKeysTracker.new(@config, filter_adapter, @impressions_sender_adapter, cache) end + def build_impressions_observer + if (@config.cache_adapter == :redis && @config.impressions_mode != :optimized) || + (@config.cache_adapter == :memory && @config.impressions_mode == :none) + @impression_observer = Observers::NoopImpressionObserver.new + else + @impression_observer = Observers::ImpressionObserver.new + end + end + + def build_impression_counter + case @config.impressions_mode + when :none + @impression_counter = Engine::Common::NoopmpressionCounter.new + else + @impression_counter = Engine::Common::ImpressionCounter.new + end + end + + def build_impressions_sender_adapter + @impressions_api = Api::Impressions.new(@api_key, @config, @runtime_producer) + @impressions_sender_adapter = Cache::Senders::ImpressionsSenderAdapter.new(@config, @telemetry_api, @impressions_api) + end + def build_impressions_components - @impression_counter = Engine::Common::ImpressionCounter.new - impression_observer = Observers::ImpressionObserver.new - impression_router = ImpressionRouter.new(@config) + build_impressions_observer + build_impression_counter - @impressions_manager = Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer, impression_observer, @unique_keys_tracker, impression_router) + impression_router = ImpressionRouter.new(@config) + @impressions_manager = Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer, @impression_observer, @unique_keys_tracker, impression_router) end end end diff --git a/spec/engine/impressions/unique_keys_tracker_spec.rb b/spec/engine/impressions/memory_unique_keys_tracker_spec.rb similarity index 100% rename from spec/engine/impressions/unique_keys_tracker_spec.rb rename to spec/engine/impressions/memory_unique_keys_tracker_spec.rb diff --git a/spec/engine/impressions/redis_unique_keys_tracker_spec.rb b/spec/engine/impressions/redis_unique_keys_tracker_spec.rb new file mode 100644 index 00000000..da307241 --- /dev/null +++ b/spec/engine/impressions/redis_unique_keys_tracker_spec.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'filter_imp_test' + +describe SplitIoClient::Engine::Impressions::UniqueKeysTracker do + subject { SplitIoClient::Engine::Impressions::UniqueKeysTracker } + + let(:config) do + SplitIoClient::SplitConfig.new(logger: Logger.new(StringIO.new), cache_adapter: :redis, redis_namespace: 'tracker-prefix') + end + let(:sender_adapter) do + api_key = 'UniqueKeysTracker-key' + runtime_producer = SplitIoClient::Telemetry::RuntimeProducer.new(config) + telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) + impressions_api = SplitIoClient::Api::Impressions.new(api_key, config, runtime_producer) + + SplitIoClient::Cache::Senders::ImpressionsSenderAdapter.new(config, telemetry_api, impressions_api) + end + let(:filter_adapter) do + bf = FilterTest.new + SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) + end + + it 'track - full cache and send bulk' do + key = "#{config.redis_namespace}.uniquekeys" + + cache = Concurrent::Hash.new + config.unique_keys_cache_max_size = 20 + config.unique_keys_bulk_size = 2 + tracker = subject.new(config, filter_adapter, sender_adapter, cache) + + 20.times do |i| + expect(tracker.track("feature-test-#{i}", 'key_test-1')).to eq(true) + expect(tracker.track("feature-test-#{i}", 'key_test-2')).to eq(true) + end + + expect(config.cache_adapter.get_from_queue(key, 0).size).to eq(20) + + cache.clear + end + + it 'track - task should send bulk.' do + key = "#{config.redis_namespace}.uniquekeys" + + cache = Concurrent::Hash.new + config.unique_keys_refresh_rate = 0.5 + tracker = subject.new(config, filter_adapter, sender_adapter, cache) + + tracker.call + + 10.times do |i| + expect(tracker.track("feature-test-#{i}", 'key_test-1')).to eq(true) + expect(tracker.track("feature-test-#{i}", 'key_test-2')).to eq(true) + end + + sleep 1 + + expect(config.cache_adapter.get_from_queue(key, 0).size).to eq(10) + + cache.clear + end +end diff --git a/spec/engine/sync_manager_spec.rb b/spec/engine/sync_manager_spec.rb index fea7baec..c17f8aa8 100644 --- a/spec/engine/sync_manager_spec.rb +++ b/spec/engine/sync_manager_spec.rb @@ -35,7 +35,8 @@ split_fetcher: SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer), segment_fetcher: SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, telemetry_runtime_producer), imp_counter: impression_counter, - telemetry_runtime_producer: telemetry_runtime_producer + telemetry_runtime_producer: telemetry_runtime_producer, + unique_keys_tracker: SplitIoClient::Engine::Impressions::NoopUniqueKeysTracker.new } end let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, config, sync_params) } diff --git a/spec/engine/synchronizer_spec.rb b/spec/engine/synchronizer_spec.rb index 4dc3d2d6..ea5327fa 100644 --- a/spec/engine/synchronizer_spec.rb +++ b/spec/engine/synchronizer_spec.rb @@ -27,7 +27,8 @@ parameters = { split_fetcher: SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, runtime_producer), segment_fetcher: SplitIoClient::Cache::Fetchers::SegmentFetcher.new(segments_repository, api_key, config, runtime_producer), - telemetry_runtime_producer: runtime_producer + telemetry_runtime_producer: runtime_producer, + unique_keys_tracker: SplitIoClient::Engine::Impressions::NoopUniqueKeysTracker.new } subject.new(repositories, config, parameters) diff --git a/spec/integrations/dedupe_impression_spec.rb b/spec/integrations/dedupe_impression_spec.rb index 33a2fce7..dfa61715 100644 --- a/spec/integrations/dedupe_impression_spec.rb +++ b/spec/integrations/dedupe_impression_spec.rb @@ -82,7 +82,6 @@ factory = SplitIoClient::SplitFactory.new('test_api_key-1', streaming_enabled: false, impressions_mode: :optimized, impressions_refresh_rate: 60) client = factory.client client.block_until_ready - sleep 1 expect(client.get_treatment('nico_test', 'FACUNDO_TEST')).to eq 'on' expect(client.get_treatment('nico_test', 'FACUNDO_TEST')).to eq 'on' @@ -92,8 +91,8 @@ time_frame = SplitIoClient::Engine::Common::ImpressionCounter.truncate_time_frame((Time.now.to_f * 1000.0).to_i) + sleep 1 client.destroy - sleep 0.5 expect(a_request(:post, 'https://events.split.io/api/testImpressions/count') .with( From a473a9297dc1dad04df16a631fb956937d70e28b Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Mon, 4 Apr 2022 14:49:31 -0500 Subject: [PATCH 3/3] polishing --- lib/splitclient-rb.rb | 2 ++ .../cache/senders/impressions_adapter/memory_sender.rb | 2 +- .../engine/common/noop_impressions_counter.rb | 2 +- .../engine/impressions/unique_keys_tracker.rb | 1 + lib/splitclient-rb/engine/synchronizer.rb | 6 +++--- lib/splitclient-rb/split_factory.rb | 6 +++--- 6 files changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/splitclient-rb.rb b/lib/splitclient-rb.rb index 478cb88e..1801b920 100644 --- a/lib/splitclient-rb.rb +++ b/lib/splitclient-rb.rb @@ -15,6 +15,7 @@ require 'splitclient-rb/cache/filter/filter_adapter' require 'splitclient-rb/cache/hashers/impression_hasher' require 'splitclient-rb/cache/observers/impression_observer' +require 'splitclient-rb/cache/observers/noop_impression_observer' require 'splitclient-rb/cache/repositories/repository' require 'splitclient-rb/cache/repositories/segments_repository' require 'splitclient-rb/cache/repositories/splits_repository' @@ -56,6 +57,7 @@ require 'splitclient-rb/engine/api/telemetry_api' require 'splitclient-rb/engine/common/impressions_counter' require 'splitclient-rb/engine/common/impressions_manager' +require 'splitclient-rb/engine/common/noop_impressions_counter' require 'splitclient-rb/engine/parser/condition' require 'splitclient-rb/engine/parser/partition' require 'splitclient-rb/engine/parser/evaluator' diff --git a/lib/splitclient-rb/cache/senders/impressions_adapter/memory_sender.rb b/lib/splitclient-rb/cache/senders/impressions_adapter/memory_sender.rb index 4b27d19b..83caee54 100644 --- a/lib/splitclient-rb/cache/senders/impressions_adapter/memory_sender.rb +++ b/lib/splitclient-rb/cache/senders/impressions_adapter/memory_sender.rb @@ -46,7 +46,7 @@ def uniques_formatter(uniques) end def impressions_count_formatter(counts) - return if counts.empty? + return if counts.nil? || counts.empty? formated_counts = {pf: []} diff --git a/lib/splitclient-rb/engine/common/noop_impressions_counter.rb b/lib/splitclient-rb/engine/common/noop_impressions_counter.rb index c560d4b4..6ce87e0c 100644 --- a/lib/splitclient-rb/engine/common/noop_impressions_counter.rb +++ b/lib/splitclient-rb/engine/common/noop_impressions_counter.rb @@ -5,7 +5,7 @@ module SplitIoClient module Engine module Common - class NoopmpressionCounter + class NoopImpressionCounter def inc(split_name, time_frame) # no-op end diff --git a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb index 4637348e..79fc5b5c 100644 --- a/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb +++ b/lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb @@ -49,6 +49,7 @@ def send_bulk_data_thread end rescue SplitIoClient::SDKShutdownException send_bulk_data + @config.logger.info('Posting unique keys due to shutdown') end def clear_filter_thread diff --git a/lib/splitclient-rb/engine/synchronizer.rb b/lib/splitclient-rb/engine/synchronizer.rb index 34a1aecf..bb4b6ef4 100644 --- a/lib/splitclient-rb/engine/synchronizer.rb +++ b/lib/splitclient-rb/engine/synchronizer.rb @@ -43,8 +43,8 @@ def sync_all(asynchronous = true) def start_periodic_data_recording impressions_sender - events_sender impressions_count_sender + events_sender start_telemetry_sync_task start_unique_keys_tracker_task end @@ -171,7 +171,7 @@ def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_secon # Starts thread which loops constantly and sends impressions to the Split API def impressions_sender - ImpressionsSender.new(@impressions_repository, @config, @impressions_api).call + ImpressionsSender.new(@impressions_repository, @config, @impressions_api).call unless @config.impressions_mode == :none end # Starts thread which loops constantly and sends events to the Split API @@ -181,7 +181,7 @@ def events_sender # Starts thread which loops constantly and sends impressions count to the Split API def impressions_count_sender - ImpressionsCountSender.new(@config, @impression_counter, @impressions_sender_adapter).call + ImpressionsCountSender.new(@config, @impression_counter, @impressions_sender_adapter).call unless @config.impressions_mode == :debug end def start_telemetry_sync_task diff --git a/lib/splitclient-rb/split_factory.rb b/lib/splitclient-rb/split_factory.rb index 38460858..03927786 100644 --- a/lib/splitclient-rb/split_factory.rb +++ b/lib/splitclient-rb/split_factory.rb @@ -38,10 +38,10 @@ def initialize(api_key, config_hash = {}) build_telemetry_components build_repositories + build_telemetry_synchronizer build_impressions_sender_adapter build_unique_keys_tracker build_impressions_components - build_telemetry_synchronizer @status_manager = Engine::StatusManager.new(@config) @@ -233,8 +233,8 @@ def build_impressions_observer def build_impression_counter case @config.impressions_mode - when :none - @impression_counter = Engine::Common::NoopmpressionCounter.new + when :debug + @impression_counter = Engine::Common::NoopImpressionCounter.new else @impression_counter = Engine::Common::ImpressionCounter.new end