Skip to content

Commit

Permalink
[FIXED] Make sure to hold lock for duration of truncate (#5279)
Browse files Browse the repository at this point in the history
Make sure to hold lock for duration of truncate, was calling slotInfo
unprotected.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 4, 2024
2 parents 4d9732c + bcdbb2e commit fcff483
Showing 1 changed file with 5 additions and 14 deletions.
19 changes: 5 additions & 14 deletions server/filestore.go
Expand Up @@ -4313,8 +4313,11 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {

// Truncate this message block to the storedMsg.
func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()

// Make sure we are loaded to process messages etc.
if err := mb.loadMsgs(); err != nil {
if err := mb.loadMsgsWithLock(); err != nil {
return 0, 0, err
}

Expand All @@ -4328,8 +4331,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {

var purged, bytes uint64

mb.mu.Lock()

checkDmap := mb.dmap.Size() > 0
var smv StoreMsg

Expand Down Expand Up @@ -4365,28 +4366,24 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if mb.cmp != NoCompression {
buf, err := mb.loadBlock(nil)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to load block from disk: %w", err)
}
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
mb.mu.Unlock()
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
buf, err = mb.decompressIfNeeded(buf)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to decompress block: %w", err)
}
buf = buf[:eof]
copy(mb.lchk[0:], buf[:len(buf)-checksumSize])
buf, err = mb.cmp.Compress(buf)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to recompress block: %w", err)
}
meta := &CompressionInfo{
Expand All @@ -4397,19 +4394,16 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
mb.mu.Unlock()
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
n, err := mb.writeAt(buf, 0)
if err != nil {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err)
}
if n != len(buf) {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf))
}
mb.mfd.Truncate(int64(len(buf)))
Expand All @@ -4422,7 +4416,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
mb.mfd.ReadAt(lchk[:], eof-8)
copy(mb.lchk[0:], lchk[:])
} else {
mb.mu.Unlock()
return 0, 0, fmt.Errorf("failed to truncate msg block %d, file not open", mb.index)
}

Expand All @@ -4436,10 +4429,8 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
// Redo per subject info for this block.
mb.resetPerSubjectInfo()

mb.mu.Unlock()

// Load msgs again.
mb.loadMsgs()
mb.loadMsgsWithLock()

return purged, bytes, nil
}
Expand Down

0 comments on commit fcff483

Please sign in to comment.