diff --git a/.circleci/config.yml b/.circleci/config.yml index f81e736b..c94d18e4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -54,7 +54,7 @@ jobs: machine: image: circleci/classic:latest environment: - - RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-1.7.22 jruby-9.0.5.0" + - RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" steps: - run: sudo apt-get -q update - run: sudo apt-get -qy install redis-server diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 5df44a25..4c6e8eeb 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -26,7 +26,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency "codeclimate-test-reporter", "~> 0" spec.add_development_dependency "redis", "~> 3.3.5" spec.add_development_dependency "connection_pool", ">= 2.1.2" - spec.add_development_dependency "moneta", "~> 1.0.0" if RUBY_VERSION >= "2.0.0" spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rspec_junit_formatter", "~> 0.3.0" @@ -34,6 +33,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "rake", "12.1.0" # higher versions of rake fail to install in JRuby 1.7 end + spec.add_development_dependency "timecop", "~> 0.9.1" spec.add_runtime_dependency "json", [">= 1.8", "< 3"] if RUBY_VERSION >= "2.1.0" diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index ce943d13..541cf4d7 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -2,13 +2,17 @@ require "ldclient-rb/evaluation" require "ldclient-rb/ldclient" require "ldclient-rb/cache_store" +require "ldclient-rb/expiring_cache" require "ldclient-rb/memoized_value" require "ldclient-rb/in_memory_store" require "ldclient-rb/config" require "ldclient-rb/newrelic" require "ldclient-rb/stream" require "ldclient-rb/polling" -require "ldclient-rb/event_serializer" +require "ldclient-rb/user_filter" +require "ldclient-rb/simple_lru_cache" +require "ldclient-rb/non_blocking_thread_pool" +require "ldclient-rb/event_summarizer" require "ldclient-rb/events" require "ldclient-rb/redis_store" require "ldclient-rb/requestor" diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index 5a6e7c26..3b62b2a3 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -54,7 +54,15 @@ class Config # @option opts [Boolean] :send_events (true) Whether or not to send events back to LaunchDarkly. # This differs from `offline` in that it affects only the sending of client-side events, not # streaming or polling for events from the server. - # + # @option opts [Integer] :user_keys_capacity (1000) The number of user keys that the event processor + # can remember at any one time, so that duplicate user details will not be sent in analytics events. + # @option opts [Float] :user_keys_flush_interval (300) The interval in seconds at which the event + # processor will reset its set of known user keys. + # @option opts [Boolean] :inline_users_in_events (false) Whether to include full user details in every + # analytics event. By default, events will only include the user key, except for one "index" event + # that provides the full details for the user. + # @option opts [Object] :update_processor An object that will receive feature flag data from LaunchDarkly. + # Defaults to either the streaming or the polling processor, can be customized for tests. # @return [type] [description] # rubocop:disable Metrics/AbcSize, Metrics/PerceivedComplexity def initialize(opts = {}) @@ -76,6 +84,10 @@ def initialize(opts = {}) @all_attributes_private = opts[:all_attributes_private] || false @private_attribute_names = opts[:private_attribute_names] || [] @send_events = opts.has_key?(:send_events) ? opts[:send_events] : Config.default_send_events + @user_keys_capacity = opts[:user_keys_capacity] || Config.default_user_keys_capacity + @user_keys_flush_interval = opts[:user_keys_flush_interval] || Config.default_user_keys_flush_interval + @inline_users_in_events = opts[:inline_users_in_events] || false + @update_processor = opts[:update_processor] end # @@ -186,6 +198,26 @@ def offline? # attr_reader :send_events + # + # The number of user keys that the event processor can remember at any one time, so that + # duplicate user details will not be sent in analytics events. + # + attr_reader :user_keys_capacity + + # + # The interval in seconds at which the event processor will reset its set of known user keys. + # + attr_reader :user_keys_flush_interval + + # + # Whether to include full user details in every + # analytics event. By default, events will only include the user key, except for one "index" event + # that provides the full details for the user. + # + attr_reader :inline_users_in_events + + attr_reader :update_processor + # # The default LaunchDarkly client configuration. This configuration sets # reasonable defaults for most users. @@ -264,5 +296,13 @@ def self.default_poll_interval def self.default_send_events true end + + def self.default_user_keys_capacity + 1000 + end + + def self.default_user_keys_flush_interval + 300 + end end end diff --git a/lib/ldclient-rb/evaluation.rb b/lib/ldclient-rb/evaluation.rb index 2b98c648..b4dd796c 100644 --- a/lib/ldclient-rb/evaluation.rb +++ b/lib/ldclient-rb/evaluation.rb @@ -114,7 +114,7 @@ class EvaluationError < StandardError # generated during prerequisite evaluation. Raises EvaluationError if the flag is not well-formed # Will return nil, but not raise an exception, indicating that the rules (including fallthrough) did not match # In that case, the caller should return the default value. - def evaluate(flag, user, store) + def evaluate(flag, user, store, logger) if flag.nil? raise EvaluationError, "Flag does not exist" end @@ -126,20 +126,23 @@ def evaluate(flag, user, store) events = [] if flag[:on] - res = eval_internal(flag, user, store, events) - - return { value: res, events: events } if !res.nil? + res = eval_internal(flag, user, store, events, logger) + if !res.nil? + res[:events] = events + return res + end end - if !flag[:offVariation].nil? && flag[:offVariation] < flag[:variations].length - value = flag[:variations][flag[:offVariation]] - return { value: value, events: events } + offVariation = flag[:offVariation] + if !offVariation.nil? && offVariation < flag[:variations].length + value = flag[:variations][offVariation] + return { variation: offVariation, value: value, events: events } end - { value: nil, events: events } + { variation: nil, value: nil, events: events } end - def eval_internal(flag, user, store, events) + def eval_internal(flag, user, store, events, logger) failed_prereq = false # Evaluate prerequisites, if any (flag[:prerequisites] || []).each do |prerequisite| @@ -149,14 +152,23 @@ def eval_internal(flag, user, store, events) failed_prereq = true else begin - prereq_res = eval_internal(prereq_flag, user, store, events) - variation = get_variation(prereq_flag, prerequisite[:variation]) - events.push(kind: "feature", key: prereq_flag[:key], value: prereq_res, version: prereq_flag[:version], prereqOf: flag[:key]) - if prereq_res.nil? || prereq_res != variation + prereq_res = eval_internal(prereq_flag, user, store, events, logger) + event = { + kind: "feature", + key: prereq_flag[:key], + variation: prereq_res.nil? ? nil : prereq_res[:variation], + value: prereq_res.nil? ? nil : prereq_res[:value], + version: prereq_flag[:version], + prereqOf: flag[:key], + trackEvents: prereq_flag[:trackEvents], + debugEventsUntilDate: prereq_flag[:debugEventsUntilDate] + } + events.push(event) + if prereq_res.nil? || prereq_res[:variation] != prerequisite[:variation] failed_prereq = true end rescue => exn - @config.logger.error { "[LDClient] Error evaluating prerequisite: #{exn.inspect}" } + logger.error { "[LDClient] Error evaluating prerequisite: #{exn.inspect}" } failed_prereq = true end end @@ -175,7 +187,9 @@ def eval_rules(flag, user, store) # Check user target matches (flag[:targets] || []).each do |target| (target[:values] || []).each do |value| - return get_variation(flag, target[:variation]) if value == user[:key] + if value == user[:key] + return { variation: target[:variation], value: get_variation(flag, target[:variation]) } + end end end @@ -245,7 +259,7 @@ def clause_match_user_no_segments(clause, user) def variation_for_user(rule, user, flag) if !rule[:variation].nil? # fixed variation - return get_variation(flag, rule[:variation]) + return { variation: rule[:variation], value: get_variation(flag, rule[:variation]) } elsif !rule[:rollout].nil? # percentage rollout rollout = rule[:rollout] bucket_by = rollout[:bucketBy].nil? ? "key" : rollout[:bucketBy] @@ -253,7 +267,9 @@ def variation_for_user(rule, user, flag) sum = 0; rollout[:variations].each do |variate| sum += variate[:weight].to_f / 100000.0 - return get_variation(flag, variate[:variation]) if bucket < sum + if bucket < sum + return { variation: variate[:variation], value: get_variation(flag, variate[:variation]) } + end end nil else # the rule isn't well-formed diff --git a/lib/ldclient-rb/event_summarizer.rb b/lib/ldclient-rb/event_summarizer.rb new file mode 100644 index 00000000..1c55b524 --- /dev/null +++ b/lib/ldclient-rb/event_summarizer.rb @@ -0,0 +1,52 @@ + +module LaunchDarkly + EventSummary = Struct.new(:start_date, :end_date, :counters) + + # Manages the state of summarizable information for the EventProcessor, including the + # event counters and user deduplication. Note that the methods of this class are + # deliberately not thread-safe; the EventProcessor is responsible for enforcing + # synchronization across both the summarizer and the event queue. + class EventSummarizer + def initialize + clear + end + + # Adds this event to our counters, if it is a type of event we need to count. + def summarize_event(event) + if event[:kind] == "feature" + counter_key = { + key: event[:key], + version: event[:version], + variation: event[:variation] + } + c = @counters[counter_key] + if c.nil? + @counters[counter_key] = { + value: event[:value], + default: event[:default], + count: 1 + } + else + c[:count] = c[:count] + 1 + end + time = event[:creationDate] + if !time.nil? + @start_date = time if @start_date == 0 || time < @start_date + @end_date = time if time > @end_date + end + end + end + + # Returns a snapshot of the current summarized event data, and resets this state. + def snapshot + ret = EventSummary.new(@start_date, @end_date, @counters) + ret + end + + def clear + @start_date = 0 + @end_date = 0 + @counters = {} + end + end +end diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb index 84ea0275..96db3f46 100644 --- a/lib/ldclient-rb/events.rb +++ b/lib/ldclient-rb/events.rb @@ -1,94 +1,426 @@ +require "concurrent" require "concurrent/atomics" +require "concurrent/executors" require "thread" +require "time" require "faraday" module LaunchDarkly + MAX_FLUSH_WORKERS = 5 + CURRENT_SCHEMA_VERSION = 3 + + class NullEventProcessor + def add_event(event) + end + + def flush + end + + def stop + end + end + + class EventMessage + def initialize(event) + @event = event + end + attr_reader :event + end + + class FlushMessage + end + + class FlushUsersMessage + end + + class SynchronousMessage + def initialize + @reply = Concurrent::Semaphore.new(0) + end + + def completed + @reply.release + end + + def wait_for_completion + @reply.acquire + end + end + + class TestSyncMessage < SynchronousMessage + end + + class StopMessage < SynchronousMessage + end + class EventProcessor - def initialize(sdk_key, config) + def initialize(sdk_key, config, client = nil) @queue = Queue.new - @sdk_key = sdk_key - @config = config - @serializer = EventSerializer.new(config) - @client = Faraday.new + @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do + @queue << FlushMessage.new + end + @flush_task.execute + @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do + @queue << FlushUsersMessage.new + end + @users_flush_task.execute @stopped = Concurrent::AtomicBoolean.new(false) - @worker = create_worker if @config.send_events + + EventDispatcher.new(@queue, sdk_key, config, client) + end + + def add_event(event) + event[:creationDate] = (Time.now.to_f * 1000).to_i + @queue << EventMessage.new(event) end - def alive? - !@stopped.value + def flush + # flush is done asynchronously + @queue << FlushMessage.new end def stop + # final shutdown, which includes a final flush, is done synchronously if @stopped.make_true - # There seems to be no such thing as "close" in Faraday: https://github.com/lostisland/faraday/issues/241 - if !@worker.nil? && @worker.alive? - @worker.raise "shutting down client" - end + @flush_task.shutdown + @users_flush_task.shutdown + @queue << FlushMessage.new + stop_msg = StopMessage.new + @queue << stop_msg + stop_msg.wait_for_completion end end - def create_worker - Thread.new do - while !@stopped.value do - begin - flush - sleep(@config.flush_interval) - rescue StandardError => exn - log_exception(__method__.to_s, exn) + # exposed only for testing + def wait_until_inactive + sync_msg = TestSyncMessage.new + @queue << sync_msg + sync_msg.wait_for_completion + end + end + + class EventDispatcher + def initialize(queue, sdk_key, config, client) + @sdk_key = sdk_key + @config = config + @client = client ? client : Faraday.new + @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity) + @formatter = EventOutputFormatter.new(config) + @disabled = Concurrent::AtomicBoolean.new(false) + @last_known_past_time = Concurrent::AtomicFixnum.new(0) + + buffer = EventBuffer.new(config.capacity, config.logger) + flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS) + + Thread.new { main_loop(queue, buffer, flush_workers) } + end + + private + + def now_millis() + (Time.now.to_f * 1000).to_i + end + + def main_loop(queue, buffer, flush_workers) + running = true + while running do + begin + message = queue.pop + case message + when EventMessage + dispatch_event(message.event, buffer) + when FlushMessage + trigger_flush(buffer, flush_workers) + when FlushUsersMessage + @user_keys.clear + when TestSyncMessage + synchronize_for_testing(flush_workers) + message.completed + when StopMessage + do_shutdown(flush_workers) + running = false + message.completed end + rescue => e + @config.logger.warn { "[LDClient] Unexpected error in event processor: #{e.inspect}. \nTrace: #{e.backtrace}" } end end end - def post_flushed_events(events) - res = @client.post (@config.events_uri + "/bulk") do |req| - req.headers["Authorization"] = @sdk_key - req.headers["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION - req.headers["Content-Type"] = "application/json" - req.body = @serializer.serialize_events(events) - req.options.timeout = @config.read_timeout - req.options.open_timeout = @config.connect_timeout + def do_shutdown(flush_workers) + flush_workers.shutdown + flush_workers.wait_for_termination + # There seems to be no such thing as "close" in Faraday: https://github.com/lostisland/faraday/issues/241 + end + + def synchronize_for_testing(flush_workers) + # Used only by unit tests. Wait until all active flush workers have finished. + flush_workers.wait_all + end + + def dispatch_event(event, buffer) + return if @disabled.value + + # Always record the event in the summary. + buffer.add_to_summary(event) + + # Decide whether to add the event to the payload. Feature events may be added twice, once for + # the event (if tracked) and once for debugging. + will_add_full_event = false + debug_event = nil + if event[:kind] == "feature" + will_add_full_event = event[:trackEvents] + if should_debug_event(event) + debug_event = event.clone + debug_event[:debug] = true + end + else + will_add_full_event = true end - if res.status < 200 || res.status >= 300 - @config.logger.error { "[LDClient] Unexpected status code while processing events: #{res.status}" } - if res.status == 401 - @config.logger.error { "[LDClient] Received 401 error, no further events will be posted since SDK key is invalid" } - stop + + # For each user we haven't seen before, we add an index event - unless this is already + # an identify event for that user. + if !(will_add_full_event && @config.inline_users_in_events) + if event.has_key?(:user) && !notice_user(event[:user]) && event[:kind] != "identify" + buffer.add_event({ + kind: "index", + creationDate: event[:creationDate], + user: event[:user] + }) end end + + buffer.add_event(event) if will_add_full_event + buffer.add_event(debug_event) if !debug_event.nil? end - def flush - return if @offline || !@config.send_events - events = [] - begin - loop do - events << @queue.pop(true) + # Add to the set of users we've noticed, and return true if the user was already known to us. + def notice_user(user) + if user.nil? || !user.has_key?(:key) + true + else + @user_keys.add(user[:key]) + end + end + + def should_debug_event(event) + debug_until = event[:debugEventsUntilDate] + if !debug_until.nil? + last_past = @last_known_past_time.value + debug_until > last_past && debug_until > now_millis + else + false + end + end + + def trigger_flush(buffer, flush_workers) + if @disabled.value + return + end + + payload = buffer.get_payload + if !payload.events.empty? || !payload.summary.counters.empty? + # If all available worker threads are busy, success will be false and no job will be queued. + success = flush_workers.post do + resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) + handle_response(resp) if !resp.nil? end - rescue ThreadError + buffer.clear if success # Reset our internal state, these events now belong to the flush worker end + end - if !events.empty? && !@stopped.value - post_flushed_events(events) + def handle_response(res) + if res.status == 401 + @config.logger.error { "[LDClient] Received 401 error, no further events will be posted since SDK key is invalid" } + @disabled.value = true + else + if !res.headers.nil? && res.headers.has_key?("Date") + begin + res_time = (Time.httpdate(res.headers["Date"]).to_f * 1000).to_i + @last_known_past_time.value = res_time + rescue ArgumentError + end + end end end + end + + FlushPayload = Struct.new(:events, :summary) + + class EventBuffer + def initialize(capacity, logger) + @capacity = capacity + @logger = logger + @capacity_exceeded = false + @events = [] + @summarizer = EventSummarizer.new + end def add_event(event) - return if @offline || !@config.send_events || @stopped.value + if @events.length < @capacity + @logger.debug { "[LDClient] Enqueueing event: #{event.to_json}" } + @events.push(event) + @capacity_exceeded = false + else + if !@capacity_exceeded + @capacity_exceeded = true + @logger.warn { "[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events." } + end + end + end + + def add_to_summary(event) + @summarizer.summarize_event(event) + end - if @queue.length < @config.capacity - event[:creationDate] = (Time.now.to_f * 1000).to_i - @config.logger.debug { "[LDClient] Enqueueing event: #{event.to_json}" } - @queue.push(event) + def get_payload + return FlushPayload.new(@events, @summarizer.snapshot) + end + + def clear + @events = [] + @summarizer.clear + end + end - if !@worker.alive? - @worker = create_worker + class EventPayloadSendTask + def run(sdk_key, config, client, payload, formatter) + events_out = formatter.make_output_events(payload.events, payload.summary) + res = nil + body = events_out.to_json + (0..1).each do |attempt| + if attempt > 0 + config.logger.warn { "[LDClient] Will retry posting events after 1 second" } + sleep(1) end + begin + config.logger.debug { "[LDClient] sending #{events_out.length} events: #{body}" } + res = client.post (config.events_uri + "/bulk") do |req| + req.headers["Authorization"] = sdk_key + req.headers["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION + req.headers["Content-Type"] = "application/json" + req.headers["X-LaunchDarkly-Event-Schema"] = CURRENT_SCHEMA_VERSION.to_s + req.body = body + req.options.timeout = config.read_timeout + req.options.open_timeout = config.connect_timeout + end + rescue StandardError => exn + config.logger.warn { "[LDClient] Error flushing events: #{exn.inspect}." } + next + end + if res.status < 200 || res.status >= 300 + config.logger.error { "[LDClient] Unexpected status code while processing events: #{res.status}" } + if res.status >= 500 + next + end + end + break + end + # used up our retries, return the last response if any + res + end + end + + class EventOutputFormatter + def initialize(config) + @inline_users = config.inline_users_in_events + @user_filter = UserFilter.new(config) + end + + # Transforms events into the format used for event sending. + def make_output_events(events, summary) + events_out = events.map { |e| make_output_event(e) } + if !summary.counters.empty? + events_out.push(make_summary_event(summary)) + end + events_out + end + + private + + def make_output_event(event) + case event[:kind] + when "feature" + is_debug = event[:debug] + out = { + kind: is_debug ? "debug" : "feature", + creationDate: event[:creationDate], + key: event[:key], + value: event[:value] + } + out[:default] = event[:default] if event.has_key?(:default) + out[:variation] = event[:variation] if event.has_key?(:variation) + out[:version] = event[:version] if event.has_key?(:version) + out[:prereqOf] = event[:prereqOf] if event.has_key?(:prereqOf) + if @inline_users || is_debug + out[:user] = @user_filter.transform_user_props(event[:user]) + else + out[:userKey] = event[:user].nil? ? nil : event[:user][:key] + end + out + when "identify" + { + kind: "identify", + creationDate: event[:creationDate], + key: event[:user].nil? ? nil : event[:user][:key], + user: @user_filter.transform_user_props(event[:user]) + } + when "custom" + out = { + kind: "custom", + creationDate: event[:creationDate], + key: event[:key] + } + out[:data] = event[:data] if event.has_key?(:data) + if @inline_users + out[:user] = @user_filter.transform_user_props(event[:user]) + else + out[:userKey] = event[:user].nil? ? nil : event[:user][:key] + end + out + when "index" + { + kind: "index", + creationDate: event[:creationDate], + user: @user_filter.transform_user_props(event[:user]) + } else - @config.logger.warn { "[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events." } + event end end - private :create_worker, :post_flushed_events + # Transforms the summary data into the format used for event sending. + def make_summary_event(summary) + flags = {} + summary[:counters].each { |ckey, cval| + flag = flags[ckey[:key]] + if flag.nil? + flag = { + default: cval[:default], + counters: [] + } + flags[ckey[:key]] = flag + end + c = { + value: cval[:value], + count: cval[:count] + } + if !ckey[:variation].nil? + c[:variation] = ckey[:variation] + end + if ckey[:version].nil? + c[:unknown] = true + else + c[:version] = ckey[:version] + end + flag[:counters].push(c) + } + { + kind: "summary", + startDate: summary[:start_date], + endDate: summary[:end_date], + features: flags + } + end end end diff --git a/lib/ldclient-rb/expiring_cache.rb b/lib/ldclient-rb/expiring_cache.rb new file mode 100644 index 00000000..6d8c48f8 --- /dev/null +++ b/lib/ldclient-rb/expiring_cache.rb @@ -0,0 +1,76 @@ + +module LaunchDarkly + # A thread-safe cache with maximum number of entries and TTL. + # Adapted from https://github.com/SamSaffron/lru_redux/blob/master/lib/lru_redux/ttl/cache.rb + # under MIT license with the following changes: + # * made thread-safe + # * removed many unused methods + # * reading a key does not reset its expiration time, only writing + class ExpiringCache + def initialize(max_size, ttl) + @max_size = max_size + @ttl = ttl + @data_lru = {} + @data_ttl = {} + @lock = Mutex.new + end + + def [](key) + @lock.synchronize do + ttl_evict + @data_lru[key] + end + end + + def []=(key, val) + @lock.synchronize do + ttl_evict + + @data_lru.delete(key) + @data_ttl.delete(key) + + @data_lru[key] = val + @data_ttl[key] = Time.now.to_f + + if @data_lru.size > @max_size + key, _ = @data_lru.first # hashes have a FIFO ordering in Ruby + + @data_ttl.delete(key) + @data_lru.delete(key) + end + + val + end + end + + def delete(key) + @lock.synchronize do + ttl_evict + + @data_lru.delete(key) + @data_ttl.delete(key) + end + end + + def clear + @lock.synchronize do + @data_lru.clear + @data_ttl.clear + end + end + + private + + def ttl_evict + ttl_horizon = Time.now.to_f - @ttl + key, time = @data_ttl.first + + until time.nil? || time > ttl_horizon + @data_ttl.delete(key) + @data_lru.delete(key) + + key, time = @data_ttl.first + end + end + end +end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index 2c91bd60..ece7c4ec 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -28,7 +28,11 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) @config = config @store = config.feature_store - @event_processor = EventProcessor.new(sdk_key, config) + if @config.offline? || !@config.send_events + @event_processor = NullEventProcessor.new + else + @event_processor = EventProcessor.new(sdk_key, config) + end if @config.use_ldd? @config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" } @@ -38,12 +42,16 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) requestor = Requestor.new(sdk_key, config) if !@config.offline? - if @config.stream? - @update_processor = StreamProcessor.new(sdk_key, config, requestor) + if @config.update_processor.nil? + if @config.stream? + @update_processor = StreamProcessor.new(sdk_key, config, requestor) + else + @config.logger.info { "Disabling streaming API" } + @config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } + @update_processor = PollingProcessor.new(config, requestor) + end else - @config.logger.info { "Disabling streaming API" } - @config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } - @update_processor = PollingProcessor.new(config, requestor) + @update_processor = @config.update_processor end @update_processor.start end @@ -113,12 +121,6 @@ def initialized? def variation(key, user, default) return default if @config.offline? - unless user - @config.logger.error { "[LDClient] Must specify user" } - @event_processor.add_event(kind: "feature", key: key, value: default, default: default, user: user) - return default - end - if !initialized? if @store.initialized? @config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" } @@ -129,7 +131,7 @@ def variation(key, user, default) end end - sanitize_user(user) + sanitize_user(user) if !user.nil? feature = @store.get(FEATURES, key) if feature.nil? @@ -138,24 +140,29 @@ def variation(key, user, default) return default end + unless user + @config.logger.error { "[LDClient] Must specify user" } + @event_processor.add_event(make_feature_event(feature, user, nil, default, default)) + return default + end + begin - res = evaluate(feature, user, @store) + res = evaluate(feature, user, @store, @config.logger) if !res[:events].nil? res[:events].each do |event| @event_processor.add_event(event) end end - if !res[:value].nil? - @event_processor.add_event(kind: "feature", key: key, user: user, value: res[:value], default: default, version: feature[:version]) - return res[:value] - else + value = res[:value] + if value.nil? @config.logger.debug { "[LDClient] Result value is null in toggle" } - @event_processor.add_event(kind: "feature", key: key, user: user, value: default, default: default, version: feature[:version]) - return default + value = default end + @event_processor.add_event(make_feature_event(feature, user, res[:variation], value, default)) + return value rescue => exn @config.logger.warn { "[LDClient] Error evaluating feature flag: #{exn.inspect}. \nTrace: #{exn.backtrace}" } - @event_processor.add_event(kind: "feature", key: key, user: user, value: default, default: default, version: feature[:version]) + @event_processor.add_event(make_feature_event(feature, user, nil, default, default)) return default end end @@ -200,7 +207,7 @@ def all_flags(user) features = @store.all(FEATURES) # TODO rescue if necessary - Hash[features.map{ |k, f| [k, evaluate(f, user, @store)[:value]] }] + Hash[features.map{ |k, f| [k, evaluate(f, user, @store, @config.logger)[:value]] }] rescue => exn @config.logger.warn { "[LDClient] Error evaluating all flags: #{exn.inspect}. \nTrace: #{exn.backtrace}" } return Hash.new @@ -232,6 +239,20 @@ def sanitize_user(user) end end - private :evaluate, :log_exception, :sanitize_user + def make_feature_event(flag, user, variation, value, default) + { + kind: "feature", + key: flag[:key], + user: user, + variation: variation, + value: value, + default: default, + version: flag[:version], + trackEvents: flag[:trackEvents], + debugEventsUntilDate: flag[:debugEventsUntilDate] + } + end + + private :evaluate, :log_exception, :sanitize_user, :make_feature_event end end diff --git a/lib/ldclient-rb/non_blocking_thread_pool.rb b/lib/ldclient-rb/non_blocking_thread_pool.rb new file mode 100644 index 00000000..81b7ea14 --- /dev/null +++ b/lib/ldclient-rb/non_blocking_thread_pool.rb @@ -0,0 +1,46 @@ +require "concurrent" +require "concurrent/atomics" +require "concurrent/executors" +require "thread" + +# Simple wrapper for a FixedThreadPool that rejects new jobs if all the threads are busy, rather +# than blocking. Also provides a way to wait for all jobs to finish without shutting down. + +module LaunchDarkly + class NonBlockingThreadPool + def initialize(capacity) + @capacity = capacity + @pool = Concurrent::FixedThreadPool.new(capacity) + @semaphore = Concurrent::Semaphore.new(capacity) + end + + # Attempts to submit a job, but only if a worker is available. Unlike the regular post method, + # this returns a value: true if the job was submitted, false if all workers are busy. + def post + if !@semaphore.try_acquire(1) + return + end + @pool.post do + begin + yield + ensure + @semaphore.release(1) + end + end + end + + # Waits until no jobs are executing, without shutting down the pool. + def wait_all + @semaphore.acquire(@capacity) + @semaphore.release(@capacity) + end + + def shutdown + @pool.shutdown + end + + def wait_for_termination + @pool.wait_for_termination + end + end +end diff --git a/lib/ldclient-rb/redis_store.rb b/lib/ldclient-rb/redis_store.rb index 0bbe8990..3729ca6b 100644 --- a/lib/ldclient-rb/redis_store.rb +++ b/lib/ldclient-rb/redis_store.rb @@ -9,15 +9,14 @@ module LaunchDarkly # streaming API. Feature data can also be further cached in memory to reduce overhead # of calls to Redis. # - # To use this class, you must first have the `redis`, `connection-pool`, and `moneta` - # gems installed. Then, create an instance and store it in the `feature_store` - # property of your client configuration. + # To use this class, you must first have the `redis` and `connection-pool` gems + # installed. Then, create an instance and store it in the `feature_store` property + # of your client configuration. # class RedisFeatureStore begin require "redis" require "connection_pool" - require "moneta" REDIS_ENABLED = true rescue ScriptError, StandardError REDIS_ENABLED = false @@ -38,7 +37,7 @@ class RedisFeatureStore # def initialize(opts = {}) if !REDIS_ENABLED - raise RuntimeError.new("can't use RedisFeatureStore because one of these gems is missing: redis, connection_pool, moneta") + raise RuntimeError.new("can't use RedisFeatureStore because one of these gems is missing: redis, connection_pool") end @redis_opts = opts[:redis_opts] || Hash.new if opts[:redis_url] @@ -54,15 +53,12 @@ def initialize(opts = {}) @prefix = opts[:prefix] || RedisFeatureStore.default_prefix @logger = opts[:logger] || Config.default_logger - @expiration_seconds = opts[:expiration] || 15 - @capacity = opts[:capacity] || 1000 - # We're using Moneta only to provide expiration behavior for the in-memory cache. - # Moneta can also be used as a wrapper for Redis, but it doesn't support the Redis - # hash operations that we use. - if @expiration_seconds > 0 - @cache = Moneta.new(:LRUHash, expires: true, threadsafe: true, max_count: @capacity) + expiration_seconds = opts[:expiration] || 15 + capacity = opts[:capacity] || 1000 + if expiration_seconds > 0 + @cache = ExpiringCache.new(capacity, expiration_seconds) else - @cache = Moneta.new(:Null) # a stub that caches nothing + @cache = nil end @stopped = Concurrent::AtomicBoolean.new(false) @@ -92,7 +88,7 @@ def self.default_prefix end def get(kind, key) - f = @cache[cache_key(kind, key)] + f = @cache.nil? ? nil : @cache[cache_key(kind, key)] if f.nil? @logger.debug { "RedisFeatureStore: no cache hit for #{key} in '#{kind[:namespace]}', requesting from Redis" } f = with_connection do |redis| @@ -139,7 +135,7 @@ def delete(kind, key, version) end def init(all_data) - @cache.clear + @cache.clear if !@cache.nil? count = 0 with_connection do |redis| all_data.each do |kind, items| @@ -174,7 +170,7 @@ def initialized? def stop if @stopped.make_true @pool.shutdown { |redis| redis.close } - @cache.clear + @cache.clear if !@cache.nil? end end @@ -213,7 +209,7 @@ def get_redis(kind, redis, key) end def put_cache(kind, key, value) - @cache.store(cache_key(kind, key), value, expires: @expiration_seconds) + @cache[cache_key(kind, key)] = value if !@cache.nil? end def update_with_versioning(kind, new_item) diff --git a/lib/ldclient-rb/simple_lru_cache.rb b/lib/ldclient-rb/simple_lru_cache.rb new file mode 100644 index 00000000..64b1a709 --- /dev/null +++ b/lib/ldclient-rb/simple_lru_cache.rb @@ -0,0 +1,24 @@ + +module LaunchDarkly + # A non-thread-safe implementation of a LRU cache set with only add and reset methods. + # Based on https://github.com/SamSaffron/lru_redux/blob/master/lib/lru_redux/cache.rb + class SimpleLRUCacheSet + def initialize(capacity) + @values = {} + @capacity = capacity + end + + # Adds a value to the cache or marks it recent if it was already there. Returns true if already there. + def add(value) + found = true + @values.delete(value) { found = false } + @values[value] = true + @values.shift if @values.length > @capacity + found + end + + def clear + @values = {} + end + end +end diff --git a/lib/ldclient-rb/event_serializer.rb b/lib/ldclient-rb/user_filter.rb similarity index 87% rename from lib/ldclient-rb/event_serializer.rb rename to lib/ldclient-rb/user_filter.rb index bc8cb27d..9f4bce82 100644 --- a/lib/ldclient-rb/event_serializer.rb +++ b/lib/ldclient-rb/user_filter.rb @@ -1,18 +1,28 @@ require "json" module LaunchDarkly - class EventSerializer + class UserFilter def initialize(config) @all_attributes_private = config.all_attributes_private @private_attribute_names = Set.new(config.private_attribute_names.map(&:to_sym)) end - def serialize_events(events) - events.map { |event| - Hash[event.map { |key, value| - [key, (key.to_sym == :user) ? transform_user_props(value) : value] - }] - }.to_json + def transform_user_props(user_props) + return nil if user_props.nil? + + user_private_attrs = Set.new((user_props[:privateAttributeNames] || []).map(&:to_sym)) + + filtered_user_props, removed = filter_values(user_props, user_private_attrs, ALLOWED_TOP_LEVEL_KEYS, IGNORED_TOP_LEVEL_KEYS) + if user_props.has_key?(:custom) + filtered_user_props[:custom], removed_custom = filter_values(user_props[:custom], user_private_attrs) + removed.merge(removed_custom) + end + + unless removed.empty? + # note, :privateAttributeNames is what the developer sets; :privateAttrs is what we send to the server + filtered_user_props[:privateAttrs] = removed.to_a.sort.map { |s| s.to_s } + end + return filtered_user_props end private @@ -35,21 +45,5 @@ def filter_values(props, user_private_attrs, allowed_keys = [], keys_to_leave_as def private_attr?(name, user_private_attrs) @all_attributes_private || @private_attribute_names.include?(name) || user_private_attrs.include?(name) end - - def transform_user_props(user_props) - user_private_attrs = Set.new((user_props[:privateAttributeNames] || []).map(&:to_sym)) - - filtered_user_props, removed = filter_values(user_props, user_private_attrs, ALLOWED_TOP_LEVEL_KEYS, IGNORED_TOP_LEVEL_KEYS) - if user_props.has_key?(:custom) - filtered_user_props[:custom], removed_custom = filter_values(user_props[:custom], user_private_attrs) - removed.merge(removed_custom) - end - - unless removed.empty? - # note, :privateAttributeNames is what the developer sets; :privateAttrs is what we send to the server - filtered_user_props[:privateAttrs] = removed.to_a.sort - end - return filtered_user_props - end end end diff --git a/spec/evaluation_spec.rb b/spec/evaluation_spec.rb index b8f4ea59..a8d980ae 100644 --- a/spec/evaluation_spec.rb +++ b/spec/evaluation_spec.rb @@ -12,6 +12,8 @@ } } + let(:logger) { LaunchDarkly::Config.default_logger } + include LaunchDarkly::Evaluation describe "evaluate" do @@ -24,7 +26,7 @@ variations: ['a', 'b', 'c'] } user = { key: 'x' } - expect(evaluate(flag, user, features)).to eq({value: 'b', events: []}) + expect(evaluate(flag, user, features, logger)).to eq({variation: 1, value: 'b', events: []}) end it "returns nil if flag is off and off variation is unspecified" do @@ -35,7 +37,7 @@ variations: ['a', 'b', 'c'] } user = { key: 'x' } - expect(evaluate(flag, user, features)).to eq({value: nil, events: []}) + expect(evaluate(flag, user, features, logger)).to eq({variation: nil, value: nil, events: []}) end it "returns off variation if prerequisite is not found" do @@ -48,7 +50,34 @@ variations: ['a', 'b', 'c'] } user = { key: 'x' } - expect(evaluate(flag, user, features)).to eq({value: 'b', events: []}) + expect(evaluate(flag, user, features, logger)).to eq({variation: 1, value: 'b', events: []}) + end + + it "returns off variation and event if prerequisite of a prerequisite is not found" do + flag = { + key: 'feature0', + on: true, + prerequisites: [{key: 'feature1', variation: 1}], + fallthrough: { variation: 0 }, + offVariation: 1, + variations: ['a', 'b', 'c'], + version: 1 + } + flag1 = { + key: 'feature1', + on: true, + prerequisites: [{key: 'feature2', variation: 1}], # feature2 doesn't exist + fallthrough: { variation: 0 }, + variations: ['d', 'e'], + version: 2 + } + features.upsert(LaunchDarkly::FEATURES, flag1) + user = { key: 'x' } + events_should_be = [{ + kind: 'feature', key: 'feature1', variation: nil, value: nil, version: 2, prereqOf: 'feature0', + trackEvents: nil, debugEventsUntilDate: nil + }] + expect(evaluate(flag, user, features, logger)).to eq({variation: 1, value: 'b', events: events_should_be}) end it "returns off variation and event if prerequisite is not met" do @@ -70,8 +99,11 @@ } features.upsert(LaunchDarkly::FEATURES, flag1) user = { key: 'x' } - events_should_be = [{kind: 'feature', key: 'feature1', value: 'd', version: 2, prereqOf: 'feature0'}] - expect(evaluate(flag, user, features)).to eq({value: 'b', events: events_should_be}) + events_should_be = [{ + kind: 'feature', key: 'feature1', variation: 0, value: 'd', version: 2, prereqOf: 'feature0', + trackEvents: nil, debugEventsUntilDate: nil + }] + expect(evaluate(flag, user, features, logger)).to eq({variation: 1, value: 'b', events: events_should_be}) end it "returns fallthrough variation and event if prerequisite is met and there are no rules" do @@ -93,8 +125,11 @@ } features.upsert(LaunchDarkly::FEATURES, flag1) user = { key: 'x' } - events_should_be = [{kind: 'feature', key: 'feature1', value: 'e', version: 2, prereqOf: 'feature0'}] - expect(evaluate(flag, user, features)).to eq({value: 'a', events: events_should_be}) + events_should_be = [{ + kind: 'feature', key: 'feature1', variation: 1, value: 'e', version: 2, prereqOf: 'feature0', + trackEvents: nil, debugEventsUntilDate: nil + }] + expect(evaluate(flag, user, features, logger)).to eq({variation: 0, value: 'a', events: events_should_be}) end it "matches user from targets" do @@ -109,7 +144,7 @@ variations: ['a', 'b', 'c'] } user = { key: 'userkey' } - expect(evaluate(flag, user, features)).to eq({value: 'c', events: []}) + expect(evaluate(flag, user, features, logger)).to eq({variation: 2, value: 'c', events: []}) end it "matches user from rules" do @@ -133,7 +168,7 @@ variations: ['a', 'b', 'c'] } user = { key: 'userkey' } - expect(evaluate(flag, user, features)).to eq({value: 'c', events: []}) + expect(evaluate(flag, user, features, logger)).to eq({variation: 2, value: 'c', events: []}) end end diff --git a/spec/event_summarizer_spec.rb b/spec/event_summarizer_spec.rb new file mode 100644 index 00000000..5449e691 --- /dev/null +++ b/spec/event_summarizer_spec.rb @@ -0,0 +1,63 @@ +require "spec_helper" + +describe LaunchDarkly::EventSummarizer do + subject { LaunchDarkly::EventSummarizer } + + let(:user) { { key: "key" } } + + it "does not add identify event to summary" do + es = subject.new + snapshot = es.snapshot + es.summarize_event({ kind: "identify", user: user }) + + expect(es.snapshot).to eq snapshot + end + + it "does not add custom event to summary" do + es = subject.new + snapshot = es.snapshot + es.summarize_event({ kind: "custom", key: "whatever", user: user }) + + expect(es.snapshot).to eq snapshot + end + + it "tracks start and end dates" do + es = subject.new + flag = { key: "key" } + event1 = { kind: "feature", creationDate: 2000, user: user } + event2 = { kind: "feature", creationDate: 1000, user: user } + event3 = { kind: "feature", creationDate: 1500, user: user } + es.summarize_event(event1) + es.summarize_event(event2) + es.summarize_event(event3) + data = es.snapshot + + expect(data.start_date).to be 1000 + expect(data.end_date).to be 2000 + end + + it "counts events" do + es = subject.new + flag1 = { key: "key1", version: 11 } + flag2 = { key: "key2", version: 22 } + event1 = { kind: "feature", key: "key1", version: 11, user: user, variation: 1, value: "value1", default: "default1" } + event2 = { kind: "feature", key: "key1", version: 11, user: user, variation: 2, value: "value2", default: "default1" } + event3 = { kind: "feature", key: "key2", version: 22, user: user, variation: 1, value: "value99", default: "default2" } + event4 = { kind: "feature", key: "key1", version: 11, user: user, variation: 1, value: "value1", default: "default1" } + event5 = { kind: "feature", key: "badkey", user: user, variation: nil, value: "default3", default: "default3" } + [event1, event2, event3, event4, event5].each { |e| es.summarize_event(e) } + data = es.snapshot + + expectedCounters = { + { key: "key1", version: 11, variation: 1 } => + { count: 2, value: "value1", default: "default1" }, + { key: "key1", version: 11, variation: 2 } => + { count: 1, value: "value2", default: "default1" }, + { key: "key2", version: 22, variation: 1 } => + { count: 1, value: "value99", default: "default2" }, + { key: "badkey", version: nil, variation: nil } => + { count: 1, value: "default3", default: "default3" } + } + expect(data.counters).to eq expectedCounters + end +end diff --git a/spec/events_spec.rb b/spec/events_spec.rb new file mode 100644 index 00000000..cbce1fbe --- /dev/null +++ b/spec/events_spec.rb @@ -0,0 +1,506 @@ +require "spec_helper" +require "faraday" +require "time" + +describe LaunchDarkly::EventProcessor do + subject { LaunchDarkly::EventProcessor } + + let(:default_config) { LaunchDarkly::Config.new } + let(:hc) { FakeHttpClient.new } + let(:user) { { key: "userkey", name: "Red" } } + let(:filtered_user) { { key: "userkey", privateAttrs: [ "name" ] } } + + after(:each) do + if !@ep.nil? + @ep.stop + end + end + + it "queues identify event" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "identify", key: user[:key], user: user } + @ep.add_event(e) + + output = flush_and_get_events + expect(output).to contain_exactly(e) + end + + it "filters user in identify event" do + config = LaunchDarkly::Config.new(all_attributes_private: true) + @ep = subject.new("sdk_key", config, hc) + e = { kind: "identify", key: user[:key], user: user } + @ep.add_event(e) + + output = flush_and_get_events + expect(output).to contain_exactly({ + kind: "identify", + key: user[:key], + creationDate: e[:creationDate], + user: filtered_user + }) + end + + it "queues individual feature event with index event" do + @ep = subject.new("sdk_key", default_config, hc) + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, user)), + eq(feature_event(fe, flag, false, nil)), + include(:kind => "summary") + ) + end + + it "filters user in index event" do + config = LaunchDarkly::Config.new(all_attributes_private: true) + @ep = subject.new("sdk_key", config, hc) + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, filtered_user)), + eq(feature_event(fe, flag, false, nil)), + include(:kind => "summary") + ) + end + + it "can include inline user in feature event" do + config = LaunchDarkly::Config.new(inline_users_in_events: true) + @ep = subject.new("sdk_key", config, hc) + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(feature_event(fe, flag, false, user)), + include(:kind => "summary") + ) + end + + it "filters user in feature event" do + config = LaunchDarkly::Config.new(all_attributes_private: true, inline_users_in_events: true) + @ep = subject.new("sdk_key", config, hc) + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(feature_event(fe, flag, false, filtered_user)), + include(:kind => "summary") + ) + end + + it "still generates index event if inline_users is true but feature event was not tracked" do + config = LaunchDarkly::Config.new(inline_users_in_events: true) + @ep = subject.new("sdk_key", config, hc) + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, user)), + include(:kind => "summary") + ) + end + + it "sets event kind to debug if flag is temporarily in debug mode" do + @ep = subject.new("sdk_key", default_config, hc) + flag = { key: "flagkey", version: 11 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: future_time + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, user)), + eq(feature_event(fe, flag, true, user)), + include(:kind => "summary") + ) + end + + it "can be both debugging and tracking an event" do + @ep = subject.new("sdk_key", default_config, hc) + flag = { key: "flagkey", version: 11 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true, debugEventsUntilDate: future_time + } + @ep.add_event(fe) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, user)), + eq(feature_event(fe, flag, false, nil)), + eq(feature_event(fe, flag, true, user)), + include(:kind => "summary") + ) + end + + it "ends debug mode based on client time if client time is later than server time" do + @ep = subject.new("sdk_key", default_config, hc) + + # Pick a server time that is somewhat behind the client time + server_time = (Time.now.to_f * 1000).to_i - 20000 + + # Send and flush an event we don't care about, just to set the last server time + hc.set_server_time(server_time) + @ep.add_event({ kind: "identify", user: { key: "otherUser" }}) + flush_and_get_events + + # Now send an event with debug mode on, with a "debug until" time that is further in + # the future than the server time, but in the past compared to the client. + flag = { key: "flagkey", version: 11 } + debug_until = server_time + 1000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: debug_until + } + @ep.add_event(fe) + + # Should get a summary event only, not a full feature event + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, user)), + include(:kind => "summary") + ) + end + + it "ends debug mode based on server time if server time is later than client time" do + @ep = subject.new("sdk_key", default_config, hc) + + # Pick a server time that is somewhat ahead of the client time + server_time = (Time.now.to_f * 1000).to_i + 20000 + + # Send and flush an event we don't care about, just to set the last server time + hc.set_server_time(server_time) + @ep.add_event({ kind: "identify", user: { key: "otherUser" }}) + flush_and_get_events + + # Now send an event with debug mode on, with a "debug until" time that is further in + # the future than the server time, but in the past compared to the client. + flag = { key: "flagkey", version: 11 } + debug_until = server_time - 1000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: debug_until + } + @ep.add_event(fe) + + # Should get a summary event only, not a full feature event + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe, user)), + include(:kind => "summary") + ) + end + + it "generates only one index event for multiple events with same user" do + @ep = subject.new("sdk_key", default_config, hc) + flag1 = { key: "flagkey1", version: 11 } + flag2 = { key: "flagkey2", version: 22 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe1 = { + kind: "feature", key: "flagkey1", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + fe2 = { + kind: "feature", key: "flagkey2", version: 22, user: user, + variation: 1, value: "value", trackEvents: true + } + @ep.add_event(fe1) + @ep.add_event(fe2) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe1, user)), + eq(feature_event(fe1, flag1, false, nil)), + eq(feature_event(fe2, flag2, false, nil)), + include(:kind => "summary") + ) + end + + it "summarizes non-tracked events" do + @ep = subject.new("sdk_key", default_config, hc) + flag1 = { key: "flagkey1", version: 11 } + flag2 = { key: "flagkey2", version: 22 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe1 = { + kind: "feature", key: "flagkey1", version: 11, user: user, + variation: 1, value: "value1", default: "default1" + } + fe2 = { + kind: "feature", key: "flagkey2", version: 22, user: user, + variation: 2, value: "value2", default: "default2" + } + @ep.add_event(fe1) + @ep.add_event(fe2) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(fe1, user)), + eq({ + kind: "summary", + startDate: fe1[:creationDate], + endDate: fe2[:creationDate], + features: { + flagkey1: { + default: "default1", + counters: [ + { version: 11, variation: 1, value: "value1", count: 1 } + ] + }, + flagkey2: { + default: "default2", + counters: [ + { version: 22, variation: 2, value: "value2", count: 1 } + ] + } + } + }) + ) + end + + it "queues custom event with user" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } + @ep.add_event(e) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(index_event(e, user)), + eq(custom_event(e, nil)) + ) + end + + it "can include inline user in custom event" do + config = LaunchDarkly::Config.new(inline_users_in_events: true) + @ep = subject.new("sdk_key", config, hc) + e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } + @ep.add_event(e) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(custom_event(e, user)) + ) + end + + it "filters user in custom event" do + config = LaunchDarkly::Config.new(all_attributes_private: true, inline_users_in_events: true) + @ep = subject.new("sdk_key", config, hc) + e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } + @ep.add_event(e) + + output = flush_and_get_events + expect(output).to contain_exactly( + eq(custom_event(e, filtered_user)) + ) + end + + it "does a final flush when shutting down" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "identify", key: user[:key], user: user } + @ep.add_event(e) + + @ep.stop + + output = get_events_from_last_request + expect(output).to contain_exactly(e) + end + + it "sends nothing if there are no events" do + @ep = subject.new("sdk_key", default_config, hc) + @ep.flush + expect(hc.get_request).to be nil + end + + it "sends SDK key" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "identify", user: user } + @ep.add_event(e) + + @ep.flush + @ep.wait_until_inactive + + expect(hc.get_request.headers["Authorization"]).to eq "sdk_key" + end + + it "stops posting events after getting a 401 error" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "identify", user: user } + @ep.add_event(e) + + hc.set_response_status(401) + @ep.flush + @ep.wait_until_inactive + expect(hc.get_request).not_to be_nil + hc.reset + + @ep.add_event(e) + @ep.flush + @ep.wait_until_inactive + expect(hc.get_request).to be_nil + end + + it "retries flush once after 5xx error" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "identify", user: user } + @ep.add_event(e) + + hc.set_response_status(503) + @ep.flush + @ep.wait_until_inactive + + expect(hc.get_request).not_to be_nil + expect(hc.get_request).not_to be_nil + expect(hc.get_request).to be_nil # no 3rd request + end + + it "retries flush once after connection error" do + @ep = subject.new("sdk_key", default_config, hc) + e = { kind: "identify", user: user } + @ep.add_event(e) + + hc.set_exception(Faraday::Error::ConnectionFailed.new("fail")) + @ep.flush + @ep.wait_until_inactive + + expect(hc.get_request).not_to be_nil + expect(hc.get_request).not_to be_nil + expect(hc.get_request).to be_nil # no 3rd request + end + + def index_event(e, user) + { + kind: "index", + creationDate: e[:creationDate], + user: user + } + end + + def feature_event(e, flag, debug, inline_user) + out = { + kind: debug ? "debug" : "feature", + creationDate: e[:creationDate], + key: flag[:key], + variation: e[:variation], + version: flag[:version], + value: e[:value] + } + if inline_user.nil? + out[:userKey] = e[:user][:key] + else + out[:user] = inline_user + end + out + end + + def custom_event(e, inline_user) + out = { + kind: "custom", + creationDate: e[:creationDate], + key: e[:key] + } + out[:data] = e[:data] if e.has_key?(:data) + if inline_user.nil? + out[:userKey] = e[:user][:key] + else + out[:user] = inline_user + end + out + end + + def flush_and_get_events + @ep.flush + @ep.wait_until_inactive + get_events_from_last_request + end + + def get_events_from_last_request + req = hc.get_request + JSON.parse(req.body, symbolize_names: true) + end + + class FakeHttpClient + def initialize + reset + end + + def set_response_status(status) + @status = status + end + + def set_server_time(time_millis) + @server_time = Time.at(time_millis.to_f / 1000) + end + + def set_exception(e) + @exception = e + end + + def reset + @requests = [] + @status = 200 + end + + def post(uri) + req = Faraday::Request.create("POST") + req.headers = {} + req.options = Faraday::RequestOptions.new + yield req + @requests.push(req) + if @exception + raise @exception + else + resp = Faraday::Response.new + headers = {} + if @server_time + headers["Date"] = @server_time.httpdate + end + resp.finish({ + status: @status ? @status : 200, + response_headers: headers + }) + resp + end + end + + def get_request + @requests.shift + end + end + + class FakeResponse + def initialize(status) + @status = status + end + + attr_reader :status + end +end diff --git a/spec/expiring_cache_spec.rb b/spec/expiring_cache_spec.rb new file mode 100644 index 00000000..ed021c34 --- /dev/null +++ b/spec/expiring_cache_spec.rb @@ -0,0 +1,76 @@ +require 'timecop' + +describe LaunchDarkly::ExpiringCache do + subject { LaunchDarkly::ExpiringCache } + + before(:each) do + Timecop.freeze(Time.now) + end + + after(:each) do + Timecop.return + end + + it "evicts entries based on TTL" do + c = subject.new(3, 300) + c[:a] = 1 + c[:b] = 2 + + Timecop.freeze(Time.now + 330) + + c[:c] = 3 + + expect(c[:a]).to be nil + expect(c[:b]).to be nil + expect(c[:c]).to eq 3 + end + + it "evicts entries based on max size" do + c = subject.new(2, 300) + c[:a] = 1 + c[:b] = 2 + c[:c] = 3 + + expect(c[:a]).to be nil + expect(c[:b]).to eq 2 + expect(c[:c]).to eq 3 + end + + it "does not reset LRU on get" do + c = subject.new(2, 300) + c[:a] = 1 + c[:b] = 2 + c[:a] + c[:c] = 3 + + expect(c[:a]).to be nil + expect(c[:b]).to eq 2 + expect(c[:c]).to eq 3 + end + + it "resets LRU on put" do + c = subject.new(2, 300) + c[:a] = 1 + c[:b] = 2 + c[:a] = 1 + c[:c] = 3 + + expect(c[:a]).to eq 1 + expect(c[:b]).to be nil + expect(c[:c]).to eq 3 + end + + it "resets TTL on put" do + c = subject.new(3, 300) + c[:a] = 1 + c[:b] = 2 + + Timecop.freeze(Time.now + 330) + c[:a] = 1 + c[:c] = 3 + + expect(c[:a]).to eq 1 + expect(c[:b]).to be nil + expect(c[:c]).to eq 3 + end +end diff --git a/spec/fixtures/feature.json b/spec/fixtures/feature.json index 152565f7..dd44b404 100644 --- a/spec/fixtures/feature.json +++ b/spec/fixtures/feature.json @@ -32,5 +32,6 @@ true, false ], + "trackEvents": true, "deleted":false } \ No newline at end of file diff --git a/spec/ldclient_spec.rb b/spec/ldclient_spec.rb index bf6c69a0..405e0d53 100644 --- a/spec/ldclient_spec.rb +++ b/spec/ldclient_spec.rb @@ -3,7 +3,12 @@ describe LaunchDarkly::LDClient do subject { LaunchDarkly::LDClient } - let(:config) { LaunchDarkly::Config.new({offline: true}) } + let(:offline_config) { LaunchDarkly::Config.new({offline: true}) } + let(:offline_client) do + subject.new("secret", offline_config) + end + let(:update_processor) { NullUpdateProcessor.new } + let(:config) { LaunchDarkly::Config.new({send_events: false, update_processor: update_processor}) } let(:client) do subject.new("secret", config) end @@ -24,11 +29,74 @@ JSON.parse(data, symbolize_names: true) end + def event_processor + client.instance_variable_get(:@event_processor) + end + describe '#variation' do it "will return the default value if the client is offline" do - result = client.variation(feature[:key], user, "default") + result = offline_client.variation("doesntmatter", user, "default") expect(result).to eq "default" end + + it "queues a feature request event for an unknown feature" do + expect(event_processor).to receive(:add_event).with(hash_including( + kind: "feature", key: "badkey", user: user, value: "default", default: "default" + )) + client.variation("badkey", user, "default") + end + + it "queues a feature request event for an existing feature" do + config.feature_store.init({ LaunchDarkly::FEATURES => {} }) + config.feature_store.upsert(LaunchDarkly::FEATURES, feature) + expect(event_processor).to receive(:add_event).with(hash_including( + kind: "feature", + key: feature[:key], + version: feature[:version], + user: user, + variation: 0, + value: true, + default: "default", + trackEvents: true, + debugEventsUntilDate: nil + )) + client.variation(feature[:key], user, "default") + end + + it "queues a feature event for an existing feature when user is nil" do + config.feature_store.init({ LaunchDarkly::FEATURES => {} }) + config.feature_store.upsert(LaunchDarkly::FEATURES, feature) + expect(event_processor).to receive(:add_event).with(hash_including( + kind: "feature", + key: feature[:key], + version: feature[:version], + user: nil, + variation: nil, + value: "default", + default: "default", + trackEvents: true, + debugEventsUntilDate: nil + )) + client.variation(feature[:key], nil, "default") + end + + it "queues a feature event for an existing feature when user key is nil" do + config.feature_store.init({ LaunchDarkly::FEATURES => {} }) + config.feature_store.upsert(LaunchDarkly::FEATURES, feature) + bad_user = { name: "Bob" } + expect(event_processor).to receive(:add_event).with(hash_including( + kind: "feature", + key: feature[:key], + version: feature[:version], + user: bad_user, + variation: nil, + value: "default", + default: "default", + trackEvents: true, + debugEventsUntilDate: nil + )) + client.variation(feature[:key], bad_user, "default") + end end describe '#secure_mode_hash' do @@ -40,22 +108,24 @@ describe '#track' do it "queues up an custom event" do - expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(kind: "custom", key: "custom_event_name", user: user, data: 42)) + expect(event_processor).to receive(:add_event).with(hash_including(kind: "custom", key: "custom_event_name", user: user, data: 42)) client.track("custom_event_name", user, 42) end + it "sanitizes the user in the event" do - expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) + expect(event_processor).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) client.track("custom_event_name", numeric_key_user, nil) end end describe '#identify' do it "queues up an identify event" do - expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(kind: "identify", key: user[:key], user: user)) + expect(event_processor).to receive(:add_event).with(hash_including(kind: "identify", key: user[:key], user: user)) client.identify(user) end + it "sanitizes the user in the event" do - expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) + expect(event_processor).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) client.identify(numeric_key_user) end end @@ -72,24 +142,31 @@ end describe 'with send_events: false' do - let(:config) { LaunchDarkly::Config.new({offline: true, send_events: false}) } + let(:config) { LaunchDarkly::Config.new({offline: true, send_events: false, update_processor: update_processor}) } let(:client) { subject.new("secret", config) } - let(:queue) { client.instance_variable_get(:@event_processor).instance_variable_get(:@queue) } + it "uses a NullEventProcessor" do + ep = client.instance_variable_get(:@event_processor) + expect(ep).to be_a(LaunchDarkly::NullEventProcessor) + end + end + + describe 'with send_events: true' do + let(:config_with_events) { LaunchDarkly::Config.new({offline: false, send_events: true, update_processor: update_processor}) } + let(:client_with_events) { subject.new("secret", config_with_events) } - it "does not enqueue a feature event" do - client.variation(feature[:key], user, "default") - expect(queue.empty?).to be true + it "does not use a NullEventProcessor" do + ep = client_with_events.instance_variable_get(:@event_processor) + expect(ep).not_to be_a(LaunchDarkly::NullEventProcessor) end + end - it "does not enqueue a custom event" do - client.track("custom_event_name", user, 42) - expect(queue.empty?).to be true + class NullUpdateProcessor + def start end - it "does not enqueue an identify event" do - client.identify(user) - expect(queue.empty?).to be true + def initialized? + true end end end \ No newline at end of file diff --git a/spec/simple_lru_cache_spec.rb b/spec/simple_lru_cache_spec.rb new file mode 100644 index 00000000..fabf8738 --- /dev/null +++ b/spec/simple_lru_cache_spec.rb @@ -0,0 +1,24 @@ +require "spec_helper" + +describe LaunchDarkly::SimpleLRUCacheSet do + subject { LaunchDarkly::SimpleLRUCacheSet } + + it "retains values up to capacity" do + lru = subject.new(3) + expect(lru.add("a")).to be false + expect(lru.add("b")).to be false + expect(lru.add("c")).to be false + expect(lru.add("a")).to be true + expect(lru.add("b")).to be true + expect(lru.add("c")).to be true + end + + it "discards oldest value on overflow" do + lru = subject.new(2) + expect(lru.add("a")).to be false + expect(lru.add("b")).to be false + expect(lru.add("a")).to be true + expect(lru.add("c")).to be false # b is discarded as oldest + expect(lru.add("b")).to be false + end +end \ No newline at end of file diff --git a/spec/event_serializer_spec.rb b/spec/user_filter_spec.rb similarity index 50% rename from spec/event_serializer_spec.rb rename to spec/user_filter_spec.rb index 14755045..96814289 100644 --- a/spec/event_serializer_spec.rb +++ b/spec/user_filter_spec.rb @@ -1,7 +1,7 @@ require "spec_helper" -describe LaunchDarkly::EventSerializer do - subject { LaunchDarkly::EventSerializer } +describe LaunchDarkly::UserFilter do + subject { LaunchDarkly::UserFilter } let(:base_config) { LaunchDarkly::Config.new } let(:config_with_all_attrs_private) { LaunchDarkly::Config.new({ all_attributes_private: true })} @@ -45,68 +45,47 @@ { key: 'abc', anonymous: 'true', custom: { }, privateAttrs: [ 'bizzle', 'dizzle' ]} } - - def make_event(user) - { - creationDate: 1000000, - key: 'xyz', - kind: 'thing', - user: user - } - end - - def parse_results(js) - JSON.parse(js, symbolize_names: true) - end - describe "serialize_events" do it "includes all user attributes by default" do - es = LaunchDarkly::EventSerializer.new(base_config) - event = make_event(user) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [event] + uf = LaunchDarkly::UserFilter.new(base_config) + result = uf.transform_user_props(user) + expect(result).to eq user end it "hides all except key if all_attributes_private is true" do - es = LaunchDarkly::EventSerializer.new(config_with_all_attrs_private) - event = make_event(user) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [make_event(user_with_all_attrs_hidden)] + uf = LaunchDarkly::UserFilter.new(config_with_all_attrs_private) + result = uf.transform_user_props(user) + expect(result).to eq user_with_all_attrs_hidden end it "hides some attributes if private_attribute_names is set" do - es = LaunchDarkly::EventSerializer.new(config_with_some_attrs_private) - event = make_event(user) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [make_event(user_with_some_attrs_hidden)] + uf = LaunchDarkly::UserFilter.new(config_with_some_attrs_private) + result = uf.transform_user_props(user) + expect(result).to eq user_with_some_attrs_hidden end it "hides attributes specified in per-user privateAttrs" do - es = LaunchDarkly::EventSerializer.new(base_config) - event = make_event(user_specifying_own_private_attr) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [make_event(user_with_own_specified_attr_hidden)] + uf = LaunchDarkly::UserFilter.new(base_config) + result = uf.transform_user_props(user_specifying_own_private_attr) + expect(result).to eq user_with_own_specified_attr_hidden end it "looks at both per-user privateAttrs and global config" do - es = LaunchDarkly::EventSerializer.new(config_with_some_attrs_private) - event = make_event(user_specifying_own_private_attr) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [make_event(user_with_all_attrs_hidden)] + uf = LaunchDarkly::UserFilter.new(config_with_some_attrs_private) + result = uf.transform_user_props(user_specifying_own_private_attr) + expect(result).to eq user_with_all_attrs_hidden end it "strips out any unknown top-level attributes" do - es = LaunchDarkly::EventSerializer.new(base_config) - event = make_event(user_with_unknown_top_level_attrs) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [make_event(user)] + uf = LaunchDarkly::UserFilter.new(base_config) + result = uf.transform_user_props(user_with_unknown_top_level_attrs) + expect(result).to eq user end it "leaves the anonymous attribute as is" do - es = LaunchDarkly::EventSerializer.new(config_with_all_attrs_private) - event = make_event(anon_user) - j = es.serialize_events([event]) - expect(parse_results(j)).to eq [make_event(anon_user_with_all_attrs_hidden)] + uf = LaunchDarkly::UserFilter.new(config_with_all_attrs_private) + result = uf.transform_user_props(anon_user) + expect(result).to eq anon_user_with_all_attrs_hidden end end end