Skip to content

Commit

Permalink
Improved ThrottlingException handling
Browse files Browse the repository at this point in the history
* This is an experimental code. Two options are explored:
  1. Remove log messages from the buffer (which delays the transfer implicitly).
     This is safer, but some messages are lost.
  2. Delay the transfer then re-try.
     Consequences are unknown. Risk of compromising system stability.
  Option heyoutline#2 is used by default.
  To trigger option #1, set config purge_buffer_if_throttled: true

* Added logging of successful flushes (only to CloudWatch backend).
  Useful for troubleshooting; currently commented out
* Added heap limit, restricting the Logger process to a hardwired value
  of 32MiB (including message queue)
  • Loading branch information
pmenhart committed Oct 31, 2018
1 parent baaff8d commit 8080733
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Add `cloud_watch` and `aws` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[{:cloud_watch, "~> 0.3.2"},
[{:cloud_watch, "~> 0.3.3"},
{:aws, "~> 0.5.0"}]
end
```
Expand Down
37 changes: 29 additions & 8 deletions lib/cloud_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ defmodule CloudWatch do
log_stream_name = Keyword.get(opts, :log_stream_name)
max_buffer_size = Keyword.get(opts, :max_buffer_size, @default_max_buffer_size)
max_timeout = Keyword.get(opts, :max_timeout, @default_max_timeout)
purge_buffer_if_throttled? = Keyword.get(opts, :purge_buffer_if_throttled, false) # see "ThrottlingException"

# Limit out of memory problems - slow CloudWatch connection may cause message queue grow out of bounds
max_heap_size = 4194304 # words are 8 bytes, i.e. 32 MiB
:erlang.process_flag(:max_heap_size, max_heap_size) # see http://erlang.org/doc/man/erlang.html

# AWS configuration, only if needed by the AWS library
region = Keyword.get(opts, :region)
Expand All @@ -81,6 +86,7 @@ defmodule CloudWatch do
log_stream_name: log_stream_name,
max_buffer_size: max_buffer_size,
max_timeout: max_timeout,
purge_buffer_if_throttled: purge_buffer_if_throttled?,
sequence_token: nil,
flushed_at: nil
}
Expand All @@ -94,7 +100,6 @@ defmodule CloudWatch do
message = state.format
|> Logger.Formatter.format(level, msg, ts, md)
|> IO.chardata_to_string
#buffer = List.insert_at(buffer, -1, %InputLogEvent{message: message, timestamp: ts}) # performance impact of adding at the end?
buffer = [%InputLogEvent{message: message, timestamp: ts} | buffer] # buffer order is not relevant, we'll reverse or sort later if needed
%{state | buffer: buffer, buffer_length: buffer_length + 1, buffer_size: buffer_size + byte_size(message) + 26}
end
Expand All @@ -115,12 +120,14 @@ defmodule CloudWatch do
do_flush(state, opts, log_group_name, log_stream_name)
end

defp do_flush(%{buffer: buffer} = state, opts, log_group_name, log_stream_name) do
defp do_flush(%{buffer: buffer, buffer_length: buffer_length} = state, opts, log_group_name, log_stream_name) do
events = %{logEvents: Enum.sort_by(buffer, &(&1.timestamp)),
logGroupName: log_group_name, logStreamName: log_stream_name, sequenceToken: state.sequence_token}
case AwsProxy.put_log_events(state.client, events) do
{:ok, %{"nextSequenceToken" => next_sequence_token}, _} ->
{:ok, state |> purge_buffer() |> Map.put(:sequence_token, next_sequence_token)}
{:ok, state |> purge_buffer() |> Map.put(:sequence_token, next_sequence_token)
# |> add_internal_info("CloudWatch Log flushed buffer (#{inspect buffer_length} messages)")
}
{:error, {"DataAlreadyAcceptedException", "The given batch of log events has already been accepted. The next batch can be sent with sequenceToken: " <> next_sequence_token}} ->
state
|> Map.put(:sequence_token, next_sequence_token)
Expand Down Expand Up @@ -148,11 +155,21 @@ defmodule CloudWatch do
|> do_flush(opts, log_group_name, log_stream_name)
{:error, {"ThrottlingException", "Rate exceeded"}} ->
# AWS limit is 5 requests per second per log stream. We are supposed to re-try after a delay
# Sleeping here is a quick and dirty hack with possible unwanted consequences
# Better approach: introduce a blackout period. Start removing old logs if buffer size exceeded 1 MB during blackout
state = state |> add_internal_error("CloudWatch Log ThrottlingException: delaying transfer")
Process.sleep(500)
flush(state, opts)
if state.purge_buffer_if_throttled do
# Safe option: delay the transfer by removing all messages from the buffer (some messages will be lost!).
{
:ok,
state
|> purge_buffer()
|> add_internal_error("CloudWatch Log ThrottlingException: #{inspect buffer_length} messages were lost!}")
}
else
# Sleeping here is a quick and dirty hack with possible unwanted consequences
# Better approach: introduce a blackout period. Start removing old logs if buffer size exceeded 1 MB during blackout
state = state |> add_internal_error("CloudWatch Log ThrottlingException: delaying transfer")
Process.sleep(500)
flush(state, opts)
end
{:error, {"ExpiredTokenException", _}} ->
# aws-elixir may require restarting of state.client; ex_aws handles expired tokens internally
flush(state, opts)
Expand All @@ -167,6 +184,10 @@ defmodule CloudWatch do
add_internal_message(state, :error, msg)
end

# defp add_internal_info(state, msg) do
# add_internal_message(state, :info, msg)
# end

defp add_internal_message(state, level, msg) do
utc_log? = Application.get_env(:logger, :utc_log, false)
state
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule CloudWatch.Mixfile do

def project do
[app: :cloud_watch,
version: "0.3.2",
version: "0.3.3",
elixir: "~> 1.5",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
Expand Down

0 comments on commit 8080733

Please sign in to comment.