Skip to content

Commit

Permalink
[FIXED] FileStore: index file may be recreated with wrong offsets
Browse files Browse the repository at this point in the history
This could happen if recovery of an index does not have matching
information with the last message in corresponding ".dat" file.
In that case, the server wipes the index file and recreate it
based on the data file. However, the offset was not reset, which
means that the index file would contain wrong offsets.

Resolves #1086

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 14, 2020
1 parent 4208132 commit 20db05d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
3 changes: 3 additions & 0 deletions stores/filestore.go
Expand Up @@ -2378,6 +2378,9 @@ func (ms *FileMsgStore) recoverOneMsgFile(fslice *fileSlice, fseq int, useIdxFil
// We are going to write the index file while recovering the data file
bw := bufio.NewWriterSize(fslice.idxFile.handle, msgIndexRecSize*1000)

// Reset offset in case we come from a mismatch between .dat and .idx files.
offset = int64(4)

for {
ms.tmpMsgBuf, msgSize, _, err = readRecord(br, ms.tmpMsgBuf, false, crcTable, doCRC)
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions stores/filestore_msg_test.go
Expand Up @@ -1745,6 +1745,53 @@ func TestFSRecoverEmptyIndexMsgFile(t *testing.T) {
}
}

func TestFSEnsureLastMsgAndIndexMatch(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)

s := createDefaultFileStore(t)
defer s.Close()

c := storeCreateChannel(t, s, "foo")
storeMsg(t, c, "foo", 1, []byte("msg1"))
storeMsg(t, c, "foo", 2, []byte("msg2"))
ms := c.Msgs.(*FileMsgStore)
ms.RLock()
fname := ms.files[1].file.name
offset := ms.wOffset
ms.RUnlock()
storeMsg(t, c, "foo", 3, []byte("msg3"))

s.Close()

f, err := os.OpenFile(fname, os.O_RDWR, 0666)
if err != nil {
t.Fatalf("Error opening file: %v", err)
}
defer f.Close()
if err := f.Truncate(offset); err != nil {
t.Fatalf("Error on truncate: %v", err)
}
f.Close()
f, err = os.OpenFile(fname, os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
t.Fatalf("Error opening file: %v", err)
}
defer f.Close()
m3 := &pb.MsgProto{Sequence: 3, Subject: "foo", Data: []byte("msg3_modified")}
if _, _, err := writeRecord(f, nil, recNoType, m3, m3.Size(), crc32.IEEETable); err != nil {
t.Fatalf("Error rewriting file: %v", err)
}
f.Close()

s, rs := openDefaultFileStore(t)
defer s.Close()
c = getRecoveredChannel(t, rs, "foo")
if _, err := c.Msgs.Lookup(3); err != nil {
t.Fatalf("Error on lookup: %v", err)
}
}

func TestFSEmptyRemovesAllMsgsFiles(t *testing.T) {
cleanupFSDatastore(t)
defer cleanupFSDatastore(t)
Expand Down

0 comments on commit 20db05d

Please sign in to comment.