Broker log recovery.
This pull request adds recovery to messaging.Topic when it is opened. If
a partial message is found, the segment file is truncated at that point
and writing resumes from there. Partial messages can occur when a server
shuts down ungracefully, leaving a half-written message at the end of a segment.
benbjohnson committed Apr 4, 2015
1 parent 93be4b0 commit ba3026f
Showing 6 changed files with 193 additions and 71 deletions.
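
For illustration, here is a minimal sketch of the truncate-on-partial-record idea this change implements. It is not the code in this diff (that lives in RecoverSegment in messaging/broker.go below), and it uses a simplified, hypothetical 4-byte length-prefixed record format rather than the broker's message encoding:

```go
package recovery

import (
	"encoding/binary"
	"io"
	"os"
)

// recoverLog scans a log of length-prefixed records and truncates the file at
// the first partial record, returning the number of complete records kept.
func recoverLog(path string) (int, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	count := 0
	for {
		// Remember where this record starts so the file can be truncated
		// here if the record turns out to be incomplete.
		offset, err := f.Seek(0, io.SeekCurrent)
		if err != nil {
			return 0, err
		}

		// Read the 4-byte length prefix. A clean EOF means the log ends on a
		// record boundary; a short read means a half-written header.
		var hdr [4]byte
		if _, err := io.ReadFull(f, hdr[:]); err == io.EOF {
			return count, nil
		} else if err == io.ErrUnexpectedEOF {
			return count, os.Truncate(path, offset)
		} else if err != nil {
			return 0, err
		}

		// Skip the record body; running out of bytes here also means the
		// last record was only partially written.
		n := int64(binary.BigEndian.Uint32(hdr[:]))
		if _, err := io.CopyN(io.Discard, f, n); err == io.EOF {
			return count, os.Truncate(path, offset)
		} else if err != nil {
			return 0, err
		}
		count++
	}
}
```

The actual change reuses the existing MessageDecoder to detect the partial tail; see RecoverSegment and MessageDecoder.Decode in messaging/broker.go below.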
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
- [#2156](https://github.com/influxdb/influxdb/pull/2156): Propagate error when resolving UDP address in Graphite UDP server.
- [#2163](https://github.com/influxdb/influxdb/pull/2163): Fix up paths for default data and run storage.
- [#2164](https://github.com/influxdb/influxdb/pull/2164): Append STDOUT/STDERR in initscript.
- [#2167](https://github.com/influxdb/influxdb/pull/2167): Add broker log recovery.

## v0.9.0-rc19 [2015-04-01]

108 changes: 63 additions & 45 deletions messaging/broker.go
@@ -468,7 +468,10 @@ func (b *Broker) Publish(m *Message) (uint64, error) {
}

// TopicReader returns a new topic reader for a topic starting from a given index.
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface {
io.ReadCloser
io.Seeker
} {
return NewTopicReader(b.TopicPath(topicID), index, streaming)
}

@@ -722,9 +725,9 @@ func (t *Topic) Open() error {
s := segments.Last()

// Read the last segment and extract the last message index.
index, err := ReadSegmentMaxIndex(s.Path)
index, err := RecoverSegment(s.Path)
if err != nil {
return fmt.Errorf("read segment max index: %s", err)
return fmt.Errorf("recover segment: %s", err)
}
t.index = index

@@ -764,28 +767,6 @@ func (t *Topic) close() error {
return nil
}

// ReadIndex reads the highest available index for a topic from disk.
func (t *Topic) ReadIndex() (uint64, error) {
// Read a list of all segments.
segments, err := ReadSegments(t.path)
if err != nil && !os.IsNotExist(err) {
return 0, fmt.Errorf("read segments: %s", err)
}

// Ignore if there are no available segments.
if len(segments) == 0 {
return 0, nil
}

// Read highest index on the last segment.
index, err := ReadSegmentMaxIndex(segments.Last().Path)
if err != nil {
return 0, fmt.Errorf("read segment max index: %s", err)
}

return index, nil
}

// WriteMessage writes a message to the end of the topic.
func (t *Topic) WriteMessage(m *Message) error {
t.mu.Lock()
@@ -816,9 +797,9 @@ func (t *Topic) WriteMessage(m *Message) error {
}

// Encode message.
b := make([]byte, messageHeaderSize+len(m.Data))
b := make([]byte, MessageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
copy(b[MessageHeaderSize:], m.Data)

// Write to last segment.
if _, err := t.file.Write(b); err != nil {
@@ -966,8 +947,9 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) {
return segments[len(segments)-1], nil
}

// ReadSegmentMaxIndex returns the highest index recorded in a segment.
func ReadSegmentMaxIndex(path string) (uint64, error) {
// RecoverSegment parses the entire segment and truncates at any partial messages.
// Returns the last index seen in the segment.
func RecoverSegment(path string) (uint64, error) {
// Open segment file.
f, err := os.Open(path)
if os.IsNotExist(err) {
@@ -983,6 +965,16 @@ func ReadSegmentMaxIndex(path string) (uint64, error) {
for {
var m Message
if err := dec.Decode(&m); err == io.EOF {
return index, nil
} else if err == io.ErrUnexpectedEOF {
// The decoder will unread any partially read data so we can
// simply truncate at the current position.
if n, err := f.Seek(0, os.SEEK_CUR); err != nil {
return 0, fmt.Errorf("seek: %s", err)
} else if err := os.Truncate(path, n); err != nil {
return 0, fmt.Errorf("truncate: n=%d, err=%s", n-1, err)
}

return index, nil
} else if err != nil {
return 0, fmt.Errorf("decode: %s", err)
@@ -1017,6 +1009,19 @@ func NewTopicReader(path string, index uint64, streaming bool) *TopicReader {
}
}

// Seek seeks to a position in the current segment.
func (r *TopicReader) Seek(offset int64, whence int) (int64, error) {
assert(whence == os.SEEK_CUR, "topic reader can only seek to a relative position")

r.mu.Lock()
defer r.mu.Unlock()

if r.file == nil {
return 0, nil
}
return r.file.Seek(offset, whence)
}

// Read reads the next bytes from the reader into the buffer.
func (r *TopicReader) Read(p []byte) (int, error) {
for {
@@ -1109,7 +1114,7 @@ func (r *TopicReader) seekAfterIndex(f *os.File, seek uint64) error {
return err
} else if m.Index >= seek {
// Seek to message start.
if _, err := f.Seek(-int64(messageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil {
if _, err := f.Seek(-int64(MessageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil {
return fmt.Errorf("seek: %s", err)
}
return nil
@@ -1203,7 +1208,7 @@ const (
)

// The size of the encoded message header, in bytes.
const messageHeaderSize = 2 + 8 + 8 + 4
const MessageHeaderSize = 2 + 8 + 8 + 4

// Message represents a single item in a topic.
type Message struct {
@@ -1219,34 +1224,34 @@ func (m *Message) WriteTo(w io.Writer) (n int64, err error) {
return int64(n), err
}
if n, err := w.Write(m.Data); err != nil {
return int64(messageHeaderSize + n), err
return int64(MessageHeaderSize + n), err
}
return int64(messageHeaderSize + len(m.Data)), nil
return int64(MessageHeaderSize + len(m.Data)), nil
}

// MarshalBinary returns a binary representation of the message.
// This implements encoding.BinaryMarshaler. An error cannot be returned.
func (m *Message) MarshalBinary() ([]byte, error) {
b := make([]byte, messageHeaderSize+len(m.Data))
b := make([]byte, MessageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
copy(b[MessageHeaderSize:], m.Data)
return b, nil
}

// UnmarshalBinary reads a message from a binary encoded slice.
// This implements encoding.BinaryUnmarshaler.
func (m *Message) UnmarshalBinary(b []byte) error {
m.unmarshalHeader(b)
if len(b[messageHeaderSize:]) < len(m.Data) {
return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data))
if len(b[MessageHeaderSize:]) < len(m.Data) {
return fmt.Errorf("message data too short: %d < %d", len(b[MessageHeaderSize:]), len(m.Data))
}
copy(m.Data, b[messageHeaderSize:])
copy(m.Data, b[MessageHeaderSize:])
return nil
}

// marshalHeader returns a byte slice with the message header.
func (m *Message) marshalHeader() []byte {
b := make([]byte, messageHeaderSize)
b := make([]byte, MessageHeaderSize)
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
binary.BigEndian.PutUint64(b[2:10], m.TopicID)
binary.BigEndian.PutUint64(b[10:18], m.Index)
@@ -1265,28 +1270,41 @@ func (m *Message) unmarshalHeader(b []byte) {

// MessageDecoder decodes messages from a reader.
type MessageDecoder struct {
r io.Reader
r io.ReadSeeker
}

// NewMessageDecoder returns a new instance of the MessageDecoder.
func NewMessageDecoder(r io.Reader) *MessageDecoder {
func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder {
return &MessageDecoder{r: r}
}

// Decode reads a message from the decoder's reader.
func (dec *MessageDecoder) Decode(m *Message) error {
// Read header bytes.
var b [messageHeaderSize]byte
if _, err := io.ReadFull(dec.r, b[:]); err == io.EOF {
// Unread if there is a partial read.
var b [MessageHeaderSize]byte
if n, err := io.ReadFull(dec.r, b[:]); err == io.EOF {
return err
} else if err == io.ErrUnexpectedEOF {
if _, err := dec.r.Seek(-int64(n), os.SEEK_CUR); err != nil {
return fmt.Errorf("cannot unread header: n=%d, err=%s", n, err)
}
warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b), n, err)
return err
} else if err != nil {
return fmt.Errorf("read header: %s", err)
}
m.unmarshalHeader(b[:])

// Read data.
if _, err := io.ReadFull(dec.r, m.Data); err != nil {
return fmt.Errorf("read body: %s", err)
if n, err := io.ReadFull(dec.r, m.Data); err == io.EOF || err == io.ErrUnexpectedEOF {
if _, err := dec.r.Seek(-int64(MessageHeaderSize+n), os.SEEK_CUR); err != nil {
return fmt.Errorf("cannot unread header+data: n=%d, err=%s", n, err)
}
warnf("unexpected eof(1): len=%d, n=%d, err=%s", len(m.Data), n, err)
return io.ErrUnexpectedEOF
} else if err != nil {
return fmt.Errorf("read data: %s", err)
}

return nil
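
As context for the encoding and decoding changes above: MessageHeaderSize is 2 + 8 + 8 + 4 bytes, and marshalHeader writes the type, topic ID, and index big-endian; presumably the final 4 bytes carry the data length (that slice is not visible in this hunk). A hedged sketch of encoding one message by hand under that assumption:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Mirrors MessageHeaderSize in messaging/broker.go: 2-byte type, 8-byte topic
// ID, 8-byte index, plus 4 more header bytes (assumed here to be the data length).
const headerSize = 2 + 8 + 8 + 4

func main() {
	var (
		typ     uint16 = 1
		topicID uint64 = 10
		index   uint64 = 42
		data           = []byte("hello")
	)

	b := make([]byte, headerSize+len(data))
	binary.BigEndian.PutUint16(b[0:2], typ)      // message type, as in marshalHeader
	binary.BigEndian.PutUint64(b[2:10], topicID) // topic ID
	binary.BigEndian.PutUint64(b[10:18], index)  // log index
	binary.BigEndian.PutUint32(b[18:22], uint32(len(data))) // assumed: data length fills the last 4 bytes
	copy(b[headerSize:], data)

	fmt.Printf("encoded %d header bytes + %d data bytes: % x\n", headerSize, len(data), b)
}
```

Because the header has a fixed size and declares how much data follows, a decoder that hits EOF partway through either part can recognize the tail as a partial message, which is what the recovery path above relies on.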
102 changes: 90 additions & 12 deletions messaging/broker_test.go
@@ -336,6 +336,77 @@ func (b *RaftFSMBroker) Index() uint64 { return 0 }
func (b *RaftFSMBroker) WriteTo(w io.Writer) (n int64, err error) { return 0, nil }
func (b *RaftFSMBroker) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil }

// Ensure a topic can recover if it has a partial message.
func TestTopic_Recover(t *testing.T) {
topic := OpenTopic()
defer topic.Close()

// Write a few messages.
if err := topic.WriteMessage(&messaging.Message{Index: 1, Data: make([]byte, 10)}); err != nil {
t.Fatal(err)
} else if err = topic.WriteMessage(&messaging.Message{Index: 2, Data: make([]byte, 10)}); err != nil {
t.Fatal(err)
} else if err = topic.WriteMessage(&messaging.Message{Index: 3, Data: make([]byte, 10)}); err != nil {
t.Fatal(err)
}

// Close topic and trim the file by a few bytes.
topic.Topic.Close()
if fi, err := os.Stat(filepath.Join(topic.Path(), "1")); err != nil {
t.Fatal(err)
} else if err = os.Truncate(filepath.Join(topic.Path(), "1"), fi.Size()-5); err != nil {
t.Fatal(err)
}

// Reopen topic.
if err := topic.Open(); err != nil {
t.Fatal(err)
}

// Rewrite the third message with a different data size.
if err := topic.WriteMessage(&messaging.Message{Index: 3, Data: make([]byte, 20)}); err != nil {
t.Fatal(err)
}

// Read all messages.
a := MustDecodeAllMessages(messaging.NewTopicReader(topic.Path(), 0, false))
if len(a) != 3 {
t.Fatalf("unexpected message count: %d", len(a))
} else if !reflect.DeepEqual(a[0], &messaging.Message{Index: 1, Data: make([]byte, 10)}) {
t.Fatalf("unexpected message(0): %#v", a[0])
} else if !reflect.DeepEqual(a[1], &messaging.Message{Index: 2, Data: make([]byte, 10)}) {
t.Fatalf("unexpected message(1): %#v", a[1])
} else if !reflect.DeepEqual(a[2], &messaging.Message{Index: 3, Data: make([]byte, 20)}) {
t.Fatalf("unexpected message(2): %#v", a[2])
}

}

// Topic is a wrapper for messaging.Topic that creates the topic in a temporary location.
type Topic struct {
*messaging.Topic
}

// NewTopic returns a new Topic instance.
func NewTopic() *Topic {
return &Topic{messaging.NewTopic(1, tempfile())}
}

// OpenTopic returns a new, open Topic instance.
func OpenTopic() *Topic {
t := NewTopic()
if err := t.Open(); err != nil {
panic("open: " + err.Error())
}
return t
}

// Close closes and deletes the temporary topic.
func (t *Topic) Close() {
defer os.RemoveAll(t.Path())
t.Topic.Close()
}

// Ensure a list of topics can be read from a directory.
func TestReadTopics(t *testing.T) {
path, _ := ioutil.TempDir("", "")
@@ -700,20 +771,10 @@ func (b *Broker) Log() *BrokerLog {
}

// MustReadAllTopic reads all messages on a topic. Panic on error.
func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) {
func (b *Broker) MustReadAllTopic(topicID uint64) []*messaging.Message {
r := b.TopicReader(topicID, 0, false)
defer r.Close()

dec := messaging.NewMessageDecoder(r)
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
return
} else if err != nil {
panic("read all topic: " + err.Error())
}
a = append(a, m)
}
return MustDecodeAllMessages(r)
}

// BrokerLog is a mockable object that implements Broker.Log.
@@ -772,6 +833,23 @@ func MustWriteFile(filename string, data []byte) {
}
}

// MustDecodeAllMessages reads all messages on a reader.
func MustDecodeAllMessages(r interface {
io.ReadCloser
io.Seeker
}) (a []*messaging.Message) {
dec := messaging.NewMessageDecoder(r)
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
return
} else if err != nil {
panic("read all: " + err.Error())
}
a = append(a, m)
}
}

// MustMarshalMessages marshals a slice of messages to bytes. Panic on error.
func MustMarshalMessages(a []*messaging.Message) []byte {
var buf bytes.Buffer
8 changes: 7 additions & 1 deletion messaging/client.go
@@ -665,7 +665,7 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
c.Logger.Printf("connected to broker: %s", req.URL.String())

// Continuously decode messages from request body in a separate goroutine.
dec := NewMessageDecoder(resp.Body)
dec := NewMessageDecoder(&nopSeeker{resp.Body})
for {
// Decode message from the stream.
m := &Message{}
@@ -703,3 +703,9 @@ func urlsEqual(a, b []url.URL) bool {
}
return true
}

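// nopSeeker adapts a plain io.Reader to the io.ReadSeeker now required by
// NewMessageDecoder. Seek is a no-op: a streaming response body cannot be
// rewound, so a partial read at the end of a stream is simply surfaced as an error.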
type nopSeeker struct {
io.Reader
}

func (*nopSeeker) Seek(offset int64, whence int) (int64, error) { return 0, nil }
5 changes: 4 additions & 1 deletion messaging/handler.go
@@ -19,7 +19,10 @@ type Handler struct {
URLs() []url.URL
IsLeader() bool
LeaderURL() url.URL
TopicReader(topicID, index uint64, streaming bool) io.ReadCloser
TopicReader(topicID, index uint64, streaming bool) interface {
io.ReadCloser
io.Seeker
}
Publish(m *Message) (uint64, error)
SetTopicMaxIndex(topicID, index uint64, u url.URL) error
}
