Skip to content

Commit

Permalink
cleanup putMany implementation
Browse files Browse the repository at this point in the history
* Use a map for dirsToSync to avoid syncing the same dir multiple times.
* Keep track of files in a slice, and use offsets into the slice to keep track
of which ones have been closed/removed.

Also, record the fact that we've created a temporary file _before_ we try to
write to it, in case the write fails. That way, we'll try to remove it when we
abort.
  • Loading branch information
Stebalien committed Apr 11, 2020
1 parent c1f339f commit 4d604ee
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions flatfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,42 +537,42 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
return ErrClosed
}

var dirsToSync []string
type putManyOp struct {
key datastore.Key
path string
file *os.File
}

files := make(map[*os.File]*op, len(data))
ops := make(map[*os.File]int, len(data))
var (
dirsToSync = make(map[string]struct{}, len(data))
files = make([]putManyOp, 0, len(data))
closed int
removed int
)

defer func() {
for fi := range files {
val := ops[fi]
switch val {
case 0:
_ = fi.Close()
fallthrough
case 1:
_ = os.Remove(fi.Name())
}
for closed < len(files) {
files[closed].file.Close()
closed++
}
for removed < len(files) {
_ = os.Remove(files[removed].file.Name())
removed++
}
}()

closer := func() error {
for fi := range files {
if ops[fi] != 0 {
continue
}

for closed < len(files) {
fi := files[closed].file
if fs.sync {
if err := syncFile(fi); err != nil {
return err
}
}

if err := fi.Close(); err != nil {
return err
}

// signify closed
ops[fi] = 1
closed++
}
return nil
}
Expand All @@ -582,7 +582,7 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
if err := fs.makeDirNoSync(dir); err != nil {
return err
}
dirsToSync = append(dirsToSync, dir)
dirsToSync[dir] = struct{}{}

tmp, err := fs.tempFile()

Expand All @@ -604,15 +604,15 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
return err
}

if _, err := tmp.Write(value); err != nil {
return err
}

files[tmp] = &op{
typ: opRename,
// Do this _first_ so we close it if writing fails.
files = append(files, putManyOp{
path: path,
tmp: tmp.Name(),
key: key,
file: tmp,
})

if _, err := tmp.Write(value); err != nil {
return err
}
}

Expand All @@ -624,19 +624,24 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
}

// move files to their proper places
for fi, op := range files {
done, err := fs.doWriteOp(op)
for _, pop := range files {
done, err := fs.doWriteOp(&op{
typ: opRename,
key: pop.key,
tmp: pop.file.Name(),
path: pop.path,
})
if err != nil {
return err
} else if done {
// signify removed
ops[fi] = 2
} else if !done {
_ = os.Remove(pop.file.Name())
}
removed++
}

// now sync the dirs for those files
if fs.sync {
for _, dir := range dirsToSync {
for dir := range dirsToSync {
if err := syncDir(dir); err != nil {
return err
}
Expand Down

0 comments on commit 4d604ee

Please sign in to comment.