Skip to content
This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Permalink
Fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
binarylogic committed Dec 15, 2016
2 parents 74cd6f3 + 5be0293 commit f7ed880
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 113 deletions.
1 change: 1 addition & 0 deletions lib/timber.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# core classes
require "json" # brings to_json to the core classes
require "msgpack" # brings to_msgpack to the core classes

# Base (must come first, order matters)
require "timber/config"
Expand Down
114 changes: 75 additions & 39 deletions lib/timber/log_devices/http.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
require "monitor"
require "msgpack"
require "timber/log_devices/http/triggered_buffer"

module Timber
module LogDevices
# A log device that buffers and sends logs to the Timber API over HTTP in intervals. The buffer
# uses MessagePack::Buffer, which is fast, efficient with memory, and reduces
# the payload size sent to Timber.
# A log device that buffers and delivers log messages over HTTPS to the Timber API in batches.
# The buffer and delivery strategy are very efficient and the log messages will be delivered in
# msgpack format.
#
# See {#initialize} for options and more details.
class HTTP
class DeliveryError < StandardError; end

API_URI = URI.parse("https://api.timber.io/http_frames")
CONTENT_TYPE = "application/json".freeze
CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze
CONNECTION_HEADER = "keep-alive".freeze
USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze

HTTPS = Net::HTTP.new(API_URI.host, API_URI.port).tap do |https|
https.use_ssl = true
https.read_timeout = 30
Expand All @@ -23,62 +21,100 @@ class DeliveryError < StandardError; end
end
https.open_timeout = 10
end
DELIVERY_FREQUENCY_SECONDS = 2.freeze
RETRY_LIMIT = 3.freeze
BACKOFF_RATE_SECONDS = 3.freeze

DEFAULT_DELIVERY_FREQUENCY = 2.freeze

# Instantiates a new HTTP log device.
# Instantiates a new HTTP log device that can be passed to {Timber::Logger#initialize}.
#
# @param api_key [String] The API key provided to you after you add your application to
# [Timber](https://timber.io).
# @param [Hash] options the options to create a HTTP log device with.
# @option attributes [Symbol] :frequency_seconds (2) How often the client should
# @option attributes [Symbol] :payload_limit_bytes Determines the maximum size in bytes that
# and HTTP payload can be. Please see {TriggereBuffer#initialize} for the default.
# @option attributes [Symbol] :buffer_limit_bytes Determines the maximum size of the total
# buffer. This should be many times larger than the `:payload_limit_bytes`.
# Please see {TriggereBuffer#initialize} for the default.
# @option attributes [Symbol] :buffer_overflow_handler (nil) When a single message exceeds
# `:payload_limit_bytes` or the entire buffer exceeds `:buffer_limit_bytes`, the Proc
# passed to this option will be called with the msg that would overflow the buffer. See
# the examples on how to use this properly.
# @option attributes [Symbol] :delivery_frequency_seconds (2) How often the client should
# attempt to deliver logs to the Timber API. The HTTP client buffers logs between calls.
#
# @example Basic usage
# Timber::Logger.new(Timber::LogDevices::HTTP.new("my_timber_api_key"))
#
# @example Handling buffer overflows
# # Persist overflowed lines to a file
# # Note: You could write these to any permanent storage.
# overflow_log_path = "/path/to/my/overflow_log.log"
# overflow_handler = Proc.new { |log_line_msg| File.write(overflow_log_path, log_line_ms) }
# http_log_device = Timber::LogDevices::HTTP.new("my_timber_api_key",
# buffer_overflow_handler: overflow_handler)
# Timber::Logger.new(http_log_device)
def initialize(api_key, options = {})
@api_key = api_key
@buffer = nil
@monitor = Monitor.new
@delivery_thread = Thread.new do
at_exit { deliver }
@buffer = TriggeredBuffer.new(
payload_limit_bytes: options[:payload_limit_bytes],
limit_bytes: options[:buffer_limit_bytes],
overflow_handler: options[:buffer_overflow_handler]
)
@delivery_interval_thread = Thread.new do
loop do
sleep options[:frequency_seconds] || DEFAULT_DELIVERY_FREQUENCY
deliver
sleep(options[:delivery_frequency_seconds] || DELIVERY_FREQUENCY_SECONDS)
buffer_for_delivery = @buffer.reserve
if buffer_for_delivery
deliver(buffer_for_delivery)
end
end
end
end

# Write a new log line message to the buffer, and deliver if the msg exceeds the
# payload limit.
def write(msg)
@monitor.synchronize {
@buffer << msg
}
buffer_for_delivery = @buffer.write(msg)
if buffer_for_delivery
deliver(buffer_for_delivery)
end
true
end

# Closes the log device, cleans up, and attempts one last delivery.
def close
@delivery_thread.kill
@delivery_interval_thread.kill
buffer_for_delivery = @buffer.reserve
if buffer_for_delivery
deliver(buffer_for_delivery)
end
end

private
def deliver
@monitor.synchronize {
body = @buffer.read

request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req|
req['Authorization'] = authorization_payload
req['Connection'] = CONNECTION_HEADER
req['Content-Type'] = CONTENT_TYPE
req['User-Agent'] = USER_AGENT
req.body = body
end
def deliver(body)
Thread.new do
RETRY_LIMIT.times do |try_index|
request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req|
req['Authorization'] = authorization_payload
req['Connection'] = CONNECTION_HEADER
req['Content-Type'] = CONTENT_TYPE
req['User-Agent'] = USER_AGENT
req.body = body
end

HTTPS.request(request).tap do |res|
res = HTTPS.request(request)
code = res.code.to_i
if code < 200 || code >= 300
raise DeliveryError.new("Bad response from Timber API - #{res.code}: #{res.body}")
Config.instance.logger.debug("Timber HTTP delivery failed - #{res.code}: #{res.body}")
sleep((try_index + 1) * BACKOFF_RATE_SECONDS)
else
@buffer.remove(body)
Config.instance.logger.debug("Timber HTTP delivery successful - #{code}")
break # exit the loop
end
Config.instance.logger.debug("Success! #{code}: #{res.body}")
end

@buffer.clear
}
end
end

def authorization_payload
Expand Down
79 changes: 79 additions & 0 deletions lib/timber/log_devices/http/triggered_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
require "monitor"

module Timber
module LogDevices
class HTTP
# Maintains a triggered buffer, where the trigger is {PAYLOAD_LIMIT_BYTES}. Once the buffer
# exceeds this limit it will lock and return that buffer up to that point while still making
# a new buffer available for writes. This ensures that the HTTP client can attempt to deliver
# the buffer contents without blocking execution of the application.
#
# If the overall buffer exceeeds the overall limit (specified by the `:limit_bytes` option),
# then a buffer overflow is triggered. This can be customized using the `:overflow_handler`
# option.
class TriggeredBuffer
DEFAULT_PAYLOAD_LIMIT_BYTES = 5_000_000 # 5mb, the Timber API will not accept messages larger than this
DEFAULT_LIMIT_BYTES = 50_000_000 # 50mb

def initialize(options = {})
@buffers = []
@monitor = Monitor.new
@payload_limit_bytes = options[:payload_limit_bytes] || DEFAULT_PAYLOAD_LIMIT_BYTES
@limit_bytes = options[:limit_bytes] || DEFAULT_LIMIT_BYTES
@overflow_handler = options[:overflow_handler]
end

def write(msg)
if msg.bytesize > @payload_limit_bytes || (msg.bytesize + total_bytesize) > @limit_bytes
handle_overflow(msg)
return nil
end

@monitor.synchronize do
buffer = writable_buffer
if @buffers == [] || buffer.nil? || buffer.frozen?
@buffers << msg
nil
elsif (buffer.bytesize + msg.bytesize) > @payload_limit_bytes
@buffers << msg
buffer.freeze
else
buffer << msg
nil
end
end
end

def reserve
@monitor.synchronize do
buffer = writable_buffer
if buffer
buffer.freeze
end
end
end

def remove(buffer)
@monitor.synchronize do
@buffers.delete(buffer)
end
end

private
def total_bytesize
@buffers.reduce(0) { |acc, buffer| acc + buffer.bytesize }
end

def writable_buffer
@buffers.find { |buffer| !buffer.frozen? }
end

def handle_overflow(msg)
if @overflow_handler
@overflow_handler.call(msg)
end
end
end
end
end
end
49 changes: 39 additions & 10 deletions lib/timber/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ def build_log_entry(severity, time, progname, msg)
end
end

# Structures your log messages into Timber's hybrid format, which makes
# it easy to read while also appending the appropriate metadata.
#
# logger = Timber::Logger.new(STDOUT)
# logger.formatter = Timber::JSONFormatter.new
#
# Example message:
#
# My log message @timber.io {"level":"info","dt":"2016-09-01T07:00:00.000000-05:00"}
#
class HybridFormatter < Formatter
METADATA_CALLOUT = "@timber.io".freeze

def call(severity, time, progname, msg)
log_entry = build_log_entry(severity, time, progname, msg)
metadata = log_entry.to_json(:except => [:message])
# use << for concatenation for performance reasons
log_entry.message.gsub("\n", "\\n") << " " << METADATA_CALLOUT << " " << metadata << "\n"
end
end

# Structures your log messages into JSON.
#
# logger = Timber::Logger.new(STDOUT)
Expand All @@ -97,24 +118,19 @@ def call(severity, time, progname, msg)
end
end

# Structures your log messages into Timber's hybrid format, which makes
# it easy to read while also appending the appropriate metadata.
# Structures your log messages into JSON.
#
# logger = Timber::Logger.new(STDOUT)
# logger.formatter = Timber::JSONFormatter.new
#
# Example message:
#
# My log message @timber.io {"level":"info","dt":"2016-09-01T07:00:00.000000-05:00"}
# {"level":"info","dt":"2016-09-01T07:00:00.000000-05:00","message":"My log message"}
#
class HybridFormatter < Formatter
METADATA_CALLOUT = "@timber.io".freeze

class MsgPackFormatter < Formatter
def call(severity, time, progname, msg)
log_entry = build_log_entry(severity, time, progname, msg)
metadata = log_entry.to_json(:except => [:message])
# use << for concatenation for performance reasons
log_entry.message.gsub("\n", "\\n") << " " << METADATA_CALLOUT << " " << metadata << "\n"
build_log_entry(severity, time, progname, msg).as_json.to_msgpack << "\n"
end
end

Expand All @@ -127,7 +143,20 @@ def call(severity, time, progname, msg)
# logger.formatter = Timber::Logger::JSONFormatter.new
def initialize(*args)
super(*args)
self.formatter = HybridFormatter.new
if args.size == 1 and args.first.is_a?(LogDevices::HTTP)
self.formatter = MsgPackFormatter.new
else
self.formatter = HybridFormatter.new
end
end

def formatter=(value)
if @dev.is_a?(Timber::LogDevices::HTTP)
raise ArgumentError.new("The formatter cannot be changed when using the " +
"Timber::LogDevices::HTTP log device. The MsgPackFormatter must be used for proper " +
"delivery.")
end
super
end

# Backwards compatibility with older ActiveSupport::Logger versions
Expand Down
58 changes: 58 additions & 0 deletions spec/timber/log_devices/http/triggered_buffer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
require "spec_helper"

describe Timber::LogDevices::HTTP::TriggeredBuffer do
describe "#write" do
it "should trigger a buffer overflow for large messages" do
buffer = described_class.new(:payload_limit_bytes => 10)
msg = "a" * 11
expect(buffer).to receive(:handle_overflow).exactly(1).times.with(msg)
buffer.write(msg)
end

it "should trigger a buffer overflow when exceeding the limit" do
buffer = described_class.new(:limit_bytes => 10)
msg = "a" * 11
expect(buffer).to receive(:handle_overflow).exactly(1).times.with(msg)
buffer.write(msg)
end

it "should start a new buffer when empty and append when not" do
buffer = described_class.new
result = buffer.write("test")
expect(result).to be_nil
expect(buffer.send(:writable_buffer)).to eq("test")
result = buffer.write("again")
expect(result).to be_nil
expect(buffer.send(:writable_buffer)).to eq("testagain")
end

it "should return the old buffer when it has exceeded it's limit" do
buffer = described_class.new(:payload_limit_bytes => 10)
msg = "a" * 6
result = buffer.write(msg)
expect(result).to be_nil
result = buffer.write(msg)
expect(result).to eq(msg)
expect(result).to be_frozen
end

it "should write a new buffer when the latest is frozen" do
buffer = described_class.new
buffer.write("test")
result = buffer.reserve
expect(result).to eq("test")
buffer.write("again")
expect(buffer.send(:writable_buffer)).to eq("again")
end
end

describe "#reserve" do
it "should reserve the latest buffer and freeze it" do
buffer = described_class.new
buffer.write("test")
result = buffer.reserve
expect(result).to eq("test")
expect(result).to be_frozen
end
end
end

0 comments on commit f7ed880

Please sign in to comment.