Skip to content
Permalink
Browse files

netlog: create and unpack snappy message-sets

  • Loading branch information...
bictorman committed Mar 11, 2016
1 parent 43531f0 commit e935a02cc7cd0c1cb8dbfb871b98ff140ce08cbf
Showing with 80 additions and 41 deletions.
  1. +49 −40 message.go
  2. +31 −1 message_test.go
@@ -9,6 +9,8 @@ import (
"compress/gzip"
"hash/crc32"
"io"

"github.com/golang/snappy"
)

const (
@@ -19,6 +21,7 @@ const (
headerSize = payloadPos
)

// CompressionType indicates a type of compression for message sets
type CompressionType uint8

const (
@@ -45,13 +48,26 @@ func MessageFromPayload(p []byte) Message {
// MessageSet will panic if a compression type is not provided, since nothing would indicate to streaming
// clients that further messages are embedded in the payload.
func MessageSet(msgs []Message, comp CompressionType) Message {

if comp == CompressionNone {
panic("can not generate message-set without compression")
}

// TODO buffer pool?
buf := &bytes.Buffer{}
w := gzip.NewWriter(buf)
var w io.WriteCloser

switch comp {
case CompressionGzip:
w = gzip.NewWriter(buf)

case CompressionSnappy:
w = snappy.NewWriter(buf)

default:
panic("invalid compression type")
}

for _, m := range msgs {
w.Write(m)
}
@@ -137,65 +153,58 @@ func ReadMessage(r io.Reader) (entry Message, err error) {
return entry, err
}

// Unpack takes a batch of messages and returns the split into an slice
// the batch can be either a simple byte sequence or several messages
// compressed into one message.
func Unpack(m Message) ([]Message, error) {

switch m.Compression() {
case CompressionNone:
return unpackSequence(m)

case CompressionGzip:
return unpackGzip(m.Payload())

case CompressionSnappy:
panic("not yet implemented")
// Unpack takes a message-set and returns a slice with the component messages.
func Unpack(set Message) ([]Message, error) {
if set.Compression() > 0 {
// unpack compressed payload
return unpack(set.Payload(), set.Compression())
}

return nil, nil
// if instead of a compressed message it's a sequence
// of uncompressed ones, reading the compression flag
// is effectively reading the flag on the first message.
// For a sequence we can unpack the data as-is.
return unpack(set, CompressionNone)
}

func unpackSequence(data []byte) (messages []Message, err error) {
r := bytes.NewReader(data)
var msg Message
func unpack(data []byte, comp CompressionType) (msgs []Message, err error) {
var r io.Reader = bytes.NewReader(data)

for {
msg, err = ReadMessage(r)
switch comp {
case CompressionNone:
case CompressionGzip:
r, err = gzip.NewReader(r)
if err != nil {
break
return nil, err
}

messages = append(messages, msg)
}
case CompressionSnappy:
r = snappy.NewReader(r)

if err == io.EOF {
return messages, nil
default:
panic("invalid compression type")
}

return messages, err
}

func unpackGzip(data []byte) (messages []Message, err error) {
r := bytes.NewReader(data)
gr, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
var msg Message
// close reader if possible on exit
defer func() {
if r, ok := r.(io.Closer); ok {
r.Close()
}
}()

var m Message
for {
msg, err = ReadMessage(gr)
m, err = ReadMessage(r)
if err != nil {
break
}

messages = append(messages, msg)
msgs = append(msgs, m)
}

if err == io.EOF {
return messages, nil
return msgs, nil
}

return messages, err
return msgs, err
}
@@ -5,6 +5,8 @@ import (
"hash/crc32"
"math/rand"
"testing"

"github.com/golang/snappy"
)

func TestMessage(t *testing.T) {
@@ -69,7 +71,35 @@ func TestUnpackGzip(t *testing.T) {
set := MessageSet(messages, CompressionGzip)

if set.Compression() != CompressionGzip {
t.Errorf("missing gzip flag, got %d", set.Compression())
t.Errorf("Missing gzip flag, got %d", set.Compression())
}

unpacked, err := Unpack(set)
if err != nil {
t.Error(err)
}

if len(unpacked) != len(messages) {
t.Errorf("Unpacked %d messages vs expected %d", len(unpacked), len(messages))
}

for k, m := range messages {
testMessage(t, m.Payload(), unpacked[k])
}
}

func TestUnpackSnappy(t *testing.T) {
t.Parallel()

messages := randMessageSet()
set := MessageSet(messages, CompressionSnappy)

if set.Compression() != CompressionSnappy {
t.Errorf("Missing snappy flag, got %d", set.Compression())
}

if _, err := snappy.DecodedLen(set.Payload()); err != nil {
t.Errorf("Set's payload does not look like snappy: %s", err)
}

unpacked, err := Unpack(set)

0 comments on commit e935a02

Please sign in to comment.
You can’t perform that action at this time.