Skip to content

Commit

Permalink
Check for unclean (not dirty!!) buffers during flush
Browse files Browse the repository at this point in the history
Ignoring BUF_FLUSHED_* buffers could previously lead to multipart files with
all flushed parts being incorrectly marked as "clean", so the multipart upload
was not completed in this case.
  • Loading branch information
vitalif committed May 21, 2024
1 parent db20530 commit fc12caf
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
22 changes: 14 additions & 8 deletions internal/buffer_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type BufferList struct {
dirtyQueue btree.Map[uint64, *dirtyPart]
dirtyParts map[uint64]*dirtyPart
dirtyQid uint64
// dirty buffer count
dirtyCount int64
// unclean (anything except BUF_CLEAN) buffer count
uncleanCount int64
// next dirty index for new buffers
curDirtyID uint64
}
Expand Down Expand Up @@ -254,8 +254,10 @@ func (l *BufferList) nextID() uint64 {
}

func (l *BufferList) unqueue(b *FileBuffer) {
if b.state != BUF_CLEAN {
l.uncleanCount--
}
if b.state == BUF_DIRTY {
l.dirtyCount--
sp := l.helpers.PartNum(b.offset)
ep := l.helpers.PartNum(b.offset+b.length-1)
for i := sp; i < ep+1; i++ {
Expand Down Expand Up @@ -293,8 +295,10 @@ func (l *BufferList) queue(b *FileBuffer) {
if b.length == 0 {
panic("BUG: buffer length should never be 0")
}
if b.state != BUF_CLEAN {
l.uncleanCount++
}
if b.state == BUF_DIRTY {
l.dirtyCount++
if l.dirtyParts == nil {
l.dirtyParts = make(map[uint64]*dirtyPart)
}
Expand All @@ -312,8 +316,10 @@ func (l *BufferList) requeueSplit(left *FileBuffer) {
if left.length == 0 {
panic("BUG: buffer length should never be 0")
}
if left.state != BUF_CLEAN {
l.uncleanCount++
}
if left.state == BUF_DIRTY {
l.dirtyCount++
if l.dirtyParts == nil {
l.dirtyParts = make(map[uint64]*dirtyPart)
}
Expand Down Expand Up @@ -350,7 +356,7 @@ func (l *BufferList) SetState(offset, size uint64, ids map[uint64]bool, state Bu
func (l *BufferList) SetFlushedClean() {
ascendChange(&l.at, 0, func(end uint64, b *FileBuffer) (cont bool, chg bool) {
if b.state == BUF_FL_CLEARED {
l.at.Delete(end)
l.delete(b)
return true, true
} else if b.state == BUF_FLUSHED_FULL || b.state == BUF_FLUSHED_CUT {
l.unqueue(b)
Expand Down Expand Up @@ -683,8 +689,8 @@ func (l *BufferList) SplitAt(offset uint64) {
})
}

func (l *BufferList) AnyDirty() (dirty bool) {
return l.dirtyCount > 0
func (l *BufferList) AnyUnclean() bool {
return l.uncleanCount > 0
}

func (l *BufferList) AnyFlushed(offset, size uint64) (flushed bool) {
Expand Down
4 changes: 2 additions & 2 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func (inode *Inode) sendUpload(priority int) bool {

if inode.CacheState == ST_MODIFIED && inode.userMetadataDirty != 0 &&
inode.oldParent == nil && inode.IsFlushing == 0 {
hasDirty := inode.buffers.AnyDirty()
hasDirty := inode.buffers.AnyUnclean()
if !hasDirty {
// Update metadata by COPYing into the same object
// It results in the optimized implementation in S3
Expand Down Expand Up @@ -1321,7 +1321,7 @@ func (inode *Inode) isStillDirty() bool {
if inode.userMetadataDirty != 0 || inode.oldParent != nil || inode.Attributes.Size != inode.knownSize {
return true
}
return inode.buffers.AnyDirty()
return inode.buffers.AnyUnclean()
}

func (inode *Inode) resetCache() {
Expand Down
9 changes: 8 additions & 1 deletion internal/goofys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,14 @@ func (s *GoofysTest) TestMultipartWriteAndTruncate(t *C) {
// Now don't close the FD, but wait until both parts are flushed
for {
fh.inode.mu.Lock()
dirty := fh.inode.buffers.AnyDirty()
dirty := false
fh.inode.buffers.Ascend(0, func(end uint64, b *FileBuffer) (cont bool, changed bool) {
if b.state == BUF_DIRTY {
dirty = true
return false, false
}
return true, false
})
fh.inode.mu.Unlock()
if !dirty {
break
Expand Down

0 comments on commit fc12caf

Please sign in to comment.