A processing pipeline (similar to Logstash) written in Go
Clone and install dependencies:

```bash
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone git@github.com:urban-1/gopipe.git
dep ensure
```
There is a Makefile you can use to quickly build and start working on this project:
```
$ make help
Available targets:

  rdkafka          Build librdkafka locally
  gopipe           Build gopipe
  tests            Run tests (includes fmt and vet)
  show_coverage    Show coverage results
```
Our goal is to define a pipeline where events are received and processed in a modular way. Events can contain:
- Raw unformatted bytes (`Data["raw"]`)
- String contents (`Data["message"]`)
- Decoded JSON values (`Data` as `map[string]interface{}`)
We call our modules "Components" and these fall into three categories:
- Input: Components that generate events and push them into a Go channel
- Proc: Processing components that modify event data. They read and write events from and to Go channels
- Output: Components that only read events and emit them in some form. They usually do not push events back into a Go channel but discard them instead.
In essence, all components are the "same" (implement the same interface). The only difference is which channels are made available to them.
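For illustration only (the actual interfaces live in the gopipe sources and may differ), an event and a component could be sketched roughly like this:

```go
// Illustrative sketch only - not the actual gopipe API.
// An event carries its fields in a generic map: raw bytes under "raw",
// string payloads under "message", or decoded JSON fields directly.
type Event struct {
	Data map[string]interface{}
}

// All components look alike; input components simply have no input channel
// and output components typically do not forward events any further.
type Component interface {
	Run()                       // main loop: read, process, forward
	SetInChan(in chan *Event)   // unused for input components
	SetOutChan(out chan *Event) // unused for output components
}
```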
- Why: Well... I have done something similar in C++ for processing netflow packets, and thought that since Go is (really) fast and concurrent, it is a perfect match for such an application.
- Why not use something already out there: We could extend an existing framework; however, this is a Go learning exercise to replicate the C++ code...
- How is this different: We focus on a systems perspective and want this framework to be more network/data oriented rather than `logs` oriented:
  - Raw data handling - see the `flowreplicator.json` configuration.
  - Allows the user to write more logic, still based on config (if/else support)
  - (Experimental) Supports tasks to feed and update processing modules' data
- What are the future plans: We plan to maintain and extend this until we fully port our C++ code... Maintenance will continue, but we hope to extend it as needed with the help of the community.
Currently implemented components include:

- TCP: Supporting raw, string, CSV and JSON
- UDP: Supporting raw, string, CSV and JSON
- Kafka: Supporting raw, string, CSV and JSON
- Add field: Adds a new field based on a static value or expression
- Add time: Adds a timestamp to the data
- Cast: Converts fields to different data types
- Drop field: Removes fields from the data
- If/Else: Control flow with if/else/endif
- In List: Checks a field against a list of values
- Log: Logs the events' data to stdout
- Longest Prefix Match: Performs LPM and attaches meta-data to the events' data
- MD5: Hashes event fields
- Regex: Converts string events into data events
- Sampler: Selectively forwards events (one every X)
The following configuration will replicate and optionally sample UDP packets:

```json
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
```
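To try this locally, one could push a few datagrams into the input port and watch the replicated/sampled copies; a minimal sketch assuming the ports from the config above (the payload content is arbitrary):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Send ten test datagrams to the pipeline input (UDPRawInput on :9090).
	conn, err := net.Dial("udp", "127.0.0.1:9090")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	for i := 0; i < 10; i++ {
		fmt.Fprintf(conn, "test packet %d", i)
	}
	// With "every": 2 in SamplerProc, roughly every second packet should be
	// forwarded to 127.0.0.1:9091, while all packets are replicated to :9092.
}
```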
The following configuration receives lines over TCP, parses them into data fields, adds a timestamp, casts some fields to different data types, discards the original message, hashes some fields, and so on:

```json
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
```
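To exercise this pipeline, one can write a couple of lines that match the `RegexProc` pattern above; a minimal sketch (host names, ports and messages are made up):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// TCPStrInput above listens on :9092; every line becomes one event.
	conn, err := net.Dial("tcp", "127.0.0.1:9092")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// Both lines match (?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*).
	// Port 8080 is in the InListProc list, so the first event should take the
	// if-branch (MD5 hashing); the second should take the else-branch (timestamp).
	fmt.Fprintln(conn, "server1 8080: user logged in")
	fmt.Fprintln(conn, "server2 9999: backup finished")
}
```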
The following configuration listens on a TCP socket and receives one JSON document per line:

```json
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
```
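A quick way to feed it is to write one JSON object per line; a minimal sketch (the addresses are made up, and the `_src_prefix`/`_src_asn` style output keys are an assumption based on the `out_fields` templates above):

```go
package main

import (
	"encoding/json"
	"log"
	"net"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// json.Encoder writes one JSON document followed by a newline.
	enc := json.NewEncoder(conn)
	event := map[string]interface{}{
		"src": "8.8.8.8",
		"dst": "1.1.1.1",
		"msg": "hello",
	}
	if err := enc.Encode(event); err != nil {
		log.Fatal(err)
	}
	// If both prefixes are present in /tmp/prefix-asn.txt, the output event
	// should be enriched with keys such as "_src_prefix" and "_src_asn".
}
```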
The following configuration snippet defines a task that runs every 10 seconds. Typically you would use it to update the file sources of `InListProc` and `LPMProc` components... In such cases the idea is that a small shell script somewhere on your system updates your local files; you then need to "invoke" a reload to load the new data into memory:

```
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
```
Above we define two tasks. The difference between them is that the first one will signal a component if it runs successfully. The "reload" signal is going to be sent to component 4, and it is up to the component to handle it. The component index is defined by the order of the component in the config, including input components. Given that at the moment we only support one input, component 4 above is the 3rd module in the proc section.
Current limitations:
- Only one input is supported at the moment, but this might change
- A bit of an immature framework :) we need more components
- JSON: Decoding with `UseNumber()` is needed for correct output; however, it breaks `govaluate`, so when comparing you have to use `json_to_float64()`. See `TestIf` for an example, and the sketch below.
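For reference, the standard library switch this refers to is `json.Decoder.UseNumber()`: with it enabled, numeric JSON values decode to `json.Number` (a string type) instead of `float64`, which is why `govaluate` comparisons need an explicit conversion such as the `json_to_float64()` helper mentioned above. A minimal sketch of the behaviour:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	dec := json.NewDecoder(bytes.NewReader([]byte(`{"port": 8080}`)))
	dec.UseNumber() // keep full numeric precision; values become json.Number
	var data map[string]interface{}
	if err := dec.Decode(&data); err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", data["port"]) // prints: json.Number (not float64)
}
```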
Hello! The idea is that we can provide a JSON-configurable pipeline processing capability for the end user. However, we do need more components for various jobs and maybe codecs!
- Components should be extremely easy to implement. Use `proc/log.go` as a starting point (~60 LOC) to implement your component.
- Codecs: Have a quick look at `linecodecs.go`. One can easily implement new line encoders/decoders. These can then be plugged into input/output modules.
Not sure what to help with? Have a look at TODO.md. As always, comments, suggestions, documentation, bug reports, etc. are more than welcome :)