Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
5bc5be3
wip
sanzmauro Sep 9, 2020
148b550
add_bulk_v2 implementation working
sanzmauro Sep 9, 2020
28e0fb6
fixed rubocop
sanzmauro Sep 10, 2020
ec09e80
removed TODOs and fix tests
sanzmauro Sep 10, 2020
5a9eac9
added impression manager tests
sanzmauro Sep 10, 2020
fedf835
fix rubocop
sanzmauro Sep 10, 2020
ca6b2b7
added impression observer
sanzmauro Sep 10, 2020
102ba45
added impression observer test
sanzmauro Sep 10, 2020
1547993
added previousTime in impression to send
sanzmauro Sep 10, 2020
bd221e6
added should_add_pt
sanzmauro Sep 10, 2020
2fd7986
fix rubocop
sanzmauro Sep 10, 2020
af207cb
added impression mode config
sanzmauro Sep 10, 2020
726ee66
updated error message
sanzmauro Sep 11, 2020
5240d0e
fixed robocop
sanzmauro Sep 11, 2020
6104e8b
added counter and tests
sanzmauro Sep 11, 2020
15806f3
fix rubocop
sanzmauro Sep 11, 2020
d1aa523
debug
sanzmauro Sep 11, 2020
85b66e4
debug
sanzmauro Sep 11, 2020
60d12fd
pr feedback
sanzmauro Sep 14, 2020
53753f0
Merge pull request #329 from splitio/add-impression-manager-v2
sanzmauro Sep 14, 2020
08362e3
Merge pull request #330 from splitio/add-impression-manager-v2-cleani…
sanzmauro Sep 14, 2020
918018f
Merge pull request #331 from splitio/add-impression-observer-v2
sanzmauro Sep 14, 2020
e141735
added tests
sanzmauro Sep 14, 2020
98de4bf
deubg
sanzmauro Sep 14, 2020
bf8bc28
Merge pull request #332 from splitio/add-impression-modes
sanzmauro Sep 14, 2020
751cfff
Merge pull request #333 from splitio/add-impression-counter
sanzmauro Sep 14, 2020
6ec912f
removed matching_key
sanzmauro Sep 14, 2020
57dacc4
fix tests
sanzmauro Sep 14, 2020
5e15875
updated dto and tests
sanzmauro Sep 14, 2020
019999c
added header and removed ip logic
sanzmauro Sep 14, 2020
5ad56f4
Merge pull request #334 from splitio/update-dto-impressions
sanzmauro Sep 14, 2020
57245e3
added task sender and tests
sanzmauro Sep 15, 2020
eda4c5a
improvements
sanzmauro Sep 15, 2020
4283b25
fix tests
sanzmauro Sep 16, 2020
54e1846
pr feedback
sanzmauro Sep 16, 2020
349d9c0
polishing and added tests
sanzmauro Sep 16, 2020
0e8d5f4
fixed rubocop
sanzmauro Sep 16, 2020
c1a942f
renamed header name
sanzmauro Sep 16, 2020
8f75f2b
added e2e tests
sanzmauro Sep 16, 2020
7e8023c
added debug mode tests
sanzmauro Sep 16, 2020
4c86496
added block until ready
sanzmauro Sep 16, 2020
f754cd3
fixed rubocop
sanzmauro Sep 16, 2020
1bef23c
Merge pull request #335 from splitio/new-endpoint-and-task
sanzmauro Sep 17, 2020
adb69dd
wip
sanzmauro Sep 17, 2020
f3a4fa6
Merge pull request #336 from splitio/integration-tests
sanzmauro Sep 17, 2020
8ae6704
polishing
sanzmauro Sep 21, 2020
1969f10
fix unicorn and polishing
sanzmauro Sep 22, 2020
900a213
pr suggestions
sanzmauro Sep 23, 2020
86868e6
added test
sanzmauro Sep 23, 2020
6107e5d
Merge pull request #337 from splitio/imp-dedupe-polishing
sanzmauro Sep 24, 2020
da1454c
updated changes
sanzmauro Sep 25, 2020
385293d
updated version
sanzmauro Sep 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
CHANGES

7.2.0 (Sep 25, 2020)
- Added deduplication logic for impressions data.
- Now there are two modes for Impressions when the SDK is in standalone mode, OPTIMIZED (default) that only ships unique impressions and DEBUG for times where you need to send ALL impressions to debug an integration.
- Impression listener remains unchanged and will still get all impressions.

7.1.3 (Jul 31, 2020)
- Updated rake development dependency to ~> 12.3.3.

Expand Down
5 changes: 3 additions & 2 deletions ext/murmurhash/MurmurHash3.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ private static long getLittleEndianLong(final byte[] data, final int index) {
(((long) data[index + 7] & 0xff) << 56);
}

public static long[] hash128x64(final byte[] data) {
return hash128x64(data, 0, data.length, 0);
public static long[] hash128x64(final String data, final long seed) {
final byte[] dataBytes = data.getBytes();
return hash128x64(dataBytes, 0, dataBytes.length, seed);
}

/**
Expand Down
Binary file modified lib/murmurhash/murmurhash.jar
Binary file not shown.
4 changes: 4 additions & 0 deletions lib/splitclient-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require 'splitclient-rb/cache/fetchers/segment_fetcher'
require 'splitclient-rb/cache/fetchers/split_fetcher'
require 'splitclient-rb/cache/hashers/impression_hasher'
require 'splitclient-rb/cache/observers/impression_observer'
require 'splitclient-rb/cache/repositories/repository'
require 'splitclient-rb/cache/repositories/segments_repository'
require 'splitclient-rb/cache/repositories/splits_repository'
Expand All @@ -29,6 +30,7 @@
require 'splitclient-rb/cache/senders/impressions_sender'
require 'splitclient-rb/cache/senders/metrics_sender'
require 'splitclient-rb/cache/senders/events_sender'
require 'splitclient-rb/cache/senders/impressions_count_sender'
require 'splitclient-rb/cache/senders/localhost_repo_cleaner'
require 'splitclient-rb/cache/stores/store_utils'
require 'splitclient-rb/cache/stores/localhost_split_builder'
Expand All @@ -53,6 +55,8 @@
require 'splitclient-rb/engine/api/segments'
require 'splitclient-rb/engine/api/splits'
require 'splitclient-rb/engine/api/events'
require 'splitclient-rb/engine/common/impressions_counter'
require 'splitclient-rb/engine/common/impressions_manager'
require 'splitclient-rb/engine/parser/condition'
require 'splitclient-rb/engine/parser/partition'
require 'splitclient-rb/engine/parser/evaluator'
Expand Down
2 changes: 1 addition & 1 deletion lib/splitclient-rb/cache/hashers/impression_hasher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize
end
end

def process(impression)
def process(impression)
impression_data = "#{unknown_if_null(impression[:k])}"
impression_data << ":#{unknown_if_null(impression[:f])}"
impression_data << ":#{unknown_if_null(impression[:t])}"
Expand Down
22 changes: 22 additions & 0 deletions lib/splitclient-rb/cache/observers/impression_observer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module SplitIoClient
module Observers
class ImpressionObserver
LAST_SEEN_CACHE_SIZE = 500000

def initialize
@cache = LruRedux::TTL::ThreadSafeCache.new(LAST_SEEN_CACHE_SIZE)
@impression_hasher = Hashers::ImpressionHasher.new
end

def test_and_set(impression)
return if impression.nil?

hash = @impression_hasher.process(impression)
previous = @cache[hash]
@cache[hash] = impression[:m]

previous.nil? ? nil : [previous, impression[:m]].min
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ def initialize(config)
@adapter = @config.impressions_adapter
end

# Store impression data in the selected adapter
def add(matching_key, bucketing_key, split_name, treatment, time)
@adapter.add_to_queue(
m: metadata,
i: impression_data(
matching_key,
bucketing_key,
split_name,
treatment,
time
)
)
def add_bulk(impressions)
impressions.each do |impression|
@adapter.add_to_queue(impression)
end
rescue ThreadError # queue is full
if random_sampler.rand(1..1000) <= 2 # log only 0.2 % of the time
@config.logger.warn("Dropping impressions. Current size is \
Expand All @@ -30,12 +22,6 @@ def add(matching_key, bucketing_key, split_name, treatment, time)
end
end

def add_bulk(key, bucketing_key, treatments, time)
treatments.each do |split_name, treatment|
add(key, bucketing_key, split_name, treatment, time)
end
end

def batch
return [] if @config.impressions_bulk_size.zero?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,17 @@ def initialize(config)
@adapter = @config.impressions_adapter
end

def add(matching_key, bucketing_key, split_name, treatment, time)
add_bulk(matching_key, bucketing_key, { split_name => treatment }, time)
end

def add_bulk(matching_key, bucketing_key, treatments, time)
impressions = treatments.map do |split_name, treatment|
{
m: metadata,
i: impression_data(
matching_key,
bucketing_key,
split_name,
treatment,
time
)
}.to_json
def add_bulk(impressions)
impressions_json = impressions.map do |impression|
impression.to_json
end

impressions_list_size = @adapter.add_to_queue(key, impressions)
impressions_list_size = @adapter.add_to_queue(key, impressions_json)

# Synchronizer might not be running
@adapter.expire(key, EXPIRE_SECONDS) if impressions.size == impressions_list_size
@adapter.expire(key, EXPIRE_SECONDS) if impressions_json.size == impressions_list_size
rescue StandardError => e
@config.logger.error("Exception while add_bulk: #{e}")
end

def get_impressions(number_of_impressions = 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Repositories
# Repository which forwards impressions interface to the selected adapter
class ImpressionsRepository < Repository
extend Forwardable
def_delegators :@repository, :add, :add_bulk, :batch, :clear, :empty?
def_delegators :@repository, :add_bulk, :batch, :clear, :empty?

def initialize(config)
super(config)
Expand All @@ -17,32 +17,6 @@ def initialize(config)
Repositories::Impressions::RedisRepository.new(@config)
end
end

protected

def impression_data(matching_key, bucketing_key, split_name, treatment, timestamp)
{
k: matching_key,
b: bucketing_key,
f: split_name,
t: treatment[:treatment],
r: applied_rule(treatment[:label]),
c: treatment[:change_number],
m: timestamp
}
end

def metadata
{
s: "#{@config.language}-#{@config.version}",
i: @config.machine_ip,
n: @config.machine_name
}
end

def applied_rule(label)
@config.labels_enabled ? label : nil
end
end
end
end
Expand Down
36 changes: 18 additions & 18 deletions lib/splitclient-rb/cache/routers/impression_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,31 @@ def initialize(config)
end
end

def add(impression)
enqueue(impression)
end

def add_bulk(impressions)
impressions[:split_names].each do |split_name|
enqueue(
split_name: split_name.to_s,
matching_key: impressions[:matching_key],
bucketing_key: impressions[:bucketing_key],
time: impressions[:time],
treatment: {
label: impressions[:treatments_labels_change_numbers][split_name.to_sym][:label],
treatment: impressions[:treatments_labels_change_numbers][split_name.to_sym][:treatment],
change_number: impressions[:treatments_labels_change_numbers][split_name.to_sym][:change_number]
},
attributes: impressions[:attributes]
) unless impressions[:treatments_labels_change_numbers][split_name.to_sym].nil?
impressions.each do |impression|
enqueue(impression)
end
end

private

def enqueue(impression)
@queue.push(impression) if @listener
imp = {
split_name: impression[:i][:f],
matching_key: impression[:i][:k],
bucketing_key: impression[:i][:b],
time: impression[:i][:m],
treatment: {
label: impression[:i][:r],
treatment: impression[:i][:t],
change_number: impression[:i][:c]
},
previous_time: impression[:i][:pt],
attributes: impression[:attributes]
}
@queue.push(imp) if @listener
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def router_thread
Expand Down
73 changes: 73 additions & 0 deletions lib/splitclient-rb/cache/senders/impressions_count_sender.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

module SplitIoClient
module Cache
module Senders
class ImpressionsCountSender
COUNTER_REFRESH_RATE_SECONDS = 1800

def initialize(config, impression_counter, impressions_api)
@config = config
@impression_counter = impression_counter
@impressions_api = impressions_api
end

def call
impressions_count_thread

if defined?(PhusionPassenger)
PhusionPassenger.on_event(:starting_worker_process) do |forked|
impressions_count_thread if forked
end
end
end

private

def impressions_count_thread
@config.threads[:impressions_count_sender] = Thread.new do
begin
@config.logger.info('Starting impressions count service')

loop do
post_impressions_count

sleep(COUNTER_REFRESH_RATE_SECONDS)
end
rescue SplitIoClient::SDKShutdownException
post_impressions_count

@config.logger.info('Posting impressions count due to shutdown')
end
end

def post_impressions_count
@impressions_api.post_count(formatter(@impression_counter.pop_all))
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end

def formatter(counts)
return if counts.empty?

formated_counts = {pf: []}

counts.each do |key, value|
key_splited = key.split('::')

formated_counts[:pf] << {
f: key_splited[0].to_s, # feature name
m: key_splited[1].to_i, # time frame
rc: value # count
}
end

formated_counts
rescue StandardError => error
@config.log_found_exception(__method__.to_s, error)
end
end
end
end
end
end
22 changes: 11 additions & 11 deletions lib/splitclient-rb/cache/senders/impressions_formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ def call(fetch_all_impressions, raw_impressions = nil)

formatted_impressions = unique_features(filtered_impressions).each_with_object([]) do |feature, memo|
feature_impressions = feature_impressions(filtered_impressions, feature)
ip = feature_impressions.first[:m][:i]
current_impressions = current_impressions(feature_impressions)
memo << {
testName: feature.to_sym,
keyImpressions: current_impressions,
ip: ip
f: feature.to_sym,
i: current_impressions
}
end

Expand All @@ -40,12 +38,13 @@ def feature_impressions(filtered_impressions, feature)
def current_impressions(feature_impressions)
feature_impressions.map do |impression|
{
keyName: impression[:i][:k],
treatment: impression[:i][:t],
time: impression[:i][:m],
bucketingKey: impression[:i][:b],
label: impression[:i][:r],
changeNumber: impression[:i][:c]
k: impression[:i][:k],
t: impression[:i][:t],
m: impression[:i][:m],
b: impression[:i][:b],
r: impression[:i][:r],
c: impression[:i][:c],
pt: impression[:i][:pt]
}
end
end
Expand Down Expand Up @@ -73,7 +72,8 @@ def impression_hash(impression)
"#{impression[:i][:k]}:" \
"#{impression[:i][:b]}:" \
"#{impression[:i][:c]}:" \
"#{impression[:i][:t]}"
"#{impression[:i][:t]}:" \
"#{impression[:i][:pt]}"
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/splitclient-rb/cache/senders/impressions_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ module SplitIoClient
module Cache
module Senders
class ImpressionsSender
def initialize(impressions_repository, api_key, config)
def initialize(impressions_repository, config, impressions_api)
@impressions_repository = impressions_repository
@api_key = api_key
@config = config
@impressions_api = impressions_api
end

def call
Expand Down Expand Up @@ -50,7 +50,7 @@ def post_impressions(fetch_all_impressions = true)
end

def impressions_api
@impressions_api ||= SplitIoClient::Api::Impressions.new(@api_key, @config)
@impressions_api
end
end
end
Expand Down
Loading