Skip to content

Commit

Permalink
Prevent panic possible on parallel conflict & multipart complete
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Mar 29, 2024
1 parent 3cd20b3 commit 9eee5a1
Showing 1 changed file with 43 additions and 36 deletions.
79 changes: 43 additions & 36 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,10 @@ func (inode *Inode) flushPart(part uint64) {
// LOCKS_REQUIRED(inode.mu)
func (inode *Inode) completeMultipart() {
// Server-side copy unmodified parts
if inode.mpu == nil {
// Multipart upload was canceled in the meantime (by a parallel conflict) => do not complete
return
}
finalSize := inode.Attributes.Size
numParts := inode.fs.partNum(finalSize)
numPartOffset, _ := inode.fs.partRange(numParts)
Expand All @@ -1668,47 +1672,50 @@ func (inode *Inode) completeMultipart() {
return
}
inode.recordFlushError(err)
if err == nil && (inode.CacheState == ST_CREATED || inode.CacheState == ST_MODIFIED) {
cloud, key := inode.cloud()
if inode.oldParent != nil {
// Always apply modifications before moving
_, key = inode.oldParent.cloud()
key = appendChildName(key, inode.oldName)
if err != nil || inode.CacheState != ST_CREATED && inode.CacheState != ST_MODIFIED ||
inode.mpu == nil {
// Error, or already flushed, or conflict => do not complete
return
}
cloud, key := inode.cloud()
if inode.oldParent != nil {
// Always apply modifications before moving
_, key = inode.oldParent.cloud()
key = appendChildName(key, inode.oldName)
}
// Finalize the upload
mpu := inode.mpu
mpu.NumParts = uint32(numParts)
inode.mu.Unlock()
inode.fs.addInflightChange(key)
resp, err := cloud.MultipartBlobCommit(mpu)
inode.fs.completeInflightChange(key)
inode.mu.Lock()
if inode.mpu != mpu || inode.CacheState != ST_CREATED && inode.CacheState != ST_MODIFIED {
// Already flushed or conflict => do not complete
return
}
inode.recordFlushError(err)
if err != nil {
log.Warnf("Failed to finalize multi-part upload of object %v: %v", key, err)
if inode.mpu.Metadata != nil {
inode.userMetadataDirty = 2
}
// Finalize the upload
inode.mpu.NumParts = uint32(numParts)
inode.mu.Unlock()
inode.fs.addInflightChange(key)
resp, err := cloud.MultipartBlobCommit(inode.mpu)
inode.fs.completeInflightChange(key)
inode.mu.Lock()
} else {
log.Debugf("Finalized multi-part upload of object %v: etag=%v, size=%v", key, NilStr(resp.ETag), finalSize)
if inode.userMetadataDirty == 1 {
inode.userMetadataDirty = 0
}
inode.mpu = nil
inode.buffers.SetFlushedClean()
inode.updateFromFlush(finalSize, resp.ETag, resp.LastModified, resp.StorageClass)
if inode.CacheState == ST_CREATED || inode.CacheState == ST_MODIFIED {
inode.recordFlushError(err)
if err != nil {
log.Warnf("Failed to finalize multi-part upload of object %v: %v", key, err)
if inode.mpu.Metadata != nil {
inode.userMetadataDirty = 2
}
if !inode.isStillDirty() {
inode.SetCacheState(ST_CACHED)
} else {
log.Debugf("Finalized multi-part upload of object %v: etag=%v, size=%v", key, NilStr(resp.ETag), finalSize)
if inode.userMetadataDirty == 1 {
inode.userMetadataDirty = 0
}
inode.mpu = nil
inode.buffers.SetFlushedClean()
inode.updateFromFlush(finalSize, resp.ETag, resp.LastModified, resp.StorageClass)
if inode.CacheState == ST_CREATED || inode.CacheState == ST_MODIFIED {
if !inode.isStillDirty() {
inode.SetCacheState(ST_CACHED)
} else {
inode.SetCacheState(ST_MODIFIED)
}
}
inode.SetCacheState(ST_MODIFIED)
}
}
} else {
// FIXME: Abort multipart upload, but not just here
// For example, we also should abort it if a partially flushed file is deleted
}
}

Expand Down

0 comments on commit 9eee5a1

Please sign in to comment.