From ba3026f400c1419fe21c7d6ceb9c853bcf8ecb6a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 3 Apr 2015 17:11:54 -0600 Subject: [PATCH] Broker log recovery. This pull request adds recovery to the messaging.Topic when opening. If any partial messages are found then the file is truncated at that point and started from there. This can occur when ungracefully shutting down a server. It can leave half written messages at the end of segments. --- CHANGELOG.md | 1 + messaging/broker.go | 108 ++++++++++++++++++++++---------------- messaging/broker_test.go | 102 ++++++++++++++++++++++++++++++----- messaging/client.go | 8 ++- messaging/handler.go | 5 +- messaging/handler_test.go | 40 +++++++++----- 6 files changed, 193 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 248d32f80b7..e212e724f71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#2156](https://github.com/influxdb/influxdb/pull/2156): Propagate error when resolving UDP address in Graphite UDP server. - [#2163](https://github.com/influxdb/influxdb/pull/2163): Fix up paths for default data and run storage. - [#2164](https://github.com/influxdb/influxdb/pull/2164): Append STDOUT/STDERR in initscript. +- [#2167](https://github.com/influxdb/influxdb/pull/2167): Add broker log recovery. ## v0.9.0-rc19 [2015-04-01] diff --git a/messaging/broker.go b/messaging/broker.go index 7549fe7f458..cc72185f30d 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -468,7 +468,10 @@ func (b *Broker) Publish(m *Message) (uint64, error) { } // TopicReader returns a new topic reader for a topic starting from a given index. -func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { +func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface { + io.ReadCloser + io.Seeker +} { return NewTopicReader(b.TopicPath(topicID), index, streaming) } @@ -722,9 +725,9 @@ func (t *Topic) Open() error { s := segments.Last() // Read the last segment and extract the last message index. - index, err := ReadSegmentMaxIndex(s.Path) + index, err := RecoverSegment(s.Path) if err != nil { - return fmt.Errorf("read segment max index: %s", err) + return fmt.Errorf("recover segment: %s", err) } t.index = index @@ -764,28 +767,6 @@ func (t *Topic) close() error { return nil } -// ReadIndex reads the highest available index for a topic from disk. -func (t *Topic) ReadIndex() (uint64, error) { - // Read a list of all segments. - segments, err := ReadSegments(t.path) - if err != nil && !os.IsNotExist(err) { - return 0, fmt.Errorf("read segments: %s", err) - } - - // Ignore if there are no available segments. - if len(segments) == 0 { - return 0, nil - } - - // Read highest index on the last segment. - index, err := ReadSegmentMaxIndex(segments.Last().Path) - if err != nil { - return 0, fmt.Errorf("read segment max index: %s", err) - } - - return index, nil -} - // WriteMessage writes a message to the end of the topic. func (t *Topic) WriteMessage(m *Message) error { t.mu.Lock() @@ -816,9 +797,9 @@ func (t *Topic) WriteMessage(m *Message) error { } // Encode message. - b := make([]byte, messageHeaderSize+len(m.Data)) + b := make([]byte, MessageHeaderSize+len(m.Data)) copy(b, m.marshalHeader()) - copy(b[messageHeaderSize:], m.Data) + copy(b[MessageHeaderSize:], m.Data) // Write to last segment. if _, err := t.file.Write(b); err != nil { @@ -966,8 +947,9 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { return segments[len(segments)-1], nil } -// ReadSegmentMaxIndex returns the highest index recorded in a segment. -func ReadSegmentMaxIndex(path string) (uint64, error) { +// RecoverSegment parses the entire segment and truncates at any partial messages. +// Returns the last index seen in the segment. +func RecoverSegment(path string) (uint64, error) { // Open segment file. f, err := os.Open(path) if os.IsNotExist(err) { @@ -983,6 +965,16 @@ func ReadSegmentMaxIndex(path string) (uint64, error) { for { var m Message if err := dec.Decode(&m); err == io.EOF { + return index, nil + } else if err == io.ErrUnexpectedEOF { + // The decoder will unread any partially read data so we can + // simply truncate at current position. + if n, err := f.Seek(0, os.SEEK_CUR); err != nil { + return 0, fmt.Errorf("seek: %s", err) + } else if err := os.Truncate(path, n); err != nil { + return 0, fmt.Errorf("truncate: n=%d, err=%s", n-1, err) + } + return index, nil } else if err != nil { return 0, fmt.Errorf("decode: %s", err) @@ -1017,6 +1009,19 @@ func NewTopicReader(path string, index uint64, streaming bool) *TopicReader { } } +// Seek seeks to a position the current segment. +func (r *TopicReader) Seek(offset int64, whence int) (int64, error) { + assert(whence == os.SEEK_CUR, "topic reader can only seek to a relative position") + + r.mu.Lock() + defer r.mu.Unlock() + + if r.file == nil { + return 0, nil + } + return r.file.Seek(offset, whence) +} + // Read reads the next bytes from the reader into the buffer. func (r *TopicReader) Read(p []byte) (int, error) { for { @@ -1109,7 +1114,7 @@ func (r *TopicReader) seekAfterIndex(f *os.File, seek uint64) error { return err } else if m.Index >= seek { // Seek to message start. - if _, err := f.Seek(-int64(messageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil { + if _, err := f.Seek(-int64(MessageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil { return fmt.Errorf("seek: %s", err) } return nil @@ -1203,7 +1208,7 @@ const ( ) // The size of the encoded message header, in bytes. -const messageHeaderSize = 2 + 8 + 8 + 4 +const MessageHeaderSize = 2 + 8 + 8 + 4 // Message represents a single item in a topic. type Message struct { @@ -1219,17 +1224,17 @@ func (m *Message) WriteTo(w io.Writer) (n int64, err error) { return int64(n), err } if n, err := w.Write(m.Data); err != nil { - return int64(messageHeaderSize + n), err + return int64(MessageHeaderSize + n), err } - return int64(messageHeaderSize + len(m.Data)), nil + return int64(MessageHeaderSize + len(m.Data)), nil } // MarshalBinary returns a binary representation of the message. // This implements encoding.BinaryMarshaler. An error cannot be returned. func (m *Message) MarshalBinary() ([]byte, error) { - b := make([]byte, messageHeaderSize+len(m.Data)) + b := make([]byte, MessageHeaderSize+len(m.Data)) copy(b, m.marshalHeader()) - copy(b[messageHeaderSize:], m.Data) + copy(b[MessageHeaderSize:], m.Data) return b, nil } @@ -1237,16 +1242,16 @@ func (m *Message) MarshalBinary() ([]byte, error) { // This implements encoding.BinaryUnmarshaler. func (m *Message) UnmarshalBinary(b []byte) error { m.unmarshalHeader(b) - if len(b[messageHeaderSize:]) < len(m.Data) { - return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data)) + if len(b[MessageHeaderSize:]) < len(m.Data) { + return fmt.Errorf("message data too short: %d < %d", len(b[MessageHeaderSize:]), len(m.Data)) } - copy(m.Data, b[messageHeaderSize:]) + copy(m.Data, b[MessageHeaderSize:]) return nil } // marshalHeader returns a byte slice with the message header. func (m *Message) marshalHeader() []byte { - b := make([]byte, messageHeaderSize) + b := make([]byte, MessageHeaderSize) binary.BigEndian.PutUint16(b[0:2], uint16(m.Type)) binary.BigEndian.PutUint64(b[2:10], m.TopicID) binary.BigEndian.PutUint64(b[10:18], m.Index) @@ -1265,19 +1270,26 @@ func (m *Message) unmarshalHeader(b []byte) { // MessageDecoder decodes messages from a reader. type MessageDecoder struct { - r io.Reader + r io.ReadSeeker } // NewMessageDecoder returns a new instance of the MessageDecoder. -func NewMessageDecoder(r io.Reader) *MessageDecoder { +func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder { return &MessageDecoder{r: r} } // Decode reads a message from the decoder's reader. func (dec *MessageDecoder) Decode(m *Message) error { // Read header bytes. - var b [messageHeaderSize]byte - if _, err := io.ReadFull(dec.r, b[:]); err == io.EOF { + // Unread if there is a partial read. + var b [MessageHeaderSize]byte + if n, err := io.ReadFull(dec.r, b[:]); err == io.EOF { + return err + } else if err == io.ErrUnexpectedEOF { + if _, err := dec.r.Seek(-int64(n), os.SEEK_CUR); err != nil { + return fmt.Errorf("cannot unread header: n=%d, err=%s", n, err) + } + warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b), n, err) return err } else if err != nil { return fmt.Errorf("read header: %s", err) @@ -1285,8 +1297,14 @@ func (dec *MessageDecoder) Decode(m *Message) error { m.unmarshalHeader(b[:]) // Read data. - if _, err := io.ReadFull(dec.r, m.Data); err != nil { - return fmt.Errorf("read body: %s", err) + if n, err := io.ReadFull(dec.r, m.Data); err == io.EOF || err == io.ErrUnexpectedEOF { + if _, err := dec.r.Seek(-int64(MessageHeaderSize+n), os.SEEK_CUR); err != nil { + return fmt.Errorf("cannot unread header+data: n=%d, err=%s", n, err) + } + warnf("unexpected eof(1): len=%d, n=%d, err=%s", len(m.Data), n, err) + return io.ErrUnexpectedEOF + } else if err != nil { + return fmt.Errorf("read data: %s", err) } return nil diff --git a/messaging/broker_test.go b/messaging/broker_test.go index 6fae315c996..28b64730d2c 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -336,6 +336,77 @@ func (b *RaftFSMBroker) Index() uint64 { return 0 } func (b *RaftFSMBroker) WriteTo(w io.Writer) (n int64, err error) { return 0, nil } func (b *RaftFSMBroker) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil } +// Ensure a topic can recover if it has a partial message. +func TestTopic_Recover(t *testing.T) { + topic := OpenTopic() + defer topic.Close() + + // Write a messages. + if err := topic.WriteMessage(&messaging.Message{Index: 1, Data: make([]byte, 10)}); err != nil { + t.Fatal(err) + } else if err = topic.WriteMessage(&messaging.Message{Index: 2, Data: make([]byte, 10)}); err != nil { + t.Fatal(err) + } else if err = topic.WriteMessage(&messaging.Message{Index: 3, Data: make([]byte, 10)}); err != nil { + t.Fatal(err) + } + + // Close topic and trim the file by a few bytes. + topic.Topic.Close() + if fi, err := os.Stat(filepath.Join(topic.Path(), "1")); err != nil { + t.Fatal(err) + } else if err = os.Truncate(filepath.Join(topic.Path(), "1"), fi.Size()-5); err != nil { + t.Fatal(err) + } + + // Reopen topic. + if err := topic.Open(); err != nil { + t.Fatal(err) + } + + // Rewrite the third message with a different data size. + if err := topic.WriteMessage(&messaging.Message{Index: 3, Data: make([]byte, 20)}); err != nil { + t.Fatal(err) + } + + // Read all messages. + a := MustDecodeAllMessages(messaging.NewTopicReader(topic.Path(), 0, false)) + if len(a) != 3 { + t.Fatalf("unexpected message count: %d", len(a)) + } else if !reflect.DeepEqual(a[0], &messaging.Message{Index: 1, Data: make([]byte, 10)}) { + t.Fatalf("unexpected message(0): %#v", a[0]) + } else if !reflect.DeepEqual(a[1], &messaging.Message{Index: 2, Data: make([]byte, 10)}) { + t.Fatalf("unexpected message(1): %#v", a[1]) + } else if !reflect.DeepEqual(a[2], &messaging.Message{Index: 3, Data: make([]byte, 20)}) { + t.Fatalf("unexpected message(2): %#v", a[2]) + } + +} + +// Topic is a wrapper for messaging.Topic that creates the topic in a temporary location. +type Topic struct { + *messaging.Topic +} + +// NewTopic returns a new Topic instance. +func NewTopic() *Topic { + return &Topic{messaging.NewTopic(1, tempfile())} +} + +// OpenTopic returns a new, open Topic instance. +func OpenTopic() *Topic { + t := NewTopic() + if err := t.Open(); err != nil { + panic("open: " + err.Error()) + } + return t +} + +// Close closes and deletes the temporary topic. +func (t *Topic) Close() { + defer os.RemoveAll(t.Path()) + t.Topic.Close() +} + // Ensure a list of topics can be read from a directory. func TestReadTopics(t *testing.T) { path, _ := ioutil.TempDir("", "") @@ -700,20 +771,10 @@ func (b *Broker) Log() *BrokerLog { } // MustReadAllTopic reads all messages on a topic. Panic on error. -func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) { +func (b *Broker) MustReadAllTopic(topicID uint64) []*messaging.Message { r := b.TopicReader(topicID, 0, false) defer r.Close() - - dec := messaging.NewMessageDecoder(r) - for { - m := &messaging.Message{} - if err := dec.Decode(m); err == io.EOF { - return - } else if err != nil { - panic("read all topic: " + err.Error()) - } - a = append(a, m) - } + return MustDecodeAllMessages(r) } // BrokerLog is a mockable object that implements Broker.Log. @@ -772,6 +833,23 @@ func MustWriteFile(filename string, data []byte) { } } +// MustDecodeAllMessages reads all messages on a reader. +func MustDecodeAllMessages(r interface { + io.ReadCloser + io.Seeker +}) (a []*messaging.Message) { + dec := messaging.NewMessageDecoder(r) + for { + m := &messaging.Message{} + if err := dec.Decode(m); err == io.EOF { + return + } else if err != nil { + panic("read all: " + err.Error()) + } + a = append(a, m) + } +} + // MustMarshalMessages marshals a slice of messages to bytes. Panic on error. func MustMarshalMessages(a []*messaging.Message) []byte { var buf bytes.Buffer diff --git a/messaging/client.go b/messaging/client.go index b1088ae89a0..a717114f8ec 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -665,7 +665,7 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error { c.Logger.Printf("connected to broker: %s", req.URL.String()) // Continuously decode messages from request body in a separate goroutine. - dec := NewMessageDecoder(resp.Body) + dec := NewMessageDecoder(&nopSeeker{resp.Body}) for { // Decode message from the stream. m := &Message{} @@ -703,3 +703,9 @@ func urlsEqual(a, b []url.URL) bool { } return true } + +type nopSeeker struct { + io.Reader +} + +func (*nopSeeker) Seek(offset int64, whence int) (int64, error) { return 0, nil } diff --git a/messaging/handler.go b/messaging/handler.go index 76e132115e6..c5c2390a81f 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -19,7 +19,10 @@ type Handler struct { URLs() []url.URL IsLeader() bool LeaderURL() url.URL - TopicReader(topicID, index uint64, streaming bool) io.ReadCloser + TopicReader(topicID, index uint64, streaming bool) interface { + io.ReadCloser + io.Seeker + } Publish(m *Message) (uint64, error) SetTopicMaxIndex(topicID, index uint64, u url.URL) error } diff --git a/messaging/handler_test.go b/messaging/handler_test.go index 74b60f94db1..03d6259812f 100644 --- a/messaging/handler_test.go +++ b/messaging/handler_test.go @@ -17,7 +17,10 @@ import ( // Ensure a topic can be streamed from an index. func TestHandler_getMessages(t *testing.T) { var hb HandlerBroker - hb.TopicReaderFunc = func(topicID, index uint64, streaming bool) io.ReadCloser { + hb.TopicReaderFunc = func(topicID, index uint64, streaming bool) interface { + io.ReadCloser + io.Seeker + } { if topicID != 2000 { t.Fatalf("unexpected topic id: %d", topicID) } else if index != 10 { @@ -30,7 +33,7 @@ func TestHandler_getMessages(t *testing.T) { var buf bytes.Buffer (&messaging.Message{Index: 10, Data: []byte{0, 0, 0, 0}}).WriteTo(&buf) - return &bytesBufferCloser{buf} + return &nopSeekCloser{&buf} } s := httptest.NewServer(&messaging.Handler{Broker: &hb}) defer s.Close() @@ -46,7 +49,7 @@ func TestHandler_getMessages(t *testing.T) { // Decode from body. var m messaging.Message - if err := messaging.NewMessageDecoder(resp.Body).Decode(&m); err != nil { + if err := messaging.NewMessageDecoder(&nopSeeker{resp.Body}).Decode(&m); err != nil { t.Fatalf("message decode error: %s", err) } else if !reflect.DeepEqual(&m, &messaging.Message{Index: 10, Data: []byte{0, 0, 0, 0}}) { t.Fatalf("unexpected message: %#v", &m) @@ -269,11 +272,14 @@ func TestHandler_ErrNotFound(t *testing.T) { // HandlerBroker is a mockable type that implements Handler.Broker. type HandlerBroker struct { - URLsFunc func() []url.URL - IsLeaderFunc func() bool - LeaderURLFunc func() url.URL - PublishFunc func(m *messaging.Message) (uint64, error) - TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser + URLsFunc func() []url.URL + IsLeaderFunc func() bool + LeaderURLFunc func() url.URL + PublishFunc func(m *messaging.Message) (uint64, error) + TopicReaderFunc func(topicID, index uint64, streaming bool) interface { + io.ReadCloser + io.Seeker + } SetTopicMaxIndexFunc func(topicID, index uint64, dataURL url.URL) error } @@ -281,7 +287,10 @@ func (b *HandlerBroker) URLs() []url.URL { return b func (b *HandlerBroker) IsLeader() bool { return b.IsLeaderFunc() } func (b *HandlerBroker) LeaderURL() url.URL { return b.LeaderURLFunc() } func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b.PublishFunc(m) } -func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { +func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) interface { + io.ReadCloser + io.Seeker +} { return b.TopicReaderFunc(topicID, index, streaming) } func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64, dataURL url.URL) error { @@ -297,8 +306,15 @@ func MustParseURL(s string) *url.URL { return u } -type bytesBufferCloser struct { - bytes.Buffer +type nopSeeker struct { + io.Reader +} + +func (*nopSeeker) Seek(offset int64, whence int) (int64, error) { return 0, nil } + +type nopSeekCloser struct { + io.Reader } -func (*bytesBufferCloser) Close() error { return nil } +func (*nopSeekCloser) Seek(offset int64, whence int) (int64, error) { return 0, nil } +func (*nopSeekCloser) Close() error { return nil }