fix: remove temporary files when multiple write operations conflict
When multiple write operations conflict, we:

1. Try them in order until one succeeds.
2. After the fact, re-order them such that the pending operations "happen" after
   the one that succeeds.
3. Return "success" for all the pending write operations for that key. This is
   acceptable because we're claiming that the operation that _actually_ succeeded
   happened "last", so it would have clobbered the other operations.

However, in the case of put, we still need to remove the temporary file that we
didn't end up moving into place.
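To make that requirement concrete, here is a small, self-contained Go sketch of the
pattern described above. It is not the flatfs implementation: the claim/put helpers
and the first-writer-wins map are hypothetical stand-ins for flatfs's opMap
(Begin/Finish). The point is only the invariant that a put which defers to an
already-successful write must still remove the temp file it never renamed.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// won records keys for which some concurrent writer already succeeded. This
// collapses the real opMap machinery into a simple first-writer-wins map.
var (
	mu  sync.Mutex
	won = map[string]bool{}
)

// claim reports whether the caller should move its file into place (true)
// or defer to a write that already succeeded on this key (false).
func claim(key string) bool {
	mu.Lock()
	defer mu.Unlock()
	if won[key] {
		return false
	}
	won[key] = true
	return true
}

// put stages value in a temp file, then either renames it into place or,
// when another writer already won the key, removes the temp file and still
// reports success (that writer is treated as having happened "last").
func put(dir, key string, value []byte) error {
	tmp, err := os.CreateTemp(dir, ".put-*")
	if err != nil {
		return err
	}
	name := tmp.Name()
	if _, err := tmp.Write(value); err != nil {
		tmp.Close()
		os.Remove(name)
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(name)
		return err
	}
	if !claim(key) {
		// The point of this commit: don't leak the file we never renamed.
		return os.Remove(name)
	}
	return os.Rename(name, filepath.Join(dir, key+".data"))
}

func main() {
	dir, err := os.MkdirTemp("", "put-sketch-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	fmt.Println(put(dir, "KEY", []byte("value")))
}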
Stebalien committed Apr 10, 2020
1 parent 53d4c9b commit 69cc2bc
Showing 2 changed files with 54 additions and 9 deletions.
22 changes: 13 additions & 9 deletions flatfs.go
@@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {

 	var err error
 	for i := 1; i <= putMaxRetries; i++ {
-		err = fs.doWriteOp(&op{
+		_, err = fs.doWriteOp(&op{
 			typ: opPut,
 			key: key,
 			v:   value,
@@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool {
 // we assume that the first succeeding operation
 // on that key was the last one to happen after
 // all successful others.
-func (fs *Datastore) doWriteOp(oper *op) error {
+//
+// done is true if we actually performed the operation, false if we skipped or
+// failed.
+func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
 	keyStr := oper.key.String()

 	opRes := fs.opMap.Begin(keyStr)
 	if opRes == nil { // nothing to do, a concurrent op succeeded
-		return nil
+		return false, nil
 	}

 	// Do the operation
-	var err error
 	for i := 0; i < 6; i++ {
 		err = fs.doOp(oper)

@@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error {
 	// waiting on this result to succeed. Otherwise, they will
 	// retry.
 	opRes.Finish(err == nil)
-	return err
+	return err == nil, err
 }

 func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
@@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {

 	// move files to their proper places
 	for fi, op := range files {
-		err := fs.doWriteOp(op)
+		done, err := fs.doWriteOp(op)
 		if err != nil {
 			return err
+		} else if done {
+			// signify removed
+			ops[fi] = 2
 		}
-		// signify removed
-		ops[fi] = 2
 	}

 	// now sync the dirs for those files
@@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error {
 		return ErrClosed
 	}

-	return fs.doWriteOp(&op{
+	_, err := fs.doWriteOp(&op{
 		typ: opDelete,
 		key: key,
 		v:   nil,
 	})
+	return err
 }

 // This function always runs within an opLock for the given
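The putMany change above relies on per-file bookkeeping: a staged temp file is only
marked as moved (ops[fi] = 2) when doWriteOp actually performed the operation, so a
later cleanup pass can remove anything left behind, including ops skipped because a
concurrent write on the same key already won. Below is a rough, self-contained sketch
of that bookkeeping. It is not the flatfs code: the state constants, the deferred
cleanup pass, and the staging helpers are assumptions used only for illustration.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

const (
	stateStaged = 0 // temp file written but not yet moved into place
	stateMoved  = 2 // moved into place (the "signify removed" marker in the diff)
)

type staged struct {
	tmp, dst string
	state    int
}

// putMany stages every value in a temp file, moves the files into place, and
// on exit removes any temp file that was never moved, which is exactly what a
// skipped (already-won) write op would otherwise leave behind.
func putMany(dir string, data map[string][]byte) error {
	var files []staged

	// Cleanup pass: delete every staged file that did not make it into place.
	defer func() {
		for i := range files {
			if files[i].state != stateMoved {
				os.Remove(files[i].tmp)
			}
		}
	}()

	// Stage every value in a temp file first.
	for key, val := range data {
		tmp, err := os.CreateTemp(dir, ".batch-*")
		if err != nil {
			return err
		}
		if _, err := tmp.Write(val); err != nil {
			tmp.Close()
			return err
		}
		if err := tmp.Close(); err != nil {
			return err
		}
		files = append(files, staged{
			tmp:   tmp.Name(),
			dst:   filepath.Join(dir, key+".data"),
			state: stateStaged,
		})
	}

	// Move files into their proper places. A real implementation may skip a
	// move when a concurrent op on the same key already succeeded; such a
	// file keeps stateStaged and is removed by the deferred pass above.
	for i := range files {
		if err := os.Rename(files[i].tmp, files[i].dst); err != nil {
			return err
		}
		files[i].state = stateMoved
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "batch-sketch-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	fmt.Println(putMany(dir, map[string][]byte{"A": []byte("1"), "B": []byte("2")}))
}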
41 changes: 41 additions & 0 deletions flatfs_test.go
@@ -23,6 +23,26 @@ import (
 	"github.com/ipfs/go-ds-flatfs"
 )

+func checkTemp(t *testing.T, dir string) {
+	tempDir, err := os.Open(filepath.Join(dir, ".temp"))
+	if err != nil {
+		t.Errorf("failed to open temp dir: %s", err)
+		return
+	}
+
+	names, err := tempDir.Readdirnames(-1)
+	tempDir.Close()
+
+	if err != nil {
+		t.Errorf("failed to read temp dir: %s", err)
+		return
+	}
+
+	for _, name := range names {
+		t.Errorf("found leftover temporary file: %s", name)
+	}
+}
+
 func tempdir(t testing.TB) (path string, cleanup func()) {
 	path, err := ioutil.TempDir("", "test-datastore-flatfs-")
 	if err != nil {
@@ -48,6 +68,7 @@ type mkShardFunc func(int) *flatfs.ShardIdV1
 func testPut(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -71,6 +92,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) }
 func testGet(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -103,6 +125,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) }
 func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -138,6 +161,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) }
 func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -162,6 +186,7 @@ type params struct {
 func testStorage(p *params, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	target := p.dir + string(os.PathSeparator) + p.key + ".data"
 	fs, err := flatfs.CreateOrOpen(temp, p.shard, false)
@@ -256,6 +281,7 @@ func TestStorage(t *testing.T) {
 func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -277,6 +303,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) }
 func testHasFound(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -303,6 +330,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) }
 func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -321,6 +349,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) }
 func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -347,6 +376,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound
 func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -365,6 +395,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound)
 func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -394,6 +425,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) }
 func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -434,6 +466,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) }
 func testDiskUsage(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -555,6 +588,7 @@ func TestDiskUsageDoubleCount(t *testing.T) {
 func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -628,6 +662,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
 func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -728,6 +763,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch)
 func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -812,6 +848,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs
 func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -827,6 +864,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) }
 func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -842,6 +880,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }
 func testClose(dirFunc mkShardFunc, t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
 	if err != nil {
@@ -921,6 +960,7 @@ func TestNonDatastoreDir(t *testing.T) {
 func TestNoCluster(t *testing.T) {
 	tempdir, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, tempdir)

 	fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false)
 	if err != nil {
@@ -1079,6 +1119,7 @@ func TestQueryLeak(t *testing.T) {
 func TestSuite(t *testing.T) {
 	temp, cleanup := tempdir(t)
 	defer cleanup()
+	defer checkTemp(t, temp)

 	fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
 	if err != nil {
