Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move retries lower and retry rename ops #82

Merged
merged 4 commits into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 32 additions & 41 deletions flatfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
// RetryDelay is a timeout for a backoff on retrying operations
// that fail due to transient errors like too many file descriptors open.
RetryDelay = time.Millisecond * 200

// RetryAttempts is the maximum number of retries that will be attempted
// before giving up.
RetryAttempts = 6
)

const (
Expand Down Expand Up @@ -364,13 +368,21 @@ func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error {
// Rename and add new file's diskUsage. If the rename fails,
// it will either a) Re-add the size of an existing file, which
// was sustracted before b) Add 0 if there is no existing file.
err = os.Rename(tmpPath, path)
for i := 0; i < RetryAttempts; i++ {
err = os.Rename(tmpPath, path)
// if there's no error, or the source file doesn't exist, abort.
if err == nil || os.IsNotExist(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's worth going down this rabbit hole of understanding what classes of errors are possible here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're putting the temporary directory inside the datastore directory, so that shouldn't be an issue.

Really, we probably don't even need "is not exist", but we might as well have it.

break
}
// Otherwise, this could be a transient error due to some other
// process holding open one of the files. Wait a bit and then
// retry.
time.Sleep(time.Duration(i+1) * RetryDelay)
}
fs.updateDiskUsage(path, true)
willscott marked this conversation as resolved.
Show resolved Hide resolved
return err
}

var putMaxRetries = 6

// Put stores a key/value in the datastore.
//
// Note, that we do not guarantee order of write operations (Put or Delete)
Expand All @@ -392,24 +404,11 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {
return ErrClosed
}

var err error
for i := 1; i <= putMaxRetries; i++ {
_, err = fs.doWriteOp(&op{
typ: opPut,
key: key,
v: value,
})
if err == nil {
break
}

if !strings.Contains(err.Error(), "too many open files") {
break
}

log.Errorf("too many open files, retrying in %dms", 100*i)
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
_, err := fs.doWriteOp(&op{
typ: opPut,
key: key,
v: value,
})
return err
}

Expand Down Expand Up @@ -462,15 +461,7 @@ func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
return false, nil
}

// Do the operation
for i := 0; i < 6; i++ {
err = fs.doOp(oper)

if err == nil || !isTooManyFDError(err) {
break
}
time.Sleep(time.Duration(i+1) * RetryDelay)
}
err = fs.doOp(oper)

// Finish it. If no error, it will signal other operations
// waiting on this result to succeed. Otherwise, they will
Expand Down Expand Up @@ -585,22 +576,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
}
dirsToSync[dir] = struct{}{}

tmp, err := fs.tempFile()
tmp, err := fs.tempFileOnce()

// Fallback retry for temporary error.
if err != nil && isTooManyFDError(err) {
// If we have too many files open, try closing some, then try
// again repeatedly.
if isTooManyFDError(err) {
if err = closer(); err != nil {
return err
}
for i := 0; i < 6; i++ {
time.Sleep(time.Duration(i+1) * RetryDelay)

tmp, err = fs.tempFile()
if err == nil || !isTooManyFDError(err) {
break
}
}
tmp, err = fs.tempFile()
}

if err != nil {
return err
}
Expand Down Expand Up @@ -1122,6 +1108,11 @@ func (fs *Datastore) tempFile() (*os.File, error) {
return file, err
}

func (fs *Datastore) tempFileOnce() (*os.File, error) {
file, err := tempFileOnce(fs.tempPath, "temp-")
return file, err
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}

// only call this on directories.
func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error {
dir, err := os.Open(path)
Expand Down
13 changes: 12 additions & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func DirIsEmpty(name string) (bool, error) {

func readFile(filename string) (data []byte, err error) {
// Fallback retry for temporary error.
for i := 0; i < 6; i++ {
for i := 0; i < RetryAttempts; i++ {
data, err = readFileOnce(filename)
if err == nil || !isTooManyFDError(err) {
break
Expand All @@ -32,3 +32,14 @@ func readFile(filename string) (data []byte, err error) {
}
return data, err
}

func tempFile(dir, pattern string) (fi *os.File, err error) {
for i := 0; i < RetryAttempts; i++ {
fi, err = tempFileOnce(dir, pattern)
if err == nil || !isTooManyFDError(err) {
break
}
time.Sleep(time.Duration(i+1) * RetryDelay)
}
return fi, err
}
2 changes: 1 addition & 1 deletion util_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
)

func tempFile(dir, pattern string) (*os.File, error) {
func tempFileOnce(dir, pattern string) (*os.File, error) {
return ioutil.TempFile(dir, pattern)
}

Expand Down
2 changes: 1 addition & 1 deletion util_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func prefixAndSuffix(pattern string) (prefix, suffix string) {
return
}

func tempFile(dir, pattern string) (f *os.File, err error) {
func tempFileOnce(dir, pattern string) (f *os.File, err error) {
if dir == "" {
dir = os.TempDir()
}
Expand Down