This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Merge branch 'router_buffering' into dev
rafrombrc committed Jun 25, 2015
2 parents 761f1a4 + 1093849 commit f06b915
Showing 66 changed files with 3,123 additions and 1,625 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ pipeline/mock_*.go
var/
*.sw?
externals
*~
10 changes: 8 additions & 2 deletions CHANGES.txt
@@ -4,17 +4,23 @@
Backwards Incompatibilities
---------------------------

* Major overhaul of filter and output plugin APIs to support disk buffering
(#1378).

* Go 1.4 now required for building.

* Removed unused PipelinePack.Decoded attribute.

* LogOutput will write data to stdout instead of stderr (#1515).

* Using strftime literals for filenames during rotation in FileOutput plugin
(#1469).

* Implemented strftime format codes in: filenames in FileOutput plugin,
ESJsonEncoder, ESLogstashV0Encoder, Payload encoder (#1469, #1508).

* The package created by 'make deb' creates an "heka" user and ships an init script
and a systemd unit.
* The package created by 'make deb' creates an "heka" user and ships an init
script and a systemd unit.

* The 'make deb' target requires fakeroot and debhelper to be installed.

2 changes: 1 addition & 1 deletion cmake/externals.cmake
@@ -168,7 +168,7 @@ if (INCLUDE_DOCKER_PLUGINS)
endif()

if (INCLUDE_MOZSVC)
add_external_plugin(git https://github.com/mozilla-services/heka-mozsvc-plugins f310f1589afca54ddf6c9f4de826b43acbc1f228)
add_external_plugin(git https://github.com/mozilla-services/heka-mozsvc-plugins 848fc1f3aa858472150c7af5463661393d3c4f3b)
git_clone(https://github.com/getsentry/raven-go 0cc1491d9d27b258a9b4f0238908cb0d51bd6c9b)
add_dependencies(heka-mozsvc-plugins raven-go)
endif()
86 changes: 86 additions & 0 deletions docs/source/buffering.rst
@@ -0,0 +1,86 @@
.. _buffering:

=====================
Configuring Buffering
=====================

All filter and output plugins support the use of a disk based message queue.
If ``use_buffering`` is set to true, then the router will deliver messages that
match the plugin's :ref:`message matcher <message_matcher>` to the queue
buffer, and the plugin will read from the queue to get messages to process,
instead of the handoff happening in the process RAM via Go channels. This
improves message delivery reliability and allows plugins to reprocess messages
from the queue in cases where upstream servers are down or Heka is recovering
from a hard shutdown.

Each queue buffer supports a few configuration settings in addition to any
options which the plugin might support. These can be specified in a sub-section
of the plugin's TOML configuration section entitled ``buffering``.

Buffering configuration settings
================================

- max_file_size (uint64)
    The maximum size (in bytes) of a single file in the queue buffer. When a
    message would increase a queue file to greater than this size, the message
    will be written into a new file instead. Defaults to 128MiB. The value
    cannot be zero; if zero is specified, the default will be used instead.

- max_buffer_size (uint64)
    Maximum amount of disk space (in bytes) that the entire queue buffer can
    consume. Defaults to 0, or no limit. The action taken when the maximum
    buffer size is reached is determined by the ``full_action`` setting.

- full_action (string)
    The action Heka will take if the queue buffer grows larger than the
    maximum specified by the ``max_buffer_size`` setting. Must be one of the
    following values. Defaults to ``shutdown``, although specific plugins might
    override this default with a default of their own:

    * ``shutdown``: Heka will stop all processing and attempt a clean shutdown.

    * ``drop``: Heka will drop the current message and will continue to process
      future messages.

    * ``block``: Heka will pause message delivery, applying back pressure
      through the router to the inputs. Delivery will resume if and when the
      queue buffer size reduces to below the specified maximum.

- cursor_update_count (uint)
    A plugin is responsible for notifying the queue buffer when a message has
    been processed by calling an ``UpdateCursor`` method on the
    PluginRunner. Some plugins call this for every message, while others call
    it only periodically after processing a large batch of messages. This
    setting specifies how many ``UpdateCursor`` calls must be made before the
    cursor location is flushed to disk. Defaults to 1, although specific
    plugins might override this default with a default of their own. The value
    cannot be zero; if zero is specified, the default will be used instead.

Buffering Default Values
========================

Please note that if you provide a ``buffering`` subsection for your plugin
configuration, it is best to specify *all* of the available settings. In cases
where the plugin specifies a non-standard default for one or more of these
values, that default will only be applied if you omit the ``buffering``
subsection altogether. If you specify any of the values, it is expected that
you will specify all of the values.
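
As a rough illustration of this advice, the sketch below spells out all four
settings explicitly, using the standard defaults documented above. The section
name ``ExampleOutput`` is a placeholder assumed for illustration only, and any
plugin-specific settings are omitted. Where a plugin documents its own
defaults, those values would need to be restated here instead.

.. code-block:: ini

    [ExampleOutput]
    # "ExampleOutput" is a hypothetical section name, not a real plugin.
    use_buffering = true

        [ExampleOutput.buffering]
        max_file_size = 134217728 # 128MiB, the standard default
        max_buffer_size = 0 # no limit
        full_action = "shutdown"
        cursor_update_count = 1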

Sample Buffering Configuration
==============================

The following is a sample TcpOutput configuration showing the use of buffering.

.. code-block:: ini

    [TcpOutput]
    message_matcher = "Type !~ /^heka/"
    address = "upstream.example.com:5565"
    keep_alive = true
    use_buffering = true

        [TcpOutput.buffering]
        max_file_size = 268435456 # 256MiB
        max_buffer_size = 1073741824 # 1GiB
        full_action = "block"
        cursor_update_count = 100

17 changes: 15 additions & 2 deletions docs/source/config/filters/index.rst
@@ -23,12 +23,25 @@ initialization code.
- ticker_interval (uint, optional):
Frequency (in seconds) that a timer event will be sent to the filter.
Defaults to not sending timer events.

.. versionadded:: 0.7

- can_exit (bool, optional)
.. versionadded:: 0.7

Whether or not this plugin can exit without causing Heka to shutdown.
Defaults to false for non-sandbox filters, and true for sandbox filters.

.. versionadded:: 0.10

- use_buffering (bool, optional)
If true, all messages delivered to this filter will be buffered to disk
before delivery, preventing back pressure and allowing retries in cases of
message processing failure. Defaults to false, unless otherwise specified
by the individual filter's documentation.
- buffering (QueueBufferConfig, optional)
A sub-section that specifies the settings to be used for the buffering
behavior. This will only have any impact if `use_buffering` is set to
true. See :ref:`buffering`.
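
A minimal sketch of how these two options might be combined for a filter is
shown below. The section name ``ExampleFilter`` and the matcher value are
placeholders assumed for illustration, and plugin-specific settings are left
out. All four buffering settings are given explicitly, and ``drop`` is chosen
here so that a full buffer discards messages rather than stopping Heka.

.. code-block:: ini

    [ExampleFilter]
    # "ExampleFilter" is a hypothetical section name; plugin-specific
    # settings are omitted from this sketch.
    message_matcher = "Type == 'example'"
    use_buffering = true

        [ExampleFilter.buffering]
        max_file_size = 134217728 # 128MiB
        max_buffer_size = 536870912 # 512MiB
        full_action = "drop"
        cursor_update_count = 1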

Available Filter Plugins
========================

2 changes: 1 addition & 1 deletion docs/source/config/inputs/docker_event.rst
@@ -1,7 +1,7 @@
.. _config_docker_event_input:

Docker Event Input
=================
==================

.. versionadded:: 0.10.0

23 changes: 6 additions & 17 deletions docs/source/config/outputs/elasticsearch.rst
@@ -50,23 +50,12 @@ Config:
An optional sub-section that specifies the settings to be used for any
SSL/TLS encryption. This will only have any impact if `URL` uses the
`HTTPS` URI scheme. See :ref:`tls`.
- use_buffering: (bool, optional):
Buffer records to a disk-backed buffer on the Heka server before writing them to ElasticSearch.
Defaults to true.
- queue_max_buffer_size (uint64, optional):
Defines maximum queue buffer size, in bytes. Defaults to 0, which means no
max.
- queue_full_action (string, optional):
Specifies how Heka should behave when the queue reaches the specified
maximum capacity. There are currently three possible actions:

- `shutdown` - Shuts down Heka.
- `drop` - New messages are dropped until queue is available again.
Already queued messages are unaffected.
- `block` - Blocks processing of messages, tries to push last message
until its possible.

Defaults to `shutdown`.
- use_buffering (bool, optional):
Buffer records to a disk-backed buffer on the Heka server before writing
them to ElasticSearch. Defaults to true.
- buffering (QueueBufferConfig, optional):
All of the :ref:`buffering <buffering>` config options are set to the
standard default options.

Example:

30 changes: 10 additions & 20 deletions docs/source/config/outputs/tcp.rst
@@ -22,11 +22,8 @@ Config:

- tls (TlsConfig, optional):
A sub-section that specifies the settings to be used for any SSL/TLS
encryption. This will only have any impact if `use_tls` is set to true.
encryption. This will only have any impact if ``use_tls`` is set to true.
See :ref:`tls`.
- ticker_interval (uint, optional):
Specifies how often, in seconds, the output queue files are rolled.
Defaults to 300.

.. versionadded:: 0.6

@@ -49,22 +46,15 @@ Config:
Time duration in seconds that a TCP connection will be maintained before
keepalive probes start being sent. Defaults to 7200 (i.e. 2 hours).

.. versionadded:: 0.9
.. versionadded:: 0.10

- queue_max_buffer_size (uint64):
Defines maximum queue buffer size, in bytes. Defaults to 0, which means no
max.
- queue_full_action (string, optional):
Specifies how Heka should behave when the queue reaches the specified
maximum capacity. There are currently three possible actions:

- `shutdown`: Shutdowns heka.
- `drop`: Messages are dropped until queue is available again. Already queued
messages are unaffected.
- `block`: Blocks processing of messages, tries to push last message
until its possible.

Defaults to `shutdown`.
- use_buffering (bool, optional):
Buffer records to a disk-backed buffer on the Heka server before sending
them out over the TCP connection. Defaults to true.
- buffering (QueueBufferConfig, optional):
All of the :ref:`buffering <buffering>` config options are set to the
standard default options, except for `cursor_update_count`, which is set to
50 instead of the standard default of 1.
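
Because a plugin's own buffering defaults are only applied when the
``buffering`` sub-section is omitted entirely (see :ref:`buffering`), a
TcpOutput configuration that overrides any buffering setting would presumably
need to restate ``cursor_update_count`` as well. The following is a sketch
under that assumption; the address and the size limits are illustrative
values, not recommendations.

.. code-block:: ini

    [TcpOutput]
    message_matcher = "Type !~ /^heka/"
    address = "upstream.example.com:5565" # illustrative address
    use_buffering = true

        [TcpOutput.buffering]
        max_file_size = 134217728 # 128MiB, the standard default
        max_buffer_size = 1073741824 # 1GiB, illustrative cap
        full_action = "block"
        cursor_update_count = 50 # TcpOutput's own default, restated explicitly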

Example:

@@ -74,4 +64,4 @@ Example:
type = "TcpOutput"
address = "heka-aggregator.mydomain.com:55"
local_address = "127.0.0.1"
message_matcher = "Type != 'logfile' && Type != 'heka.counter-output' && Type != 'heka.all-report'"
message_matcher = "Type != 'logfile' && Type !~ /^heka\./"
