Skip to content

Commit

Permalink
netlog: differentiate no compression and default
Browse files Browse the repository at this point in the history
  • Loading branch information
bictorman committed Mar 27, 2016
1 parent bd571d4 commit 5f11993
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
30 changes: 21 additions & 9 deletions message.go
Expand Up @@ -25,12 +25,14 @@ const (
type CompressionType uint8

const (
// CompressionNone is used by single messages
CompressionNone CompressionType = 0
// CompressionDefault is used when falling back to the default compression of the system.
CompressionDefault CompressionType = 0
// CompressionNone is used by messages sets with uncompressed payloads
CompressionNone CompressionType = 1
// CompressionGzip is used by message sets with gzipped payloads
CompressionGzip CompressionType = 1
CompressionGzip CompressionType = 2
// CompressionSnappy is used by message sets with snappy payloads
CompressionSnappy CompressionType = 2
CompressionSnappy CompressionType = 3
)

// MessageFromPayload returns a message with the appropriate calculated headers from a give data payload.
Expand All @@ -48,16 +50,14 @@ func MessageFromPayload(p []byte) Message {
// MessageSet will panic if a compression type is not provided, since nothing would indicate to streaming
// clients that further messages are embedded in the payload.
func MessageSet(msgs []Message, comp CompressionType) Message {

if comp == CompressionNone {
panic("can not generate message-set without compression")
}

// TODO buffer pool?
buf := &bytes.Buffer{}
var w io.WriteCloser

switch comp {
case CompressionNone:
w = NopWCloser(buf)

case CompressionGzip:
w = gzip.NewWriter(buf)

Expand Down Expand Up @@ -208,3 +208,15 @@ func unpack(data []byte, comp CompressionType) (msgs []Message, err error) {

return msgs, err
}

// NopWCloser returns a WriteCloser with a no-op
// Close method wrapping the provided Writer w.
func NopWCloser(w io.Writer) io.WriteCloser {
return nopWCloser{w}
}

type nopWCloser struct {
io.Writer
}

func (nopWCloser) Close() error { return nil }
2 changes: 1 addition & 1 deletion message_buffer_test.go
Expand Up @@ -15,10 +15,10 @@ import (
func TestMessageBuffer(t *testing.T) {
t.Parallel()

// TODO add CompressionNone when supported
comps := []CompressionType{
CompressionGzip,
CompressionSnappy,
CompressionNone,
}

for _, comp := range comps {
Expand Down
2 changes: 1 addition & 1 deletion topic.go
Expand Up @@ -68,7 +68,7 @@ func newTopic(bl *biglog.BigLog, settings TopicSettings, defaultSettings TopicSe
settings.BatchInterval = defaultSettings.BatchInterval
}

if settings.CompressionType == 0 {
if settings.CompressionType == CompressionDefault {
settings.CompressionType = defaultSettings.CompressionType
}

Expand Down

0 comments on commit 5f11993

Please sign in to comment.