Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 55 additions & 40 deletions lib/splitclient-rb/engine/common/impressions_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,75 @@ module SplitIoClient
module Engine
module Common
class ImpressionManager
def initialize(config, impressions_repository, impression_counter, telemetry_runtime_producer)
def initialize(config,
impressions_repository,
impression_counter,
telemetry_runtime_producer,
impression_observer,
unique_keys_tracker,
impression_router)
@config = config
@impressions_repository = impressions_repository
@impression_counter = impression_counter
@impression_observer = SplitIoClient::Observers::ImpressionObserver.new
@impression_observer = impression_observer
@telemetry_runtime_producer = telemetry_runtime_producer
@unique_keys_tracker = unique_keys_tracker
@impression_router = impression_router
end

# added param time for test
def build_impression(matching_key, bucketing_key, split_name, treatment, params = {})
impression_data = impression_data(matching_key, bucketing_key, split_name, treatment, params[:time])

impression_data[:pt] = @impression_observer.test_and_set(impression_data) unless redis?

@impression_counter.inc(split_name, impression_data[:m]) if optimized? && !redis?
begin
case @config.impressions_mode
when :debug
impression_data[:pt] = @impression_observer.test_and_set(impression_data) unless redis?
when :none
@impression_counter.inc(split_name, impression_data[:m])
@unique_keys_tracker.track(split_name, matching_key)
else
impression_data[:pt] = @impression_observer.test_and_set(impression_data)
@impression_counter.inc(split_name, impression_data[:m])
end
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

impression(impression_data, params[:attributes])
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

def track(impressions)
return if impressions.empty?

impression_router.add_bulk(impressions)

dropped = 0
queued = 0
dedupe = 0

if optimized? && !redis?
optimized_impressions = impressions.select { |imp| should_queue_impression?(imp[:i]) }

unless optimized_impressions.empty?
dropped = @impressions_repository.add_bulk(optimized_impressions)
dedupe = impressions.length - optimized_impressions.length
queued = optimized_impressions.length - dropped
stats = { dropped: 0, queued: 0, dedupe: 0 }
begin
case @config.impressions_mode
when :none
return
when :debug
track_debug_mode(impressions, stats)
when :optimized
track_optimized_mode(impressions, stats)
end
else
dropped = @impressions_repository.add_bulk(impressions)
queued = impressions.length - dropped
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
ensure
record_stats(stats)
@impression_router.add_bulk(impressions)
end

record_stats(queued, dropped, dedupe)
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

private

def record_stats(queued, dropped, dedupe)
def record_stats(stats)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return if redis?

imp_queued = Telemetry::Domain::Constants::IMPRESSIONS_QUEUED
imp_dropped = Telemetry::Domain::Constants::IMPRESSIONS_DROPPED
imp_dedupe = Telemetry::Domain::Constants::IMPRESSIONS_DEDUPE
@telemetry_runtime_producer.record_impressions_stats(imp_queued, queued) unless queued.zero?
@telemetry_runtime_producer.record_impressions_stats(imp_dropped, dropped) unless dropped.zero?
@telemetry_runtime_producer.record_impressions_stats(imp_dedupe, dedupe) unless dedupe.zero?
@telemetry_runtime_producer.record_impressions_stats(imp_queued, stats[:queued]) unless stats[:queued].zero?
@telemetry_runtime_producer.record_impressions_stats(imp_dropped, stats[:dropped]) unless stats[:dropped].zero?
@telemetry_runtime_producer.record_impressions_stats(imp_dedupe, stats[:dedupe]) unless stats[:dedupe].zero?
end

# added param time for test
Expand Down Expand Up @@ -91,10 +101,6 @@ def applied_rule(label)
@config.labels_enabled ? label : nil
end

def optimized?
@config.impressions_mode == :optimized
end

def should_queue_impression?(impression)
impression[:pt].nil? ||
(ImpressionCounter.truncate_time_frame(impression[:pt]) != ImpressionCounter.truncate_time_frame(impression[:m]))
Expand All @@ -108,10 +114,19 @@ def redis?
@config.impressions_adapter.class.to_s == 'SplitIoClient::Cache::Adapters::RedisAdapter'
end

def impression_router
@impression_router ||= SplitIoClient::ImpressionRouter.new(@config)
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
def track_debug_mode(impressions, stats)
stats[:dropped] = @impressions_repository.add_bulk(impressions)
stats[:queued] = impressions.length - stats[:dropped]
end

def track_optimized_mode(impressions, stats)
optimized_impressions = impressions.select { |imp| should_queue_impression?(imp[:i]) }

return if optimized_impressions.empty?

stats[:dropped] = @impressions_repository.add_bulk(optimized_impressions)
stats[:dedupe] = impressions.length - optimized_impressions.length
stats[:queued] = optimized_impressions.length - stats[:dropped]
end
end
end
Expand Down
22 changes: 19 additions & 3 deletions lib/splitclient-rb/split_factory.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# frozen_string_literal: true

require 'bloomfilter-rb'

module SplitIoClient
class SplitFactory
ROOT_PROCESS_ID = Process.pid
Expand Down Expand Up @@ -34,6 +38,7 @@ def initialize(api_key, config_hash = {})

build_telemetry_components
build_repositories
build_unique_keys_tracker
build_impressions_components
build_telemetry_synchronizer

Expand Down Expand Up @@ -198,13 +203,24 @@ def build_repositories
end

def build_telemetry_synchronizer
telemetry_api = Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, telemetry_api)
@telemetry_api = Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, @telemetry_api)
end

def build_unique_keys_tracker
bf = BloomFilter::Native.new(size: 95_850_584, hashes: 2)
filter_adapter = Cache::Filter::FilterAdapter.new(@config, bf)
sender_adapter = Cache::Senders::UniqueKeysSenderAdapter.new(config, @telemetry_api)
cache = Concurrent::Hash.new
@unique_keys_tracker = Engine::Impressions::UniqueKeysTracker.new(@config, filter_adapter, sender_adapter, cache)
end

def build_impressions_components
@impression_counter = Engine::Common::ImpressionCounter.new
@impressions_manager = Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
impression_observer = Observers::ImpressionObserver.new
impression_router = ImpressionRouter.new(@config)

@impressions_manager = Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer, impression_observer, @unique_keys_tracker, impression_router)
end
end
end
23 changes: 22 additions & 1 deletion spec/allocations/splitclient-rb/clients/split_client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'spec_helper'
require 'bloomfilter-rb'

describe SplitIoClient::SplitClient do
let(:config) { SplitIoClient::SplitConfig.new(impressions_queue_size: 10) }
Expand All @@ -10,8 +11,28 @@
let(:impressions_repository) { SplitIoClient::Cache::Repositories::ImpressionsRepository.new(config) }
let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new }
let(:evaluation_producer) { SplitIoClient::Telemetry::EvaluationProducer.new(config) }
let(:impression_observer) { SplitIoClient::Observers::ImpressionObserver.new }
let(:bf) { BloomFilter::Native.new(size: 100, hashes: 2, seed: 1, bucket: 3, raise: false) }
let(:filter_adapter) { SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf) }
let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:api_key) { 'SplitClient-key' }
let(:telemetry_api) { SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer) }
let(:sender_adapter) { SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api) }
let(:unique_keys_tracker) do
SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config,
filter_adapter,
sender_adapter,
Concurrent::Hash.new)
end
let(:impression_router) { SplitIoClient::ImpressionRouter.new(config) }
let(:impressions_manager) do
SplitIoClient::Engine::Common::ImpressionManager.new(config, impressions_repository, impression_counter, evaluation_producer)
SplitIoClient::Engine::Common::ImpressionManager.new(config,
impressions_repository,
impression_counter,
evaluation_producer,
impression_observer,
unique_keys_tracker,
impression_router)
end
let(:client) do
repositories = { splits: splits_repository, segments: segments_repository, impressions: impressions_repository, events: nil }
Expand Down
49 changes: 46 additions & 3 deletions spec/cache/repositories/impressions_repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,28 @@
let(:repository) { described_class.new(config) }
let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new }
let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:impression_observer) { SplitIoClient::Observers::ImpressionObserver.new }
let(:unique_keys_tracker) do
bf = BloomFilter::Native.new(size: 100, hashes: 2, seed: 1, bucket: 3, raise: false)
filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf)
api_key = 'ImpressionsRepository-memory-key'
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer)
sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api)

SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config,
filter_adapter,
sender_adapter,
Concurrent::Hash.new)
end
let(:impression_router) { SplitIoClient::ImpressionRouter.new(config) }
let(:impressions_manager) do
SplitIoClient::Engine::Common::ImpressionManager.new(config, repository, impression_counter, runtime_producer)
SplitIoClient::Engine::Common::ImpressionManager.new(config,
repository,
impression_counter,
runtime_producer,
impression_observer,
unique_keys_tracker,
impression_router)
end

it_behaves_like 'Impressions Repository'
Expand Down Expand Up @@ -144,8 +164,28 @@
let(:repository) { described_class.new(config) }
let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new }
let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:impression_observer) { SplitIoClient::Observers::ImpressionObserver.new }
let(:unique_keys_tracker) do
bf = BloomFilter::Native.new(size: 100, hashes: 2, seed: 1, bucket: 3, raise: false)
filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf)
api_key = 'ImpressionsRepository-key'
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer)
sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api)

SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config,
filter_adapter,
sender_adapter,
Concurrent::Hash.new)
end
let(:impression_router) { SplitIoClient::ImpressionRouter.new(config) }
let(:impressions_manager) do
SplitIoClient::Engine::Common::ImpressionManager.new(config, repository, impression_counter, runtime_producer)
SplitIoClient::Engine::Common::ImpressionManager.new(config,
repository,
impression_counter,
runtime_producer,
impression_observer,
unique_keys_tracker,
impression_router)
end

it_behaves_like 'Impressions Repository'
Expand Down Expand Up @@ -207,7 +247,10 @@
custom_impressions_manager = SplitIoClient::Engine::Common::ImpressionManager.new(custom_config,
custom_repository,
impression_counter,
custom_runtime_producer)
custom_runtime_producer,
impression_observer,
unique_keys_tracker,
impression_router)
other_treatment = { treatment: 'on', label: 'sample_rule_2', change_number: 1_533_177_602_748 }

params = { attributes: {}, time: 1_478_113_516_002 }
Expand Down
22 changes: 21 additions & 1 deletion spec/cache/senders/impressions_formatter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,28 @@
let(:treatment3) { { treatment: 'off', label: nil, change_number: nil } }
let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new }
let(:runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:impression_observer) { SplitIoClient::Observers::ImpressionObserver.new }
let(:unique_keys_tracker) do
bf = BloomFilter::Native.new(size: 100, hashes: 2, seed: 1, bucket: 3, raise: false)
filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf)
api_key = 'ImpressionsFormatter-key'
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, runtime_producer)
sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api)

SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config,
filter_adapter,
sender_adapter,
Concurrent::Hash.new)
end
let(:impression_router) { SplitIoClient::ImpressionRouter.new(config) }
let(:impressions_manager) do
SplitIoClient::Engine::Common::ImpressionManager.new(config, repository, impression_counter, runtime_producer)
SplitIoClient::Engine::Common::ImpressionManager.new(config,
repository,
impression_counter,
runtime_producer,
impression_observer,
unique_keys_tracker,
impression_router)
end

before :each do
Expand Down
22 changes: 21 additions & 1 deletion spec/cache/senders/impressions_sender_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,28 @@
let(:treatment1) { { treatment: 'on', label: 'custom_label1', change_number: 123_456 } }
let(:treatment2) { { treatment: 'off', label: 'custom_label2', change_number: 123_499 } }
let(:impression_counter) { SplitIoClient::Engine::Common::ImpressionCounter.new }
let(:impression_observer) { SplitIoClient::Observers::ImpressionObserver.new }
let(:unique_keys_tracker) do
bf = BloomFilter::Native.new(size: 100, hashes: 2, seed: 1, bucket: 3, raise: false)
filter_adapter = SplitIoClient::Cache::Filter::FilterAdapter.new(config, bf)
api_key = 'ImpressionsSender-key'
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer)
sender_adapter = SplitIoClient::Cache::Senders::UniqueKeysSenderAdapter.new(config, telemetry_api)

SplitIoClient::Engine::Impressions::UniqueKeysTracker.new(config,
filter_adapter,
sender_adapter,
Concurrent::Hash.new)
end
let(:impression_router) { SplitIoClient::ImpressionRouter.new(config) }
let(:impressions_manager) do
SplitIoClient::Engine::Common::ImpressionManager.new(config, repository, impression_counter, telemetry_runtime_producer)
SplitIoClient::Engine::Common::ImpressionManager.new(config,
repository,
impression_counter,
telemetry_runtime_producer,
impression_observer,
unique_keys_tracker,
impression_router)
end

before :each do
Expand Down
Loading