Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ext/murmurhash/MurmurHash3.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ private static long getLittleEndianLong(final byte[] data, final int index) {
(((long) data[index + 7] & 0xff) << 56);
}

public static long[] hash128x64(final byte[] data) {
return hash128x64(data, 0, data.length, 0);
public static long[] hash128x64(final String data, final long seed) {
final byte[] dataBytes = data.getBytes();
return hash128x64(dataBytes, 0, dataBytes.length, seed);
}

/**
Expand Down
Binary file modified lib/murmurhash/murmurhash.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def add_bulk(impressions)
# Synchronizer might not be running
@adapter.expire(key, EXPIRE_SECONDS) if impressions_json.size == impressions_list_size
rescue StandardError => e
@config.logger.error("Exception while add_bulk_v2: #{e}")
@config.logger.error("Exception while add_bulk: #{e}")
end

def get_impressions(number_of_impressions = 0)
Expand Down
32 changes: 17 additions & 15 deletions lib/splitclient-rb/cache/routers/impression_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@ def initialize(config)
end

def add_bulk(impressions)
return unless @listener
impressions.each do |impression|
enqueue(
split_name: impression[:i][:f],
matching_key: impression[:i][:k],
bucketing_key: impression[:i][:b],
time: impression[:i][:m],
treatment: {
label: impression[:i][:r],
treatment: impression[:i][:t],
change_number: impression[:i][:c]
},
previous_time: impression[:i][:pt],
attributes: impression[:attributes]
) unless impression.nil?
enqueue(impression)
end
end

private

def enqueue(impression)
@queue.push(impression) if @listener
imp = {
split_name: impression[:i][:f],
matching_key: impression[:i][:k],
bucketing_key: impression[:i][:b],
time: impression[:i][:m],
treatment: {
label: impression[:i][:r],
treatment: impression[:i][:t],
change_number: impression[:i][:c]
},
previous_time: impression[:i][:pt],
attributes: impression[:attributes]
}
@queue.push(imp) if @listener
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def router_thread
Expand Down
8 changes: 4 additions & 4 deletions lib/splitclient-rb/engine/common/impressions_counter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def pop_all
to_return
end

def truncate_time_frame(timestamp_ms)
timestamp_ms - (timestamp_ms % TIME_INTERVAL_MS)
def make_key(split_name, time_frame)
"#{split_name}::#{ImpressionCounter.truncate_time_frame(time_frame)}"
end

def make_key(split_name, time_frame)
"#{split_name}::#{truncate_time_frame(time_frame)}"
def self.truncate_time_frame(timestamp_ms)
timestamp_ms - (timestamp_ms % TIME_INTERVAL_MS)
end
end
end
Expand Down
30 changes: 19 additions & 11 deletions lib/splitclient-rb/engine/common/impressions_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ def initialize(config, impressions_repository, impression_counter)
@config = config
@impressions_repository = impressions_repository
@impression_counter = impression_counter
@impression_router = SplitIoClient::ImpressionRouter.new(@config)
@impression_observer = SplitIoClient::Observers::ImpressionObserver.new
end

Expand All @@ -18,16 +17,24 @@ def build_impression(matching_key, bucketing_key, split_name, treatment, params

impression_data[:pt] = @impression_observer.test_and_set(impression_data) unless redis?

return impression_optimized(split_name, impression_data, params[:attributes]) if optimized? && !redis?
@impression_counter.inc(split_name, impression_data[:m]) if optimized? && !redis?

impression(impression_data, params[:attributes])
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def track(impressions)
@impressions_repository.add_bulk(impressions)
@impression_router.add_bulk(impressions)
return if impressions.empty?

impression_router.add_bulk(impressions)

if optimized? && !redis?
optimized_impressions = impressions.select { |imp| should_queue_impression?(imp[:i]) }
@impressions_repository.add_bulk(optimized_impressions)
else
@impressions_repository.add_bulk(impressions)
end
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end
Expand Down Expand Up @@ -64,14 +71,9 @@ def optimized?
@config.impressions_mode == :optimized
end

def impression_optimized(split_name, impression_data, attributes)
@impression_counter.inc(split_name, impression_data[:m])

impression(impression_data, attributes) if should_queue_impression?(impression_data)
end

def should_queue_impression?(impression)
impression[:pt].nil? || (impression[:pt] < ((Time.now.to_f * 1000.0).to_i - Common::TIME_INTERVAL_MS))
impression[:pt].nil? ||
(ImpressionCounter.truncate_time_frame(impression[:pt]) != ImpressionCounter.truncate_time_frame(impression[:m]))
end

def impression(impression_data, attributes)
Expand All @@ -81,6 +83,12 @@ def impression(impression_data, attributes)
def redis?
@config.impressions_adapter.class.to_s == 'SplitIoClient::Cache::Adapters::RedisAdapter'
end

def impression_router
@impression_router ||= SplitIoClient::ImpressionRouter.new(@config)
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/splitclient-rb/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module SplitIoClient
VERSION = '7.1.3'
VERSION = '7.1.4.pre.rc17'
end
11 changes: 6 additions & 5 deletions spec/engine/common/impression_counter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

describe SplitIoClient::Engine::Common::ImpressionCounter do
subject { SplitIoClient::Engine::Common::ImpressionCounter }
let(:imp_counter) { SplitIoClient::Engine::Common::ImpressionCounter }

before do
@counter = subject.new
end

it 'truncate time frame' do
expect(@counter.truncate_time_frame(make_timestamp('2020-09-02 10:53:12'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(@counter.truncate_time_frame(make_timestamp('2020-09-02 10:00:00'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(@counter.truncate_time_frame(make_timestamp('2020-09-02 10:53:00'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(@counter.truncate_time_frame(make_timestamp('2020-09-02 10:00:12'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(@counter.truncate_time_frame(make_timestamp('1970-01-01 00:00:00'))).to eq(make_timestamp('1970-01-01'))
expect(imp_counter.truncate_time_frame(make_timestamp('2020-09-02 10:53:12'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(imp_counter.truncate_time_frame(make_timestamp('2020-09-02 10:00:00'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(imp_counter.truncate_time_frame(make_timestamp('2020-09-02 10:53:00'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(imp_counter.truncate_time_frame(make_timestamp('2020-09-02 10:00:12'))).to eq(make_timestamp('2020-09-02 10:00:00'))
expect(imp_counter.truncate_time_frame(make_timestamp('1970-01-01 00:00:00'))).to eq(make_timestamp('1970-01-01'))
end

it 'make key' do
Expand Down
27 changes: 27 additions & 0 deletions spec/engine/common/impression_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
impression_manager = subject.new(config, impression_repository, impression_counter)
treatment = { treatment: 'off', label: 'default label', change_number: 1_478_113_516_002 }
params = { attributes: {}, time: 1_478_113_516_222 }

result = impression_manager.build_impression('matching_key_test', 'bucketing_key_test', 'split_name_test', treatment, params)

expect(result).to match(expected)
Expand All @@ -51,4 +52,30 @@
expect(impression_repository.batch.size).to eq(1)
expect(impression_listener.size).to eq(1)
end

it 'track optimized' do
impressions = []
impression_manager = subject.new(config, impression_repository, impression_counter)

treatment_data = { treatment: 'off', label: 'default label', change_number: 1_478_113_516_002 }
params = { attributes: {}, time: expected[:i][:m] }
imp = expected[:i]

impressions << impression_manager.build_impression(imp[:k], imp[:b], imp[:f], treatment_data, params)
impressions << impression_manager.build_impression(imp[:k], imp[:b], imp[:f], treatment_data, params)
impressions << impression_manager.build_impression(imp[:k], imp[:b], imp[:f], treatment_data, params)
impressions << impression_manager.build_impression(imp[:k], imp[:b], imp[:f], treatment_data, params)

impressions << impression_manager.build_impression('second_key', imp[:b], imp[:f], treatment_data, params)
impressions << impression_manager.build_impression('second_key', imp[:b], imp[:f], treatment_data, params)

impressions << impression_manager.build_impression('second_key', imp[:b], 'test_split', treatment_data, params)
impressions << impression_manager.build_impression('second_key', imp[:b], 'test_split', treatment_data, params)

impression_manager.track(impressions)

sleep(0.5)
expect(impression_repository.batch.size).to eq(3)
expect(impression_listener.size).to eq(8)
end
end
2 changes: 1 addition & 1 deletion spec/integrations/in_memory_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@
stub_request(:post, 'https://events.split.io/api/testImpressions/count')
.to_return(status: 200, body: 'ok')

@counter = SplitIoClient::Engine::Common::ImpressionCounter.new
@counter = SplitIoClient::Engine::Common::ImpressionCounter
end

it 'get_treament should post 3 impressions' do
Expand Down