Skip to content

Commit

Permalink
segment: do not consider offset 0 as an invalid read value
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien BONACHERA committed Jul 9, 2018
1 parent 3497f59 commit 66591a8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
7 changes: 3 additions & 4 deletions commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,13 @@ func (s *Segment) findEntry(offset int64) (e *Entry, err error) {
e = &Entry{}
n := int(s.Index.bytes / entryWidth)
idx := sort.Search(n, func(i int) bool {
_ = s.Index.ReadEntryAtFileOffset(e, int64(i*entryWidth))
return e.Offset >= offset || e.Offset == 0
err = s.Index.ReadEntryAtFileOffset(e, int64(i*entryWidth))
return e.Offset >= offset || err != nil
})
if idx == n {
return nil, errors.New("entry not found")
}
_ = s.Index.ReadEntryAtFileOffset(e, int64(idx*entryWidth))
return e, nil
return e, s.Index.ReadEntryAtFileOffset(e, int64(idx*entryWidth))
}

// Delete closes the segment and then deletes its log and index files.
Expand Down
39 changes: 39 additions & 0 deletions commitlog/segment_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package commitlog_test

import (
"io"
"log"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -29,3 +31,40 @@ func TestSegmentScanner(t *testing.T) {
require.NoError(t, err)
require.Equal(t, msgSets[0], ms)
}
func TestSegmentReader(t *testing.T) {
var err error

l := setupWithOptions(t, commitlog.Options{
MaxSegmentBytes: 1024,
MaxLogBytes: 1024,
})
defer cleanup(t, l)

message := []byte("one")
nb := 12
for i := 0; i < nb; i++ {
_, err = l.Append(commitlog.NewMessageSet(uint64(i), message))
require.NoError(t, err)
}
t.Run("must read 1 message from offset 11", mustRead(l, 11, 1, message))
}

func mustRead(l *commitlog.CommitLog, offset int64, messagesNumber int, message []byte) func(t *testing.T) {
return func(t *testing.T) {
r, err := l.NewReader(offset, 0)
require.NoError(t, err)
buf := make([]byte, len(message)+12)

for messagesNumber > 0 {
n, err := r.Read(buf)
require.NoError(t, err)
require.Equal(t, message, buf[12:])
require.Equal(t, 15, n)
messagesNumber--
}
n, err := r.Read(buf)
log.Println(string(buf[12:]))
require.Equal(t, 0, n)
require.Equal(t, io.EOF, err)
}
}

0 comments on commit 66591a8

Please sign in to comment.