Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1307 from mozilla-services/splitters
Browse files Browse the repository at this point in the history
Splitters
  • Loading branch information
rafrombrc committed Feb 5, 2015
2 parents 50bfce2 + 4c22f64 commit aa7e9b0
Show file tree
Hide file tree
Showing 77 changed files with 3,316 additions and 3,365 deletions.
8 changes: 5 additions & 3 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ Backwards Incompatibilities
* Stats Accum Input treats gauge inputs as float64 rather than int64 to match
statsd spec and other statsd implementations (#850).

* Introduction of Splitter plugins, accompanied by major changes to how input
plugins work to support the use of Splitter plugins. (#424)

Bug Handling
------------

Expand All @@ -45,6 +48,8 @@ Bug Handling
complete records up to the end of the stream without having to
use GetRemainingData() (#1305).

* HttpListenInput no longer URL unescapes the HTTP request body (#1124).

* SmtpOutput now encodes email subject when necessary (#1277).

Features
Expand Down Expand Up @@ -76,9 +81,6 @@ Features
and an action to take if the max size is reached, from 'shutdown', 'drop',
or 'block' (#1110).

* Added `unescape_body` option to HttpListenInput to allow users to turn off
unescaping of the HTTP request body (#1124).

* Added `send_interval` setting to SmtpOutput.

* Added `timestamp` format setting to ESLogstashV0Encoder (#1142).
Expand Down
4 changes: 4 additions & 0 deletions cmake/mocks.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ add_internal_mock(pipeline mock_filterrunner_test.go FilterRunner
add_internal_mock(pipeline mock_outputrunner_test.go OutputRunner plugin_runners.go)
add_internal_mock(pipeline mock_input_test.go Input plugin_interfaces.go)
add_internal_mock(pipeline mock_stataccumulator_test.go StatAccumulator stat_accum_input.go)
add_internal_mock(pipeline mock_deliverer_test.go Deliverer plugin_runners.go)
add_internal_mock(pipeline mock_splitterrunner_test.go SplitterRunner splitter_runner.go)

add_external_mock(pipelinemock mock_pluginhelper.go github.com/mozilla-services/heka/pipeline PluginHelper)
add_external_mock(pipelinemock mock_filterrunner.go github.com/mozilla-services/heka/pipeline FilterRunner)
Expand All @@ -57,6 +59,8 @@ add_external_mock(pipelinemock mock_outputrunner.go github.com/mozilla-services/
add_external_mock(pipelinemock mock_inputrunner.go github.com/mozilla-services/heka/pipeline InputRunner)
add_external_mock(pipelinemock mock_decoder.go github.com/mozilla-services/heka/pipeline Decoder)
add_external_mock(pipelinemock mock_stataccumulator.go github.com/mozilla-services/heka/pipeline StatAccumulator)
add_external_mock(pipelinemock mock_deliverer.go github.com/mozilla-services/heka/pipeline Deliverer)
add_external_mock(pipelinemock mock_splitterrunner.go github.com/mozilla-services/heka/pipeline SplitterRunner)

add_external_mock(pipeline/testsupport mock_net_conn.go net Conn)
add_external_mock(pipeline/testsupport mock_net_listener.go net Listener)
Expand Down
24 changes: 21 additions & 3 deletions cmd/heka-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2014
# Portions created by the Initial Developer are Copyright (C) 2014-2015
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mike Trinkala (trink@mozilla.com)
# Rob Miller (rmiller@mozilla.com)
#
# ***** END LICENSE BLOCK *****/

/*
Expand All @@ -31,6 +33,18 @@ import (
"time"
)

func makeSplitterRunner() (pipeline.SplitterRunner, error) {
splitter := &pipeline.HekaFramingSplitter{}
config := splitter.ConfigStruct()
err := splitter.Init(config)
if err != nil {
return nil, fmt.Errorf("Error initializing HekaFramingSplitter: %s", err)
}
srConfig := pipeline.CommonSplitterConfig{}
sRunner := pipeline.NewSplitterRunner("HekaFramingSplitter", splitter, srConfig)
return sRunner, nil
}

func main() {
flagMatch := flag.String("match", "TRUE", "message_matcher filter expression")
flagFormat := flag.String("format", "txt", "output format [txt|json|heka|count]")
Expand Down Expand Up @@ -75,14 +89,18 @@ func main() {
os.Exit(5)
}

parser := pipeline.NewMessageProtoParser()
sRunner, err := makeSplitterRunner()
if err != nil {
fmt.Println(err)
os.Exit(7)
}
msg := new(message.Message)
var processed, matched int64

fmt.Printf("Input:%s Offset:%d Match:%s Format:%s Tail:%t Output:%s\n",
flag.Arg(0), *flagOffset, *flagMatch, *flagFormat, *flagTail, *flagOutput)
for true {
n, record, err := parser.Parse(file)
n, record, err := sRunner.GetRecordFromStream(file)
if n > 0 && n != len(record) {
fmt.Printf("Corruption detected at offset: %d bytes: %d\n", offset, n-len(record))
}
Expand Down
2 changes: 1 addition & 1 deletion docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
Changelog
=========

.. literalinclude:: ../CHANGES.txt
.. literalinclude:: /../../CHANGES.txt
5 changes: 5 additions & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@
'config/outputs/tcp.rst',
'config/outputs/udp.rst',
'config/outputs/whisper.rst',
'config/splitters/heka_framing.rst',
'config/splitters/index_noref.rst',
'config/splitters/null.rst',
'config/splitters/regex.rst',
'config/splitters/token.rst',
'developing/release.rst',
'sandbox/cookbook.rst',
'sandbox/decoder.rst',
Expand Down
14 changes: 7 additions & 7 deletions docs/source/config/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ Configuring hekad

.. start-hekad-config
A hekad configuration file specifies what inputs, decoders, filters, encoders,
and outputs will be loaded. The configuration file is in `TOML
<https://github.com/mojombo/toml>`_ format. TOML looks very similar to INI
configuration formats, but with slightly more rich data structures and nesting
support.
A hekad configuration file specifies what inputs, splitters, decoders,
filters, encoders, and outputs will be loaded. The configuration file is in
`TOML <https://github.com/mojombo/toml>`_ format. TOML looks very similar to
INI configuration formats, but with slightly more rich data structures and
nesting support.

If hekad's config file is specified to be a directory, all contained files
with a filename ending in ".toml" will be loaded and merged into a single
Expand All @@ -29,7 +29,7 @@ instance of Heka's plugin type "TcpInput":
[tcp:5565]
type = "TcpInput"
parser_type = "message.proto"
splitter = "HekaFramingSplitter"
decoder = "ProtobufDecoder"
address = ":5565"
Expand All @@ -42,7 +42,7 @@ be used as the type. Thus, the following section describes a plugin named
[TcpInput]
address = ":5566"
parser_type = "message.proto"
splitter = "HekaFramingSplitter"
decoder = "ProtobufDecoder"
Note that it's fine to have more than one instance of the same plugin type, as
Expand Down
6 changes: 0 additions & 6 deletions docs/source/config/inputs/amqp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ Config:
- queue_ttl (int):
Allows ability to specify TTL in milliseconds on Queue declaration for
expiring messages. Defaults to undefined/infinite.
- decoder (string):
Decoder name used to transform a raw message body into a structured hekad
message. Must be a decoder appropriate for the messages that come in from
the exchange. If accepting messages that have been generated by an
AMQPOutput in another Heka process then this should be a
:ref:`config_protobuf_decoder` instance.
- retries (RetryOptions, optional):
A sub-section that specifies the settings to be used for restart behavior.
See :ref:`configuring_restarting`
Expand Down
9 changes: 9 additions & 0 deletions docs/source/config/inputs/docker_log.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ Messages will be populated as follows:
- Fields["ContainerID"] (string): The container ID
- Fields["ContainerName"] (string): The container name

.. note::

Logspout expects to be dealing exclusively with textual log file data, and
always assumes that the file data is newline delimited, i.e. one line in
the log file equals one logical unit of data. For this reason, the
DockerLogInput currently does *not* support the use of alternate splitter
plugins. Any splitter setting specified in a DockerLogInput's
configuration will be ignored.

Config:

- endpoint (string):
Expand Down
3 changes: 0 additions & 3 deletions docs/source/config/inputs/file_polling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ Config:
- ticker_interval (unit):
How often, in seconds to input should read the contents of the file.

- decoder (string):
The name of the decoder used to process the payload of the input.

Example:

.. code-block:: ini
Expand Down
3 changes: 0 additions & 3 deletions docs/source/config/inputs/http.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ Config:

Severity level of errors, unreachable connections, and non-200 responses
of successful HTTP requests. Defaults to 1 (alert).
- decoder (string):
The name of the decoder used to further transform the response body text
into a structured hekad message. No default decoder is specified.

Example:

Expand Down
7 changes: 0 additions & 7 deletions docs/source/config/inputs/httplisten.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ Config:
- address (string):
An IP address:port on which this plugin will expose a HTTP server.
Defaults to "127.0.0.1:8325".
- decoder (string):
The name of the decoder used to further transform the request body text
into a structured hekad message. No default decoder is specified.

.. versionadded:: 0.7

Expand All @@ -44,10 +41,6 @@ Config:

.. versionadded:: 0.9

- unescape_body (bool):
Specifies whether or not the received request body will be URL unescaped
before being written to the message payload. Defaults to true.

- request_headers ([]string):
Add additional request headers as message fields. Defaults to empty list.

Expand Down
13 changes: 10 additions & 3 deletions docs/source/config/inputs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ initialization code.

- decoder (string, optional):
Decoder to be used by the input. This should refer to the name of a
decoder plugin section that is specified elsewhere in the TOML
configuration. If supplied, messages will be decoded before being passed
on to the router when the InputRunner's `Deliver` method is called.
registered decoder plugin configuration. If supplied, messages will be
decoded before being passed on to the router when the InputRunner's
`Deliver` method is called.
- synchronous_decode (bool, optional):
If `synchronous_decode` is false, then any specified decoder plugin will
be run by a DecoderRunner in its own goroutine and messages will be passed
Expand All @@ -34,6 +34,13 @@ initialization code.
logging an error message, decode failure will cause the original,
undecoded message to be tagged with a `decode_failure` field (set to true)
and delivered to the router for possible further processing.
- splitter (string, optional)
Splitter to be used by the input. This should refer to the name of a
registered splitter plugin configuration. It specifies how the input
should split the incoming data stream into individual records prior to
decoding and/or injection to the router. Typically defaults to
"NullSplitter", although certain inputs override this with a different
default value.

.. _config_amqp_input:
.. include:: /config/inputs/amqp.rst
Expand Down
15 changes: 12 additions & 3 deletions docs/source/config/inputs/index_noref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ Inputs
Common Input Parameters
=======================

.. versionadded:: 0.9

There are some configuration options that are universally available to all
Heka input plugins. These will be consumed by Heka itself when Heka
initializes the plugin and do not need to be handled by the plugin-specific
initialization code.

- decoder (string, optional):
Decoder to be used by the input. This should refer to the name of a
decoder plugin section that is specified elsewhere in the TOML
configuration. If supplied, messages will be decoded before being passed
on to the router when the InputRunner's `Deliver` method is called.
registered decoder plugin configuration. If supplied, messages will be
decoded before being passed on to the router when the InputRunner's
`Deliver` method is called.
- synchronous_decode (bool, optional):
If `synchronous_decode` is false, then any specified decoder plugin will
be run by a DecoderRunner in its own goroutine and messages will be passed
Expand All @@ -29,6 +31,13 @@ initialization code.
logging an error message, decode failure will cause the original,
undecoded message to be tagged with a `decode_failure` field (set to true)
and delivered to the router for possible further processing.
- splitter (string, optional)
Splitter to be used by the input. This should refer to the name of a
registered splitter plugin configuration. It specifies how the input
should split the incoming data stream into individual records prior to
decoding and/or injection to the router. Typically defaults to
"NullSplitter", although certain inputs override this with a different
default value.

.. include:: /config/inputs/amqp.rst

Expand Down
25 changes: 3 additions & 22 deletions docs/source/config/inputs/logstreamer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,6 @@ Config:
- translation (hash map of hash maps of ints):
A set of translation mappings for matched groupings to the ints to use for
sorting purposes.
- decoder (string):
A :ref:`config_protobuf_decoder` instance must be specified for the
message.proto parser. Use of a decoder is optional for token and regexp
parsers; if no decoder is specified the parsed data is available in the
Heka message payload.
- parser_type (string):
- token - splits the log on a byte delimiter (default).
- regexp - splits the log on a regexp delimiter.
- message.proto - splits the log on protobuf message boundaries
- delimiter (string): Only used for token or regexp parsers.
Character or regexp delimiter used by the parser (default "\\n"). For the
regexp delimiter a single capture group can be specified to preserve the
delimiter (or part of the delimiter). The capture will be added to the start
or end of the log line depending on the delimiter_location configuration.
Note: when a start delimiter is used the last line in the file will not be
processed (since the next record defines its end) until the log is rolled.
- delimiter_location (string): Only used for regexp parsers.
- start - the regexp delimiter occurs at the start of a log line.
- end - the regexp delimiter occurs at the end of the log line (default).
- keep_truncated_messages (bool): Only used for token or regexp parsers.
Whether to keep first part of big message exceeding buffer size or just drop it (default).

- splitter (string, optional):
Defaults to "TokenSplitter", which will split the log stream into one
Heka message per line.
24 changes: 5 additions & 19 deletions docs/source/config/inputs/process.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,6 @@ Config:
If true, for each run of the process chain a message will be generated
with the last command in the chain's stderr as the payload. Defaults to
false.
- decoder (string):
Name of the decoder instance to send messages to. If omitted messages will
be injected directly into Heka's message router.
- parser_type (string):
- token - splits the log on a byte delimiter (default).
- regexp - splits the log on a regexp delimiter.
- delimiter (string): Only used for token or regexp parsers.
Character or regexp delimiter used by the parser (default "\\n"). For the
regexp delimiter a single capture group can be specified to preserve the
delimiter (or part of the delimiter). The capture will be added to the
start or end of the log line depending on the delimiter_location
configuration. Note: when a start delimiter is used the last line in the
file will not be processed (since the next record defines its end) until
the log is rolled.
- delimiter_location (string): Only used for regexp parsers.
- start - the regexp delimiter occurs at the start of a log line.
- end - the regexp delimiter occurs at the end of the log line (default).
- timeout (uint):
Timeout in seconds before any one of the commands in the chain is
terminated.
Expand Down Expand Up @@ -78,11 +61,14 @@ Example:

.. code-block:: ini
[on_space]
type = "TokenSplitter"
delimiter = " "
[DemoProcessInput]
type = "ProcessInput"
ticker_interval = 2
parser_type = "token"
delimiter = " "
splitter = "on_space"
stdout = true
stderr = false
trim = true
Expand Down

0 comments on commit aa7e9b0

Please sign in to comment.