Skip to content
This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Permalink
Update HTTP to implement a double trigger buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
binarylogic committed Dec 15, 2016
1 parent 857a7d3 commit 9c44cee
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 106 deletions.
1 change: 1 addition & 0 deletions lib/timber.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# core classes
require "json" # brings to_json to the core classes
require "msgpack" # brings to_msgpack to the core classes

# Base (must come first, order matters)
require "timber/config"
Expand Down
154 changes: 119 additions & 35 deletions lib/timber/log_devices/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,88 @@

module Timber
module LogDevices
# A log device that buffers and sends logs to the Timber API over HTTP in intervals. The buffer
# uses MessagePack::Buffer, which is fast, efficient with memory, and reduces
# the payload size sent to Timber.
# A log device that buffers and delivers log messages to the Timber API in batches.
# The payload is an array of msgpack formatter message delimited by new lines. Msgpack
# is an efficient way to represent JSON objects that save on space.
#
# Delivery has 2 triggers: a payload limit and a frequency, both defined by
# {PAYLOAD_LIMIT_BYTES} and {DELIVERY_FREQUENCY_SECONDS} respectively. If either are
# surpassed, a delivery will be attempted.
#
# In the event that the HTTP requests cannot empty the buffer fast enough, a buffer overflow
# will be triggered. This can be handled with the `:buffer_overflow_handler` option upon
# instantiation, allowing you to write the data to disk, etc. See {#new} for more details.
class HTTP
class DeliveryError < StandardError; end
# Maintains a triggered buffer, where the trigger is {PAYLOAD_LIMIT_BYTES}. Once the buffer
# exceeds this limit it will lock and return that buffer up to that point while still making
# a new buffer available for writes. This ensures that the HTTP client can attempt to deliver
# the buffer contents without blocking execution of the application.
#
# If the overall buffer exceeeds the overall limit (specified by the `:limit_bytes` option),
# then a buffer overflow is triggered. This can be customized using the `:overflow_handler`
# option.
class TriggeredBuffer
DEFAULT_PAYLOAD_LIMIT_BYTES = 5_000_000 # 5mb, the Timber API will not accept messages larger than this
DEFAULT_LIMIT_BYTES = 50_000_000 # 50mb

def initialize(options = {})
@buffers = []
@monitor = Monitor.new
@payload_limit_bytes = options[:payload_limit_bytes] || DEFAULT_PAYLOAD_LIMIT_BYTES
@limit_bytes = options[:limit_bytes] || DEFAULT_LIMIT_BYTES
@overflow_handler = options[:overflow_handler]
end

def write(msg)
if msg.bytesize > @payload_limit_bytes || (msg.bytesize + total_bytesize) > @limit_bytes
handle_overflow(msg)
return nil
end

@monitor.synchronize do
buffer = writable_buffer
if @buffers == [] || buffer.frozen?
@buffers << msg
nil
elsif (buffer.bytesize + msg.bytesize) > @payload_limit_bytes
@buffers << msg
buffer.freeze
else
buffer << msg
nil
end
end
end

def reserve
@monitor.synchronize do
buffer = writable_buffer
if buffer
buffer.freeze
end
end
end

private
def total_bytesize
@buffers.reduce(0) { |acc, buffer| acc + buffer.bytesize }
end

def writable_buffer
@buffers.find { |buffer| !buffer.frozen? }
end

def handle_overflow(msg)
if @overflow_handler
@overflow_handler.call(msg)
end
end
end

API_URI = URI.parse("https://api.timber.io/http_frames")
CONTENT_TYPE = "application/json".freeze
CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze
CONNECTION_HEADER = "keep-alive".freeze
USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze

HTTPS = Net::HTTP.new(API_URI.host, API_URI.port).tap do |https|
https.use_ssl = true
https.read_timeout = 30
Expand All @@ -24,7 +95,10 @@ class DeliveryError < StandardError; end
https.open_timeout = 10
end

DEFAULT_DELIVERY_FREQUENCY = 2.freeze
PAYLOAD_LIMIT_BYTES = 5_000_000 # 5mb
BUFFER_LIMIT_BYTES = 50_000_000 # 50mb
DELIVERY_FREQUENCY_SECONDS = 2.freeze


# Instantiates a new HTTP log device.
#
Expand All @@ -35,48 +109,58 @@ class DeliveryError < StandardError; end
# attempt to deliver logs to the Timber API. The HTTP client buffers logs between calls.
def initialize(api_key, options = {})
@api_key = api_key
@buffer = []
@monitor = Monitor.new
@delivery_thread = Thread.new do
at_exit { deliver }
@buffer = TriggeredBuffer.new(
payload_limit_bytes: options[:payload_limit_bytes],
limit_bytes: options[:buffer_limit_bytes],
overflow_handler: options[:buffer_overflow_handler]
)
@delivery_interval_thread = Thread.new do
loop do
sleep options[:frequency_seconds] || DEFAULT_DELIVERY_FREQUENCY
deliver
sleep options[:frequency_seconds] || DELIVERY_FREQUENCY_SECONDS
buffer_for_delivery = @buffer.reserve
if buffer_for_delivery
deliver(buffer_for_delivery)
end
end
end
end

def write(msg)
@monitor.synchronize {
@buffer << msg
}
buffer_for_delivery = @buffer.write(msg)
if buffer_for_delivery
deliver(buffer_for_delivery)
end
true
end

def close
@delivery_thread.kill
@delivery_interval_thread.kill
buffer_for_delivery = @buffer.reserve
if buffer_for_delivery
deliver(buffer_for_delivery)
end
end

private
def deliver
body = @buffer.read

request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req|
req['Authorization'] = authorization_payload
req['Connection'] = CONNECTION_HEADER
req['Content-Type'] = CONTENT_TYPE
req['User-Agent'] = USER_AGENT
req.body = body
end

HTTPS.request(request).tap do |res|
code = res.code.to_i
if code < 200 || code >= 300
raise DeliveryError.new("Bad response from Timber API - #{res.code}: #{res.body}")
def deliver(body)
Thread.new do
request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req|
req['Authorization'] = authorization_payload
req['Connection'] = CONNECTION_HEADER
req['Content-Type'] = CONTENT_TYPE
req['User-Agent'] = USER_AGENT
req.body = body
end
Config.instance.logger.debug("Success! #{code}: #{res.body}")
end

@buffer.clear
HTTPS.request(request)
# HTTPS.request(request).tap do |res|
# code = res.code.to_i
# if code < 200 || code >= 300
# raise DeliveryError.new("Bad response from Timber API - #{res.code}: #{res.body}")
# end
# Config.instance.logger.debug("Success! #{code}: #{res.body}")
# end
end
end

def authorization_payload
Expand Down
34 changes: 25 additions & 9 deletions lib/timber/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ def build_log_entry(severity, time, progname, msg)
end
end

# Structures your log messages into Timber's hybrid format, which makes
# it easy to read while also appending the appropriate metadata.
#
# logger = Timber::Logger.new(STDOUT)
# logger.formatter = Timber::JSONFormatter.new
#
# Example message:
#
# My log message @timber.io {"level":"info","dt":"2016-09-01T07:00:00.000000-05:00"}
#
class HybridFormatter < Formatter
METADATA_CALLOUT = "@timber.io".freeze

def call(severity, time, progname, msg)
log_entry = build_log_entry(severity, time, progname, msg)
metadata = log_entry.to_json(:except => [:message])
# use << for concatenation for performance reasons
log_entry.message.gsub("\n", "\\n") << " " << METADATA_CALLOUT << " " << metadata << "\n"
end
end

# Structures your log messages into JSON.
#
# logger = Timber::Logger.new(STDOUT)
Expand All @@ -97,24 +118,19 @@ def call(severity, time, progname, msg)
end
end

# Structures your log messages into Timber's hybrid format, which makes
# it easy to read while also appending the appropriate metadata.
# Structures your log messages into JSON.
#
# logger = Timber::Logger.new(STDOUT)
# logger.formatter = Timber::JSONFormatter.new
#
# Example message:
#
# My log message @timber.io {"level":"info","dt":"2016-09-01T07:00:00.000000-05:00"}
# {"level":"info","dt":"2016-09-01T07:00:00.000000-05:00","message":"My log message"}
#
class HybridFormatter < Formatter
METADATA_CALLOUT = "@timber.io".freeze

class MsgPackFormatter < Formatter
def call(severity, time, progname, msg)
log_entry = build_log_entry(severity, time, progname, msg)
metadata = log_entry.to_json(:except => [:message])
# use << for concatenation for performance reasons
log_entry.message.gsub("\n", "\\n") << " " << METADATA_CALLOUT << " " << metadata << "\n"
build_log_entry(severity, time, progname, msg).as_json.to_msgpack << "\n"
end
end

Expand Down
Loading

0 comments on commit 9c44cee

Please sign in to comment.