From 92612fde34cf581b2c4b217700e2736bccece410 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 18 Dec 2016 15:56:51 -0600 Subject: [PATCH] Start on a better HTTP queue strategy --- lib/timber/log_devices/http.rb | 208 ++++++++++++------ .../log_devices/http/triggered_buffer.rb | 87 -------- 2 files changed, 143 insertions(+), 152 deletions(-) delete mode 100644 lib/timber/log_devices/http/triggered_buffer.rb diff --git a/lib/timber/log_devices/http.rb b/lib/timber/log_devices/http.rb index 3d008441..73350808 100644 --- a/lib/timber/log_devices/http.rb +++ b/lib/timber/log_devices/http.rb @@ -1,5 +1,3 @@ -require "timber/log_devices/http/triggered_buffer" - module Timber module LogDevices # A log device that buffers and delivers log messages over HTTPS to the Timber API in batches. @@ -8,20 +6,68 @@ module LogDevices # # See {#initialize} for options and more details. class HTTP - API_URI = URI.parse(ENV["TIMBER_INGESTION_URL"] || "https://logs.timber.io/frames") - CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze - CONNECTION_HEADER = "keep-alive".freeze - USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze - HTTPS = Net::HTTP.new(API_URI.host, API_URI.port).tap do |https| - https.use_ssl = true - https.read_timeout = 30 - https.ssl_timeout = 10 - # Ruby 1.9.X doesn't have this setting. - if https.respond_to?(:keep_alive_timeout=) - https.keep_alive_timeout = 60 + # @private + class LogMsgQueue + MAX_MSG_BYTES = 50_000 # 50kb + + def initialize(max_bytes) + @lock = Mutex.new + @max_bytes = max_bytes + @array = [] + @bytesize = 0 + end + + def enqueue(msg) + if msg.bytesize > MAX_MSG_BYTES + raise ArgumentError.new("Log message exceeds the #{MAX_MSG_BYTES} bytes limit") + end + + @lock.synchronize do + @array << msg + @byteszie += msg.bytesize + end + end + + def flush + @lock.synchronize do + old = @array + @array = [] + @bytesize = 0 + return old + end + end + + def full? + @lock.synchronize do + @bytesize >= @max_bytes + end end - https.open_timeout = 10 end + + # Works like SizedQueue, but drops message instead of blocking. Pass one of these in + # to {HTTP#intiialize} via the :request_queue option if you'd prefer to drop messages + # in the event of a buffer overflow instead of applying back pressure. + class DroppingSizedQueue < SizedQueue + # Returns true/false depending on whether the queue is full or not + def push(obj) + @mutex.synchronize do + return false unless @que.length < @max + + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + return true + end + end + end + + TIMBER_URL = "https://logs.timber.io/frames".freeze + CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze + USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze DELIVERY_FREQUENCY_SECONDS = 2.freeze RETRY_LIMIT = 5.freeze BACKOFF_RATE_SECONDS = 3.freeze @@ -56,75 +102,107 @@ class HTTP # buffer_overflow_handler: overflow_handler) # Timber::Logger.new(http_log_device) def initialize(api_key, options = {}) + @debug = options[:debug] || ENV['debug'] @api_key = api_key - @buffer = TriggeredBuffer.new( - payload_limit_bytes: options[:payload_limit_bytes], - limit_bytes: options[:buffer_limit_bytes], - overflow_handler: options[:buffer_overflow_handler] - ) - @delivery_interval_thread = Thread.new do - loop do - sleep(options[:delivery_frequency_seconds] || DELIVERY_FREQUENCY_SECONDS) - - @last_messages_overflow_count = 0 - messages_overflown_count = @buffer.messages_overflown_count - if messages_overflown_count > @last_messages_overflow_count - difference = messages_overflown_count - @last_messages_overflow_count - @last_messages_overflow_count = messages_overflown_count - logger.warn("Timber HTTP buffer has overflown #{difference} times") - end + @timber_url = URI.parse(options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL) + @batch_byte_size = opts[:batch_byte_size] || 3_000_000 # 3mb + @flush_interval = opts[:flush_interval] || 2 # 2 seconds + @requests_per_conn = opts[:requests_per_conn] || 1_000 + @msg_queue = LogMsgQueue.new(@batch_byte_size) + @request_queue = opts[:request_queue] || SizedQueue.new(3) - buffer_for_delivery = @buffer.reserve - if buffer_for_delivery - deliver(buffer_for_delivery) - end - end - end + @outlet_thread = Thread.new { outlet } + @flush_thread = Thread.new { intervaled_flush } end # Write a new log line message to the buffer, and deliver if the msg exceeds the # payload limit. def write(msg) - buffer_for_delivery = @buffer.write(msg) - if buffer_for_delivery - deliver(buffer_for_delivery) + @msg_queue.enqueue(msg) + if @msg_queue.full? + flush end true end # Closes the log device, cleans up, and attempts one last delivery. def close - @delivery_interval_thread.kill - buffer_for_delivery = @buffer.reserve - if buffer_for_delivery - deliver(buffer_for_delivery) - end + @outlet_thread.kill + @flush_thread.kill + flush end private - def deliver(body) - Thread.new do - RETRY_LIMIT.times do |try_index| - request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req| - req['Authorization'] = authorization_payload - req['Connection'] = CONNECTION_HEADER - req['Content-Type'] = CONTENT_TYPE - req['User-Agent'] = USER_AGENT - req.body = body - end + def debug? + !@debug.nil? + end + + def flush + msgs = @msg_queue.flush + return if msgs.empty? + + body = "" + msgs.each do |msg| + body << msg + end + + req = Net::HTTP::Post.new(@logplex_url.path) + req['Authorization'] = authorization_payload + req['Content-Type'] = CONTENT_TYPE + req['User-Agent'] = USER_AGENT + req.body = body + @request_queue.enq(req) + @last_flush = Time.now + end + + def intervaled_flush + loop do + begin + flush if interval_ready? + sleep(0.1) + rescue Exception => e + logger.error("Timber intervaled flush failed: #{e.inspect}") + end + end + end + + def interval_flush_ready? + @last_flush.nil? || (Time.now.to_f - @last_flush.to_f).abs >= @flush_interval + end + + def outlet + loop do + http = Net::HTTP.new(API_URI.host, API_URI.port) + http.set_debug_output(logger) if debug? + http.use_ssl = true if @timber_url.scheme == 'https' + http.read_timeout = 30 + http.ssl_timeout = 10 + http.open_timeout = 10 - res = HTTPS.request(request) - code = res.code.to_i - if code < 200 || code >= 300 - try = try_index + 1 - logger.debug("Timber HTTP delivery failed, try #{try} - #{res.code}: #{res.body}") - sleep(try * BACKOFF_RATE_SECONDS) - else - @buffer.remove(body) - logger.debug("Timber HTTP delivery successful - #{code}") - logger.debug("Timber new buffer size - #{@buffer.total_bytesize}") - break # exit the loop + begin + http.start do |conn| + num_reqs = 0 + while num_reqs < @max_reqs_per_conn + #Blocks waiting for a request. + req = @request_queue.deq + @req_in_flight += 1 + resp = nil + begin + resp = conn.request(req) + rescue => e + logger.error("Timber request error: #{e.message}") if debug? + next + ensure + @req_in_flight -= 1 + end + num_reqs += 1 + logger.info("Time request successful: #{resp.code}") if debug? + end end + rescue => e + logger.error("Timber request error: #{e.message}") if debug? + ensure + http.finish if http.started? end end end diff --git a/lib/timber/log_devices/http/triggered_buffer.rb b/lib/timber/log_devices/http/triggered_buffer.rb deleted file mode 100644 index 7ea39547..00000000 --- a/lib/timber/log_devices/http/triggered_buffer.rb +++ /dev/null @@ -1,87 +0,0 @@ -require "monitor" - -module Timber - module LogDevices - class HTTP - # Maintains a triggered buffer, where the trigger is {PAYLOAD_LIMIT_BYTES}. Once the buffer - # exceeds this limit it will lock and return that buffer up to that point while still making - # a new buffer available for writes. This ensures that the HTTP client can attempt to deliver - # the buffer contents without blocking execution of the application. - # - # If the overall buffer exceeeds the overall limit (specified by the `:limit_bytes` option), - # then a buffer overflow is triggered. This can be customized using the `:overflow_handler` - # option. - class TriggeredBuffer - DEFAULT_PAYLOAD_LIMIT_BYTES = 5_000_000 # 5mb, the Timber API will not accept messages larger than this - DEFAULT_LIMIT_BYTES = 50_000_000 # 50mb - - attr_reader :messages_overflown_count - - def initialize(options = {}) - @buffers = [] - @monitor = Monitor.new - @payload_limit_bytes = options[:payload_limit_bytes] || DEFAULT_PAYLOAD_LIMIT_BYTES - @limit_bytes = options[:limit_bytes] || DEFAULT_LIMIT_BYTES - @overflow_handler = options[:overflow_handler] - @messages_overflown_count = 0 - end - - def write(msg) - if msg.bytesize > @payload_limit_bytes || (msg.bytesize + total_bytesize) > @limit_bytes - handle_overflow(msg) - return nil - end - - @monitor.synchronize do - buffer = writable_buffer - if @buffers == [] || buffer.nil? || buffer.frozen? - @buffers << msg - nil - elsif (buffer.bytesize + msg.bytesize) > @payload_limit_bytes - @buffers << msg - buffer.freeze - else - buffer << msg - nil - end - end - end - - def reserve - @monitor.synchronize do - buffer = writable_buffer - if buffer - buffer.freeze - end - end - end - - def remove(buffer) - @monitor.synchronize do - @buffers.delete(buffer) - end - end - - def total_bytesize - @buffers.reduce(0) { |acc, buffer| acc + buffer.bytesize } - end - - private - def writable_buffer - @buffers.find { |buffer| !buffer.frozen? } - end - - def handle_overflow(msg) - @messages_overflown_count += 1 - if @overflow_handler - @overflow_handler.call(msg) - end - end - - def logger - Config.instance.logger - end - end - end - end -end \ No newline at end of file