Permalink
Browse files

start of taillion package

Signed-off-by: Peter Edge <peter.edge@gmail.com>
  • Loading branch information...
peter-edge committed Jan 27, 2016
1 parent 4680d52 commit 57de86899bf29d76948cde222b3493dfbcccfac8
Showing with 202 additions and 11 deletions.
  1. +19 −6 proto/io.go
  2. +2 −1 proto/marshal.go
  3. +2 −2 proto/protolion.go
  4. +129 −0 tail/taillion.go
  5. +50 −2 testing/testing_test.go
View
@@ -23,7 +23,7 @@ var errInvalidVarint = errors.New("invalid varint32 encountered")
// encoded messages of the same type together in a file. It returns the total
// number of bytes written and any applicable error. This is roughly
// equivalent to the companion Java API's MessageLite#writeDelimitedTo.
func writeDelimited(w io.Writer, m proto.Message, base64Encode bool) (n int, err error) {
func writeDelimited(w io.Writer, m proto.Message, base64Encode bool) (int, error) {
buffer, err := proto.Marshal(m)
if err != nil {
return 0, err
@@ -40,8 +40,16 @@ func writeDelimited(w io.Writer, m proto.Message, base64Encode bool) (n int, err
return sync, err
}
n, err = w.Write(buffer)
return n + sync, err
n, err := w.Write(buffer)
sync += n
if err != nil {
return sync, err
}
if base64Encode {
n, err = w.Write([]byte{'\n'})
sync += n
}
return sync, err
}
// readDelimited decodes a message from the provided length-delimited stream,
@@ -55,7 +63,7 @@ func writeDelimited(w io.Writer, m proto.Message, base64Encode bool) (n int, err
// an error if a message has been read and decoded correctly, even if the end
// of the stream has been reached in doing so. In that case, any subsequent
// calls return (0, io.EOF).
func readDelimited(r io.Reader, m proto.Message, base64Decode bool) (n int, err error) {
func readDelimited(r io.Reader, m proto.Message, base64Decode bool) (int, error) {
// Per AbstractParser#parsePartialDelimitedFrom with
// CodedInputStream#readRawVarint32.
headerBuf := make([]byte, binary.MaxVarintLen32)
@@ -93,9 +101,14 @@ func readDelimited(r io.Reader, m proto.Message, base64Decode bool) (n int, err
if base64Decode {
messageBuf, err = base64.StdEncoding.DecodeString(string(messageBuf))
if err != nil {
return n, err
return bytesRead, err
}
newlineBuf := make([]byte, 1)
n, err := io.ReadFull(r, newlineBuf)
bytesRead += n
if err != nil {
return bytesRead, err
}
}
return bytesRead, proto.Unmarshal(messageBuf, m)
}
View
@@ -51,7 +51,8 @@ func (m *delimitedMarshaller) Marshal(entry *lion.Entry) ([]byte, error) {
if _, err := writeDelimited(buffer, protoEntry, m.base64Encode); err != nil {
return nil, err
}
return buffer.Bytes(), nil
data := buffer.Bytes()
return data, nil
}
type delimitedUnmarshaller struct {
View
@@ -22,10 +22,10 @@ var (
// DelimitedUnmarshaller is an Unmarshaller that uses the protocol buffers write delimited scheme.
DelimitedUnmarshaller = newDelimitedUnmarshaller(false)
// Base64DelimitedMarshaller is a Marshaller that uses the protocol buffers write delimited scheme,
// but encodes the encoded protocol buffer message using base64.StdEncoding.
// but encodes the encoded protocol buffer message using base64.StdEncoding, and a newline is added.
Base64DelimitedMarshaller = newDelimitedMarshaller(true)
// Base64DelimitedUnmarshaller is an Unmarshaller that uses the protocol buffers write delimited scheme,
// but decoded the encoded protocol buffer message using base64.StdEncoding.
// but decoded the encoded protocol buffer message using base64.StdEncoding, and a newline is added.
Base64DelimitedUnmarshaller = newDelimitedUnmarshaller(true)
// DefaultJSONMarshalFunc is the default protocol buffers JSONMarshalFunc.
DefaultJSONMarshalFunc = func(writer io.Writer, data interface{}) error {
View
@@ -0,0 +1,129 @@
/*
Package taillion implements tailing of lion log files.
Uses https://github.com/hpcloud/tail.
*/
package taillion
import (
"bytes"
"github.com/hpcloud/tail"
"go.pedge.io/lion"
)
// EntryHandler handles a lion.Entry that was read.
type EntryHandler func(*lion.Entry) error
// ErrorHandler handles an error from tailing/
// If it returns true, Tail should return.
type ErrorHandler func(error) bool
// SeekInfo represents arguments to `os.Seek`
type SeekInfo struct {
Offset int64
Whence int // os.SEEK_*
}
// TailOptions are options to Tail.
type TailOptions struct {
Location *SeekInfo // Seek to this location before tailing
ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify
Pipe bool // Is a named pipe (mkfifo)
Follow bool // Continue looking for new lines (tail -f)
Done chan struct{} // Stop when this channel closes
}
// Tail tails the given file for entries.
func Tail(
filePath string,
unmarshaller lion.Unmarshaller,
entryHandler EntryHandler,
errorHandler ErrorHandler, // if nil, Tail will just return the error
options TailOptions,
) error {
t, err := tail.TailFile(filePath, tailOptionsToConfig(options))
if err != nil {
return err
}
if options.Done != nil {
for {
select {
case <-options.Done:
err = t.Stop()
t.Cleanup()
return err
case line := <-t.Lines:
if err := handleLine(
unmarshaller,
entryHandler,
errorHandler,
line.Text,
); err != nil {
return err
}
}
}
} else {
for line := range t.Lines {
if err := handleLine(
unmarshaller,
entryHandler,
errorHandler,
line.Text,
); err != nil {
return err
}
}
}
t.Cleanup()
return nil
}
func tailOptionsToConfig(tailOptions TailOptions) tail.Config {
config := tail.Config{
ReOpen: tailOptions.ReOpen,
MustExist: tailOptions.MustExist,
Poll: tailOptions.Poll,
Pipe: tailOptions.Pipe,
Follow: tailOptions.Follow,
}
if tailOptions.Location != nil {
config.Location = &tail.SeekInfo{
Offset: tailOptions.Location.Offset,
Whence: tailOptions.Location.Whence,
}
}
return config
}
func handleLine(
unmarshaller lion.Unmarshaller,
entryHandler EntryHandler,
errorHandler ErrorHandler,
line string,
) error {
entry, err := lineToEntry(unmarshaller, line)
if err != nil {
if errorHandler == nil || errorHandler(err) {
return err
}
} else {
if err := entryHandler(entry); err != nil {
if errorHandler == nil || errorHandler(err) {
return err
}
}
}
return nil
}
func lineToEntry(unmarshaller lion.Unmarshaller, line string) (*lion.Entry, error) {
encodedEntry := &lion.EncodedEntry{}
if err := unmarshaller.Unmarshal(bytes.NewReader([]byte(line)), encodedEntry); err != nil {
return nil, err
}
return encodedEntry.Decode()
}
View
@@ -4,19 +4,22 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"
"go.pedge.io/lion"
"go.pedge.io/lion/kit"
"go.pedge.io/lion/proto"
"go.pedge.io/lion/tail"
"github.com/stretchr/testify/require"
)
func TestRoundtripAndTextMarshaller(t *testing.T) {
testRoundTripAndTextMarshaller(
testBothRoundTripAndTextMarshaller(
t,
func(logger protolion.Logger) {
logger.Debug(
@@ -85,7 +88,7 @@ WARN a warning line {"someKey":"someValue"}
}
func TestLevelNone(t *testing.T) {
testRoundTripAndTextMarshaller(
testBothRoundTripAndTextMarshaller(
t,
func(logger protolion.Logger) {
logger = logger.AtLevel(lion.LevelPanic)
@@ -98,6 +101,11 @@ func TestLevelNone(t *testing.T) {
)
}
func testBothRoundTripAndTextMarshaller(t *testing.T, f func(protolion.Logger), expected string) {
testRoundTripAndTextMarshaller(t, f, expected)
//testRoundTripAndTextMarshallerTail(t, f, expected)
}
func testRoundTripAndTextMarshaller(t *testing.T, f func(protolion.Logger), expected string) {
for _, marshalPair := range []struct {
marshaller lion.Marshaller
@@ -144,6 +152,46 @@ func testRoundTripAndTextMarshaller(t *testing.T, f func(protolion.Logger), expe
}
}
func testRoundTripAndTextMarshallerTail(t *testing.T, f func(protolion.Logger), expected string) {
file, err := ioutil.TempFile("", "lion")
require.NoError(t, err)
filePath := file.Name()
// will not actually get called if a require statement is hit
defer func() {
_ = file.Close()
_ = os.Remove(filePath)
}()
fakeTimer := newFakeTimer(0)
logger := protolion.NewLogger(
lion.NewLogger(
lion.NewWritePusher(
file,
protolion.Base64DelimitedMarshaller,
),
lion.LoggerWithIDAllocator(newFakeIDAllocator()),
lion.LoggerWithTimer(fakeTimer),
),
)
f(logger)
writeBuffer := bytes.NewBuffer(nil)
writePusher := lion.NewTextWritePusher(
writeBuffer,
lion.TextMarshallerDisableTime(),
)
require.NoError(
t,
taillion.Tail(
filePath,
protolion.Base64DelimitedUnmarshaller,
writePusher.Push,
nil,
taillion.TailOptions{},
),
)
require.Equal(t, expected, writeBuffer.String())
}
func TestPrintSomeStuff(t *testing.T) {
testPrintSomeStuff(t, protolion.NewLogger(lion.DefaultLogger))
}

0 comments on commit 57de868

Please sign in to comment.