Skip to content

Commit

Permalink
Rework the long-suffering asynchronous rename the 3rd time :)
Browse files Browse the repository at this point in the history
Stop using cloud.RenameObject so that the current state of each file is reflected correctly
  • Loading branch information
vitalif committed Sep 16, 2021
1 parent 7e5c42e commit af6d8e9
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 106 deletions.
83 changes: 27 additions & 56 deletions internal/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ func (inode *Inode) isEmptyDir() (bool, error) {
func (inode *Inode) doUnlink() {
parent := inode.Parent

if inode.oldParent != nil {
if inode.oldParent != nil && !inode.renamingTo {
inode.resetCache()
inode.SetCacheState(ST_DELETED)
} else if inode.CacheState != ST_CREATED || inode.IsFlushing > 0 {
Expand Down Expand Up @@ -1282,7 +1282,7 @@ func (parent *Inode) addModified(inc int64) {
// LOCKS_REQUIRED(parent.mu)
// LOCKS_REQUIRED(newParent.mu)
func (parent *Inode) Rename(from string, newParent *Inode, to string) (err error) {
parent.logFuse("Rename", from, newParent.getChildName(to))
fuseLog.Debugf("Rename %v to %v", parent.getChildName(from), newParent.getChildName(to))

fromCloud, fromPath := parent.cloud()
toCloud, toPath := newParent.cloud()
Expand Down Expand Up @@ -1394,40 +1394,40 @@ func renameInCache(fromInode *Inode, newParent *Inode, to string) {
// 6) rename then modify then rename => either rename then modify or modify then rename
// and etc...
parent := fromInode.Parent
if fromInode.IsFlushing > 0 || fromInode.mpu != nil ||
fromInode.CacheState != ST_CREATED && fromInode.oldParent == nil {
if fromInode.CacheState == ST_CREATED && fromInode.IsFlushing == 0 && fromInode.mpu == nil ||
fromInode.oldParent != nil {
// File is either just created or already renamed
// In both cases we can move it without changing oldParent
// ...and, in fact, we CAN'T change oldParent the second time
if fromInode.renamingTo {
// File is already being copied to the new name
// So it may appear in an extra place if we just change the location
if parent.dir.DeletedChildren == nil {
parent.dir.DeletedChildren = make(map[string]*Inode)
}
parent.dir.DeletedChildren[fromInode.Name] = fromInode
fromInode.renamingTo = false
} else {
parent.addModified(-1)
if fromInode.oldParent == newParent && fromInode.oldName == fromInode.Name {
// Moved back. Unrename! :D
fromInode.oldParent = nil
fromInode.oldName = ""
}
}
} else {
// Remember that the original file is "deleted"
// We can skip this step if the file is new and isn't being flushed yet
if parent.dir.DeletedChildren == nil {
parent.dir.DeletedChildren = make(map[string]*Inode)
}
parent.dir.DeletedChildren[fromInode.Name] = fromInode
if fromInode.oldParent != nil {
// File is *probably* in progress of completing a multipart upload
// Rename will be flushed only after the write is flushed
fromInode.oldParent.addModified(-1)
}
if fromInode.CacheState == ST_CACHED {
// Was not modified and we remove it => add modified
// Was not modified and we remove it from current parent => add modified
parent.addModified(1)
}
if fromInode.oldParent == newParent && fromInode.oldName == to {
// Moved back. Unrename! :D
fromInode.oldParent = nil
fromInode.oldName = ""
} else {
fromInode.oldParent = parent
fromInode.oldName = fromInode.Name
}
} else {
// Was just created and we moved it immediately, or was already moved => remove modified
parent.addModified(-1)
if newParent != parent && fromInode.oldParent == newParent && fromInode.oldName == to {
// Moved back. Unrename! :D
fromInode.oldParent.addModified(-1)
fromInode.oldParent = nil
fromInode.oldName = ""
}
fromInode.oldParent = parent
fromInode.oldName = fromInode.Name
}
if newParent.dir.DeletedChildren != nil &&
newParent.dir.DeletedChildren[to] == fromInode {
Expand Down Expand Up @@ -1468,35 +1468,6 @@ func renameInCache(fromInode *Inode, newParent *Inode, to string) {
fromInode.DeRef(1)
}

// RenameObject moves the blob fromFullName to toFullName in the given backend.
// It first tries the backend's native rename; a backend that does not support
// renames reports syscall.ENOTSUP, in which case the object is copied to the
// destination and the original is deleted afterwards. size, when non-nil, is
// passed through to the copy as a hint.
func RenameObject(cloud StorageBackend, fromFullName string, toFullName string, size *uint64) (err error) {
	_, err = cloud.RenameBlob(&RenameBlobInput{
		Source:      fromFullName,
		Destination: toFullName,
	})
	if err != syscall.ENOTSUP {
		// Native rename either succeeded (err == nil) or failed with a
		// real error; in both cases there is nothing more to do here.
		return
	}

	// Fall back to copy-then-delete for backends without a native rename.
	if _, err = cloud.CopyBlob(&CopyBlobInput{
		Source:      fromFullName,
		Destination: toFullName,
		Size:        size,
	}); err != nil {
		return
	}

	if _, err = cloud.DeleteBlob(&DeleteBlobInput{
		Key: fromFullName,
	}); err != nil {
		return
	}
	s3Log.Debugf("Deleted %v", fromFullName)

	return
}

// if I had seen a/ and a/b, and now I get a/c, that means a/b is
// done, but not a/
func (parent *Inode) isParentOf(inode *Inode) bool {
Expand Down
127 changes: 82 additions & 45 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,9 @@ func (inode *Inode) SendUpload() bool {
from = appendChildName(from, inode.oldName)
oldParent := inode.oldParent
oldName := inode.oldName
newParent := inode.Parent
newName := inode.Name
inode.renamingTo = true
skipRename := false
if inode.isDir() {
from += "/"
Expand All @@ -1161,59 +1164,92 @@ func (inode *Inode) SendUpload() bool {
go func() {
var err error
if !inode.isDir() || !inode.fs.flags.NoDirObject {
err = RenameObject(cloud, from, key, nil)
mappedErr := mapAwsError(err)
if mappedErr == syscall.ENOENT && skipRename {
// We don't use RenameBlob here even for hypothetical clouds that support it (not S3),
// because if we used it we'd have to do it under the inode lock. Because otherwise
// a parallel read could hit a non-existing name. So, with S3, we do it in 2 passes.
// First we copy the object, change the inode name, and then we delete the old copy.
_, err = cloud.CopyBlob(&CopyBlobInput{
Source: from,
Destination: key,
})
notFoundIgnore := false
if err != nil {
mappedErr := mapAwsError(err)
// Rename the old directory object to copy xattrs from it if it has them
// We're almost never sure if the directory is implicit so we always try
// to rename the directory object
err = nil
}
}

if err == nil && inode.oldParent == oldParent && inode.oldName == oldName {
// Remove from DeletedChildren of the old parent
oldParent.mu.Lock()
delete(oldParent.dir.DeletedChildren, oldName)
oldParent.mu.Unlock()
// And track ModifiedChildren because rename is special - it takes two parents
oldParent.addModified(-1)
}

inode.mu.Lock()
inode.recordFlushError(err)
var unmoveParent *Inode
var unmoveName string
if err != nil {
log.Errorf("Error renaming object from %v to %v: %v", from, key, err)
if inode.oldParent != oldParent || inode.oldName != oldName {
unmoveParent = inode.oldParent
unmoveName = inode.oldName
inode.oldParent = oldParent
inode.oldName = oldName
}
} else {
if inode.oldParent == oldParent && inode.oldName == oldName {
inode.oldParent = nil
inode.oldName = ""
// We're almost never sure if the directory is implicit or not so we
// always try to rename the directory object, but ignore NotFound errors
if mappedErr == syscall.ENOENT && skipRename {
err = nil
notFoundIgnore = true
}
inode.recordFlushError(err)
}
if (inode.CacheState == ST_MODIFIED || inode.CacheState == ST_CREATED) &&
!inode.isStillDirty() {
inode.SetCacheState(ST_CACHED)
if err == nil {
log.Debugf("Copied %v to %v (rename)", from, key)
delKey := from
delParent := oldParent
delName := oldName
inode.mu.Lock()
// Now we know that the object is accessible by the new name
if inode.Parent == newParent && inode.Name == newName {
// Just clear the old path
inode.oldParent = nil
inode.oldName = ""
} else if inode.Parent == oldParent && inode.Name == oldName {
// Someone renamed the inode back to the original name(!)
inode.oldParent = nil
inode.oldName = ""
// Delete the new key instead of the old one (?)
delKey = key
delParent = newParent
delName = newName
} else {
// Someone renamed the inode again(!)
inode.oldParent = newParent
inode.oldName = newName
}
if (inode.CacheState == ST_MODIFIED || inode.CacheState == ST_CREATED) &&
!inode.isStillDirty() {
inode.SetCacheState(ST_CACHED)
}
inode.renamingTo = false
inode.mu.Unlock()
// Now delete the old key
if !notFoundIgnore {
_, err = cloud.DeleteBlob(&DeleteBlobInput{
Key: delKey,
})
}
if err != nil {
log.Debugf("Failed to delete %v during rename, will retry later", delKey)
// Emulate a deleted file
delParent.mu.Lock()
delParent.fs.mu.Lock()
tomb := NewInode(delParent.fs, delParent, delName)
tomb.Id = delParent.fs.allocateInodeId()
tomb.fs.inodes[tomb.Id] = tomb
tomb.userMetadata = make(map[string][]byte)
tomb.CacheState = ST_DELETED
tomb.recordFlushError(err)
delParent.dir.DeletedChildren[delName] = tomb
delParent.fs.mu.Unlock()
delParent.mu.Unlock()
} else {
log.Debugf("Deleted %v - rename completed", from)
// Remove from DeletedChildren of the old parent
delParent.mu.Lock()
delete(delParent.dir.DeletedChildren, delName)
delParent.mu.Unlock()
// And track ModifiedChildren because rename is special - it takes two parents
delParent.addModified(-1)
}
}
}
inode.mu.Lock()
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()

// "Undo" the second move
if unmoveParent != nil {
unmoveParent.mu.Lock()
delete(unmoveParent.dir.DeletedChildren, unmoveName)
unmoveParent.mu.Unlock()
unmoveParent.addModified(-1)
}
}()
return true
}
Expand Down Expand Up @@ -1295,6 +1331,7 @@ func (inode *Inode) SendUpload() bool {
if err != nil {
log.Errorf("Failed to initiate multipart upload for %v: %v", key, err)
} else {
log.Debugf("Started multi-part upload of object %v", key)
inode.mpu = resp
}
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
Expand Down
2 changes: 2 additions & 0 deletions internal/handles.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type Inode struct {
// renamed from: parent, name
oldParent *Inode
oldName string
// is already being renamed to the current name
renamingTo bool

// multipart upload state
mpu *MultipartBlobCommitInput
Expand Down
9 changes: 9 additions & 0 deletions test/xfstests.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# local.config example for xfstests
FSTYP=fuse.geesefs
TEST_DIR=/home/testdir
TEST_DEV=testbucket:
TEST_FS_MOUNT_OPTS="-o allow_other -o--debug_fuse -o--retry-interval=5s -o--log-file=/root/xfstests/geesefs.log"
MOUNT_OPTIONS="-o allow_other -o--debug_fuse -o--retry-interval=5s -o--log-file=/root/xfstests/geesefs.log"
SCRATCH_DEV=testbucket2:
SCRATCH_MNT=/home/testdir2
UMOUNT_PROG=/root/xfstests/sync_unmount
16 changes: 11 additions & 5 deletions test/xfstests.diff
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
diff --git a/common/config b/common/config
index 164381b7..2e7790d6 100644
index 164381b7..88ae95b6 100644
--- a/common/config
+++ b/common/config
@@ -122,6 +122,7 @@ export MOUNT_PROG="$(type -P mount)"
@@ -120,8 +120,11 @@ export MKFS_PROG="$(type -P mkfs)"
export MOUNT_PROG="$(type -P mount)"
[ "$MOUNT_PROG" = "" ] && _fatal "mount not found"

export UMOUNT_PROG="$(type -P umount)"
[ "$UMOUNT_PROG" = "" ] && _fatal "umount not found"
+export UMOUNT_PROG=$(realpath $(dirname $0)/..)"/sync_unmount"
-export UMOUNT_PROG="$(type -P umount)"
-[ "$UMOUNT_PROG" = "" ] && _fatal "umount not found"
+if [ -z "$UMOUNT_PROG" ]; then
+ UMOUNT_PROG="$(type -P umount)"
+ [ "$UMOUNT_PROG" = "" ] && _fatal "umount not found"
+fi
+export UMOUNT_PROG

export FSSTRESS_PROG="./ltp/fsstress"
[ ! -x $FSSTRESS_PROG ] && _fatal "fsstress not found or executable"
Expand Down

0 comments on commit af6d8e9

Please sign in to comment.