Skip to content
Permalink
Browse files

netlog: unpack embedded messages for direct payload requests

  • Loading branch information...
bictorman committed May 17, 2016
1 parent d3fdf5b commit ed8aba960d2f648207081971f0f2b663b43719d7
Showing with 26 additions and 8 deletions.
  1. +7 −1 errors.go
  2. +2 −1 message.go
  3. +17 −6 topic.go
@@ -6,6 +6,7 @@ package netlog

import (
"fmt"
"io"
"log"
"net/http"

@@ -51,12 +52,14 @@ var (
// ErrInvalidDir is returned when the data folder provided does not exists or is not writable.
ErrInvalidDir = newErr(http.StatusInternalServerError, "netlog: invalid data directory")

// ErrBadRequest is returned when invalid parameters are received.
// ErrBadRequest is returned when invalid parameters are received.
ErrBadRequest = newErr(http.StatusBadRequest, "netlog: bad request")
// ErrInvalidOffset is returned when the requested offset can not be parsed into an number.
ErrInvalidOffset = newErr(http.StatusBadRequest, "netlog: invalid offset")
// ErrInvalidDuration is returned when a given big duration can not be parsed
ErrInvalidDuration = newErr(http.StatusBadRequest, "netlog: invalid duration")
// ErrInvalidCompression is returning when the compression type defined is unknown
ErrInvalidCompression = newErr(http.StatusBadRequest, "netlog: invalid compression type")
// ErrTopicExists is returning when trying to create an already existing topic.
ErrTopicExists = newErr(http.StatusBadRequest, "netlog: topic exists")
// ErrEndOfTopic is returned when the reader has read all the way until the end of the topic.
@@ -69,13 +72,16 @@ var (
// ErrOffsetNotFound is returning when the offset is no longer or not yet present in the topic.
ErrOffsetNotFound = newErr(http.StatusNotFound, "netlog: offset not found")

// ErrCRC is returned when a message's payload does not match's the CRC header.
ErrCRC = newErr(http.StatusInternalServerError, "netlog: checksum error")
// ErrBusy is retuning when trying to close or delete a topic with readers attached to it.
ErrBusy = newErr(http.StatusConflict, "netlog: resource busy")
)

var errmap = map[error]NLError{
biglog.ErrBusy: ErrBusy,
biglog.ErrNotFound: ErrOffsetNotFound,
io.EOF: ErrEndOfTopic,
}

// ExtErr maps external errors, mostly BigLog errors to NetLog errors.
@@ -171,6 +171,7 @@ func unpack(data []byte, comp CompressionType) (msgs []Message, err error) {
var r io.Reader = bytes.NewReader(data)

switch comp {
case 0: // not a set
case CompressionNone:
case CompressionGzip:
r, err = gzip.NewReader(r)
@@ -182,7 +183,7 @@ func unpack(data []byte, comp CompressionType) (msgs []Message, err error) {
r = snappy.NewReader(r)

default:
panic("invalid compression type")
return nil, ErrInvalidCompression
}

// close reader if possible on exit
@@ -239,19 +239,30 @@ func (t *Topic) ReadFrom(r io.Reader) (n int64, err error) {

// Payload is a utility method to fetch the payload of a single offset.
func (t *Topic) Payload(offset int64) ([]byte, error) {
reader, _, err := biglog.NewReader(t.bl, offset)
if err != nil {
return nil, ErrInvalidOffset
reader, ret, err := biglog.NewReader(t.bl, offset)
if err != nil && err != biglog.ErrEmbeddedOffset {
return nil, err
}

// TODO unpack embedded offset
entry, err := ReadMessage(reader)
if err != nil {
return nil, err
}

// TODO check crc?
return entry.Payload(), nil
// extract list of messages out of the stored entry
msgs, err := Unpack(entry)
if err != nil {
return nil, err
}

// ret is the first offset of the returned list
// offset-ret = position of message within the list
msg := msgs[offset-ret]
if !msg.ChecksumOK() {
return nil, ErrCRC
}

return msg.Payload(), nil
}

// CreateScanner creates a new scanner starting at offset `from`.

0 comments on commit ed8aba9

Please sign in to comment.
You can’t perform that action at this time.