Skip to content
This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Permalink
Start on a better HTTP queue strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
binarylogic committed Dec 18, 2016
1 parent 7d0b732 commit 92612fd
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 152 deletions.
208 changes: 143 additions & 65 deletions lib/timber/log_devices/http.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
require "timber/log_devices/http/triggered_buffer"

module Timber
module LogDevices
# A log device that buffers and delivers log messages over HTTPS to the Timber API in batches.
Expand All @@ -8,20 +6,68 @@ module LogDevices
#
# See {#initialize} for options and more details.
class HTTP
API_URI = URI.parse(ENV["TIMBER_INGESTION_URL"] || "https://logs.timber.io/frames")
CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze
CONNECTION_HEADER = "keep-alive".freeze
USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze
HTTPS = Net::HTTP.new(API_URI.host, API_URI.port).tap do |https|
https.use_ssl = true
https.read_timeout = 30
https.ssl_timeout = 10
# Ruby 1.9.X doesn't have this setting.
if https.respond_to?(:keep_alive_timeout=)
https.keep_alive_timeout = 60
# @private
class LogMsgQueue
MAX_MSG_BYTES = 50_000 # 50kb

def initialize(max_bytes)
@lock = Mutex.new
@max_bytes = max_bytes
@array = []
@bytesize = 0
end

def enqueue(msg)
if msg.bytesize > MAX_MSG_BYTES
raise ArgumentError.new("Log message exceeds the #{MAX_MSG_BYTES} bytes limit")
end

@lock.synchronize do
@array << msg
@byteszie += msg.bytesize
end
end

def flush
@lock.synchronize do
old = @array
@array = []
@bytesize = 0
return old
end
end

def full?
@lock.synchronize do
@bytesize >= @max_bytes
end
end
https.open_timeout = 10
end

# Works like SizedQueue, but drops message instead of blocking. Pass one of these in
# to {HTTP#intiialize} via the :request_queue option if you'd prefer to drop messages
# in the event of a buffer overflow instead of applying back pressure.
class DroppingSizedQueue < SizedQueue
# Returns true/false depending on whether the queue is full or not
def push(obj)
@mutex.synchronize do
return false unless @que.length < @max

@que.push obj
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
return true
end
end
end

TIMBER_URL = "https://logs.timber.io/frames".freeze
CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze
USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze
DELIVERY_FREQUENCY_SECONDS = 2.freeze
RETRY_LIMIT = 5.freeze
BACKOFF_RATE_SECONDS = 3.freeze
Expand Down Expand Up @@ -56,75 +102,107 @@ class HTTP
# buffer_overflow_handler: overflow_handler)
# Timber::Logger.new(http_log_device)
def initialize(api_key, options = {})
@debug = options[:debug] || ENV['debug']
@api_key = api_key
@buffer = TriggeredBuffer.new(
payload_limit_bytes: options[:payload_limit_bytes],
limit_bytes: options[:buffer_limit_bytes],
overflow_handler: options[:buffer_overflow_handler]
)
@delivery_interval_thread = Thread.new do
loop do
sleep(options[:delivery_frequency_seconds] || DELIVERY_FREQUENCY_SECONDS)

@last_messages_overflow_count = 0
messages_overflown_count = @buffer.messages_overflown_count
if messages_overflown_count > @last_messages_overflow_count
difference = messages_overflown_count - @last_messages_overflow_count
@last_messages_overflow_count = messages_overflown_count
logger.warn("Timber HTTP buffer has overflown #{difference} times")
end
@timber_url = URI.parse(options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL)
@batch_byte_size = opts[:batch_byte_size] || 3_000_000 # 3mb
@flush_interval = opts[:flush_interval] || 2 # 2 seconds
@requests_per_conn = opts[:requests_per_conn] || 1_000
@msg_queue = LogMsgQueue.new(@batch_byte_size)
@request_queue = opts[:request_queue] || SizedQueue.new(3)

buffer_for_delivery = @buffer.reserve
if buffer_for_delivery
deliver(buffer_for_delivery)
end
end
end
@outlet_thread = Thread.new { outlet }
@flush_thread = Thread.new { intervaled_flush }
end

# Write a new log line message to the buffer, and deliver if the msg exceeds the
# payload limit.
def write(msg)
buffer_for_delivery = @buffer.write(msg)
if buffer_for_delivery
deliver(buffer_for_delivery)
@msg_queue.enqueue(msg)
if @msg_queue.full?
flush
end
true
end

# Closes the log device, cleans up, and attempts one last delivery.
def close
@delivery_interval_thread.kill
buffer_for_delivery = @buffer.reserve
if buffer_for_delivery
deliver(buffer_for_delivery)
end
@outlet_thread.kill
@flush_thread.kill
flush
end

private
def deliver(body)
Thread.new do
RETRY_LIMIT.times do |try_index|
request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req|
req['Authorization'] = authorization_payload
req['Connection'] = CONNECTION_HEADER
req['Content-Type'] = CONTENT_TYPE
req['User-Agent'] = USER_AGENT
req.body = body
end
def debug?
!@debug.nil?
end

def flush
msgs = @msg_queue.flush
return if msgs.empty?

body = ""
msgs.each do |msg|
body << msg
end

req = Net::HTTP::Post.new(@logplex_url.path)
req['Authorization'] = authorization_payload
req['Content-Type'] = CONTENT_TYPE
req['User-Agent'] = USER_AGENT
req.body = body
@request_queue.enq(req)
@last_flush = Time.now
end

def intervaled_flush
loop do
begin
flush if interval_ready?
sleep(0.1)
rescue Exception => e
logger.error("Timber intervaled flush failed: #{e.inspect}")
end
end
end

def interval_flush_ready?
@last_flush.nil? || (Time.now.to_f - @last_flush.to_f).abs >= @flush_interval
end

def outlet
loop do
http = Net::HTTP.new(API_URI.host, API_URI.port)
http.set_debug_output(logger) if debug?
http.use_ssl = true if @timber_url.scheme == 'https'
http.read_timeout = 30
http.ssl_timeout = 10
http.open_timeout = 10

res = HTTPS.request(request)
code = res.code.to_i
if code < 200 || code >= 300
try = try_index + 1
logger.debug("Timber HTTP delivery failed, try #{try} - #{res.code}: #{res.body}")
sleep(try * BACKOFF_RATE_SECONDS)
else
@buffer.remove(body)
logger.debug("Timber HTTP delivery successful - #{code}")
logger.debug("Timber new buffer size - #{@buffer.total_bytesize}")
break # exit the loop
begin
http.start do |conn|
num_reqs = 0
while num_reqs < @max_reqs_per_conn
#Blocks waiting for a request.
req = @request_queue.deq
@req_in_flight += 1
resp = nil
begin
resp = conn.request(req)
rescue => e
logger.error("Timber request error: #{e.message}") if debug?
next
ensure
@req_in_flight -= 1
end
num_reqs += 1
logger.info("Time request successful: #{resp.code}") if debug?
end
end
rescue => e
logger.error("Timber request error: #{e.message}") if debug?
ensure
http.finish if http.started?
end
end
end
Expand Down
87 changes: 0 additions & 87 deletions lib/timber/log_devices/http/triggered_buffer.rb

This file was deleted.

0 comments on commit 92612fd

Please sign in to comment.