Skip to content
Permalink
Browse files

Initial commit of base functionality

  • Loading branch information...
otoolep committed Jul 11, 2014
1 parent 65cf525 commit 063ca27efe66da381a5d94c37e1f08d78947dd3e
@@ -0,0 +1,8 @@
bin
pkg/
src/code.google.com/
src/gopkg.in/
src/github.com/Shopify/
TODO
src/github.com/otoolep/gollector/gollector
*.swp
@@ -1,4 +1,51 @@
syslog-gollector
================
gollector
========

*gollector* is a Syslog Collector, written in [Go](http://golang.org/), which has support for streaming received messages to [Apache Kafka](https://kafka.apache.org/), version 0.8. The messages can be written to Kafka in parsed format, or written exactly as received.

The log lines must be [RFC5424](http://tools.ietf.org/html/rfc5424) compliant, and in the following format:

<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROC-ID MSGID MSG

Consult the RFC to learn what each of these fields is. The TIMESTAMP field must be in [RFC3339](http://www.ietf.org/rfc/rfc3339.txt) format. MSGID must be '-' (nil). Lines not matching this format are dropped by the gollector.

Multi-line Support
------------
The gollector supports multi-line messages, so messages such as stack traces will be considered a single message.

Parsing Mode
------------
Parsing mode is enabled by default. In this mode, the Syslog header is parsed, and the fields become keys in a JSON structure. This JSON structure is then written to Kafka. If parsing mode is not enabled, the log line is written to Kafka as it was received.

For example, imagine the following message is received by the gollector:

<134>1 2013-09-04T10:25:52.618085 ubuntu sshd 1999 - password accepted for user root

With parsing disabled, the line is written as is to Kafka. With parsing enabled, the following JSON object is written to Kafka:

{
"priority":134,
"version":1,
"timestamp":"2013-09-04T10:25:52.618085",
"host":"ubuntu",
"app":"sshd",
"pid":1999,
"message":"password accepted for user root"
}

This parsed form may be useful to downstream consumers.


Building
------------

Running
------------

Execute

gollector --help

for runtime options.

Syslog Collector written in Go, streams to Kafka 0.8
@@ -0,0 +1,104 @@
package main

import (
"flag"
"fmt"
"os"
"runtime"
"strings"

"github.com/otoolep/gollector/input"
"github.com/otoolep/gollector/output"

log "code.google.com/p/log4go"
)

// Program parameters, populated from the command-line flags registered in init.
var (
	iface        string // bind interface (-i)
	kBrokers     string // comma-delimited Kafka brokers (-k)
	kTopic       string // Kafka topic (-t)
	kBatch       int    // Kafka batch size (-b)
	kBufferTime  int    // Kafka client buffer max time, in milliseconds (-a)
	kBufferBytes int    // Kafka client buffer max bytes (-e)
	pEnabled     bool   // whether Syslog header parsing is enabled (-p)
	cCapacity    int    // channel buffering capacity (-c)
)

// Default values for the program parameters.
const (
	connHost         = "localhost:514"
	connType         = "tcp"
	kafkaBatch       = 10
	kafkaBrokers     = "localhost:9092"
	kafkaTopic       = "logs"
	kafkaBufferTime  = 1000
	kafkaBufferBytes = 512 * 1024
	parseEnabled     = true
	chanCapacity     = 0
)

// init registers every command-line flag on the default flag set, binding
// each one to its package-level variable and default value.
func init() {
	// String-valued options.
	flag.StringVar(&iface, "i", connHost, "bind interface")
	flag.StringVar(&kBrokers, "k", kafkaBrokers, "comma-delimited kafka brokers")
	flag.StringVar(&kTopic, "t", kafkaTopic, "kafka topic")

	// Integer-valued options.
	flag.IntVar(&kBatch, "b", kafkaBatch, "Kafka batch size")
	flag.IntVar(&kBufferTime, "a", kafkaBufferTime, "Kafka client buffer max time (ms)")
	flag.IntVar(&kBufferBytes, "e", kafkaBufferBytes, "Kafka client buffer max bytes")
	flag.IntVar(&cCapacity, "c", chanCapacity, "channel buffering capacity")

	// Boolean options.
	flag.BoolVar(&pEnabled, "p", parseEnabled, "enable syslog header parsing")
}

// main parses the command-line flags, logs the effective configuration,
// connects the input channel (optionally via the RFC5424 parser) to the Kafka
// producer, starts the TCP server and then blocks forever.
func main() {
	flag.Parse()

	hostname, err := os.Hostname()
	if err != nil {
		log.Error("unable to determine hostname -- aborting")
		os.Exit(1)
	}
	log.Info("syslog server starting on %s, PID %d", hostname, os.Getpid())
	log.Info("machine has %d cores", runtime.NumCPU())

	// Log config
	// NOTE(review): kBatch is parsed and logged but is not passed to anything
	// in this function -- confirm whether the producer should receive it.
	log.Info("kafka brokers: %s", kBrokers)
	log.Info("kafka topic: %s", kTopic)
	log.Info("kafka batch size: %d", kBatch)
	log.Info("kafka buffer time: %dms", kBufferTime)
	log.Info("kafka buffer bytes: %d", kBufferBytes)
	log.Info("parsing enabled: %t", pEnabled)
	log.Info("channel buffering capacity: %d", cCapacity)

	// Prep the channels. Without parsing, the producer reads the raw input
	// channel directly; with parsing, it reads the parser's output channel.
	rawChan := make(chan string, cCapacity)
	prodChan := rawChan

	if pEnabled {
		// Feed the input through the Parser stage
		parser := input.NewRfc5424Parser()
		prodChan, err = parser.StreamingParse(rawChan)
		if err != nil {
			// Check here: err is reassigned by the producer setup below, so a
			// parser failure would otherwise go unnoticed.
			log.Error("unable to start syslog parser: %v", err)
			os.Exit(1)
		}
	}

	// Connect to Kafka
	_, err = output.NewKafkaProducer(prodChan, strings.Split(kBrokers, ","), kTopic, kBufferTime, kBufferBytes)
	if err != nil {
		// The logger treats a leading string as a format string, so the error
		// needs an explicit verb to be rendered properly.
		log.Error("unable to create Kafka Producer: %v", err)
		os.Exit(1)
	}
	log.Info("connected to kafka at %s", kBrokers)

	// Start the server
	tcpServer := input.NewTcpServer(iface)
	err = tcpServer.Start(func() chan<- string {
		return rawChan
	})
	if err != nil {
		fmt.Println("Failed to start server", err.Error())
		os.Exit(1)
	}
	log.Info("listening on %s for connections", iface)

	// Spin forever
	select {}
}
@@ -0,0 +1,96 @@
package input

import (
"io"
"regexp"
"strings"
)

const (
	// SYSLOG_DELIMITER is a regular expression matching the beginning of a
	// Syslog header ("<PRI>VERSION" followed by one whitespace byte) located
	// at the very end of the text being searched.
	SYSLOG_DELIMITER = `<[0-9]{1,3}>[0-9]\s$`

	// maxDelimiterLen is the longest byte sequence either delimiter regex can
	// match: a newline, '<', up to three digits, '>', one digit and one
	// whitespace byte. It must be kept in step with SYSLOG_DELIMITER.
	maxDelimiterLen = 8
)

var (
	// startRegex finds the first Syslog header in a stream.
	startRegex = regexp.MustCompile(SYSLOG_DELIMITER)
	// runRegex finds every subsequent header, which must follow a newline.
	runRegex = regexp.MustCompile(`\n` + SYSLOG_DELIMITER)
)

// Reader is the source of bytes consumed by Delimiter.Stream.
type Reader interface {
	ReadByte() (byte, error)
}

// A Delimiter detects when Syslog lines start.
type Delimiter struct {
	buffer []byte         // bytes pushed since the last message was returned
	regex  *regexp.Regexp // startRegex until the first header is seen, then runRegex
}

// NewDelimiter returns an initialized Delimiter. maxSize is used as the
// initial capacity of the internal buffer; the buffer still grows beyond it
// when a message is longer.
func NewDelimiter(maxSize int) *Delimiter {
	return &Delimiter{
		buffer: make([]byte, 0, maxSize),
		regex:  startRegex,
	}
}

// Push a byte into the Delimiter. If the byte completes the header of a new
// Syslog message, the previous message is returned and the bool is true.
// The first header seen only primes the Delimiter, so it returns false.
func (d *Delimiter) Push(b byte) (string, bool) {
	d.buffer = append(d.buffer, b)

	// Both regexes are anchored to the end of the input and match at most
	// maxDelimiterLen bytes, so only the tail of the buffer can hold a match.
	// Searching just the tail avoids rescanning the whole message per byte.
	offset := len(d.buffer) - maxDelimiterLen
	if offset < 0 {
		offset = 0
	}
	loc := d.regex.FindIndex(d.buffer[offset:])
	if loc == nil {
		return "", false
	}
	start := offset + loc[0]

	if d.regex == startRegex {
		// First match -- switch to the regex for embedded lines, and
		// drop any leading characters.
		d.buffer = d.buffer[start:]
		d.regex = runRegex
		return "", false
	}

	// start indexes the newline that precedes the new header: everything
	// before it is the finished message, everything after it is kept.
	dispatch := strings.TrimRight(string(d.buffer[:start]), "\r")
	d.buffer = d.buffer[start+1:]
	return dispatch, true
}

// Vestige returns the bytes which have been pushed to Delimiter, since
// the last Syslog message was returned, with trailing CR/LF removed. The bool
// is false when nothing, or nothing but CR/LF, is buffered. The buffer is
// emptied in either case.
func (d *Delimiter) Vestige() (string, bool) {
	if len(d.buffer) == 0 {
		return "", false
	}
	dispatch := strings.TrimRight(string(d.buffer), "\r\n")
	d.buffer = d.buffer[:0]
	if dispatch == "" {
		// Only line terminators were pending; that is not a message.
		return "", false
	}
	return dispatch, true
}

// Stream returns a channel, on which the delimited Syslog messages
// are emitted. When reader reports io.EOF the buffered remainder is emitted
// as the final message and the channel is closed. Any other read error
// panics.
func (d *Delimiter) Stream(reader Reader) chan string {
	eventChan := make(chan string)

	go func() {
		for {
			b, err := reader.ReadByte()
			if err == io.EOF {
				// No further header will arrive to terminate the last
				// message, so flush it before closing.
				if event, ok := d.Vestige(); ok {
					eventChan <- event
				}
				close(eventChan)
				return
			}
			if err != nil {
				panic(err)
			}

			if event, ok := d.Push(b); ok {
				eventChan <- event
			}
		}
	}()
	return eventChan
}
@@ -0,0 +1,71 @@
package input

import (
"bufio"
"net"
"time"

log "code.google.com/p/log4go"
)

const (
	// newlineTimeout is the read deadline applied before each byte is read
	// from a connection.
	newlineTimeout = 1 * time.Second
)

// TcpServer listens on a TCP address and receives Syslog messages from the
// connections it accepts.
type TcpServer struct {
	iface string // address the server listens on
}

// NewTcpServer builds a TcpServer for the given interface address. The
// server does not listen until Start is called.
func NewTcpServer(iface string) *TcpServer {
	return &TcpServer{iface: iface}
}

// Start instructs the TcpServer to bind to the interface and accept connections.
// It returns an error if the listener cannot be created. Otherwise it returns
// nil immediately; connections are accepted on a background goroutine and each
// one is handled on its own goroutine. f supplies the channel to which
// received messages are sent.
func (s *TcpServer) Start(f func() chan<- string) error {
	// Pause between attempts after a failed Accept, so a persistent failure
	// does not turn the loop into a busy spin.
	const acceptRetryDelay = 50 * time.Millisecond

	ln, err := net.Listen("tcp", s.iface)
	if err != nil {
		return err
	}

	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				// The logger treats a leading string as a format string,
				// so the error needs an explicit verb.
				log.Error("failed to accept connection: %v", err)
				time.Sleep(acceptRetryDelay)
				continue
			}
			log.Info("accepted new connection from %s", conn.RemoteAddr().String())
			go s.handleConnection(conn, f)
		}
	}()
	return nil
}

// handleConnection reads conn one byte at a time, splits the stream into
// Syslog messages with a Delimiter and sends each message to the channel
// returned by f. A read that times out after newlineTimeout flushes whatever
// is buffered as a message. Any other read error (including the peer closing
// the connection) also flushes the buffer, then ends the handler and closes
// conn.
func (s *TcpServer) handleConnection(conn net.Conn, f func() chan<- string) {
	defer conn.Close()
	delimiter := NewDelimiter(256)
	reader := bufio.NewReader(conn)

	for {
		var event string
		var match bool

		conn.SetReadDeadline(time.Now().Add(newlineTimeout))
		b, err := reader.ReadByte()
		if err == nil {
			event, match = delimiter.Push(b)
		} else if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
			// The sender went quiet: treat what has arrived as complete.
			event, match = delimiter.Vestige()
		} else {
			// The connection is finished. Flush the pending message first,
			// otherwise the last message before a disconnect would be lost.
			if last, pending := delimiter.Vestige(); pending {
				f() <- last
			}
			log.Info("Error from connection: %v", err)
			return
		}

		if match {
			f() <- event
		}
	}
}
Oops, something went wrong.

0 comments on commit 063ca27

Please sign in to comment.
You can’t perform that action at this time.