Fix exp backoff for s3reader
proggga committed Sep 8, 2021
1 parent 36e47a9 commit a21a01c
Showing 2 changed files with 80 additions and 29 deletions.
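For context: the old reconnect loop slept for a fixed 250 ms base that simply doubled on every failure, with no jitter and no upper bound. The new code adopts jittered exponential backoff in the style of the AWS SDK's default retryer, bounded by RangeQueryMinRetryDelay (30 ms) and RangeQueryMaxRetryDelay (300 s). A minimal, self-contained sketch of that delay calculation, with illustrative names rather than the commit's exact API:

    package main

    import (
        "fmt"
        "math"
        "math/rand"
        "time"
    )

    // jitter returns a random duration uniformly distributed in [d, 2d).
    func jitter(d time.Duration) time.Duration {
        return time.Duration(rand.Int63n(int64(d)) + int64(d))
    }

    // backoffDelay doubles a jittered minDelay on each retry, capped at maxDelay.
    func backoffDelay(retry int, minDelay, maxDelay time.Duration) time.Duration {
        // Guard the shift against int64 overflow (63 value bits): if minDelay's
        // bit width plus the retry count is too large, return a jittered
        // half of maxDelay instead.
        if int(math.Log2(float64(minDelay)))+1 >= 63-retry {
            return jitter(maxDelay / 2)
        }
        delay := time.Duration(1<<uint64(retry)) * jitter(minDelay)
        if delay > maxDelay {
            delay = jitter(maxDelay / 2)
        }
        return delay
    }

    func main() {
        for retry := 1; retry <= 5; retry++ {
            fmt.Printf("attempt %d: sleep %v\n", retry, backoffDelay(retry, 30*time.Millisecond, 300*time.Second))
        }
    }

With the 30 ms minimum, attempt n sleeps roughly 2^n times a value in [30, 60) ms, so about 60-120 ms after the first failure and 1-2 s after the fifth, until the 300 s cap takes over.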
102 changes: 74 additions & 28 deletions pkg/storages/s3/folder.go
@@ -5,6 +5,8 @@ import (
"github.com/wal-g/tracelog"
"hash/fnv"
"io"
"math"
"math/rand"
"path"
"strconv"
"strings"
@@ -45,8 +47,12 @@ const (
RangeQueriesMaxRetries = "S3_RANGE_MAX_RETRIES"


RangeQueriesMaxRetriesDefault = 10
RangeBatchEnabledDefault = false

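// The delay bounds below match the AWS SDK default retryer's values.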
RangeMaxRetries = 10
RangeQueryMinRetryDelay = 30 * time.Millisecond
RangeQueryMaxRetryDelay = 300 * time.Second

)

var (
@@ -151,6 +157,8 @@ func (folder *Folder) Exists(objectRelativePath string) (bool, error) {
return true, nil
}



func (folder *Folder) PutObject(name string, content io.Reader) error {
return folder.uploader.upload(*folder.Bucket, folder.Path+name, content)
}
@@ -176,10 +184,14 @@ func (folder *Folder) CopyObject(srcPath string, dstPath string) error {
type s3Reader struct {
lastBody io.ReadCloser
folder *Folder
maxRetries int32
maxRetries int
retryNum int
objectPath string
storageCursor int64
id string // id is useful for logs because it's shorter than the filename
maxRetryDelay time.Duration
minRetryDelay time.Duration
reconnectId int
id string // hash of the filename plus a counter, a unique id for this s3Reader instance
}

func (reader *s3Reader) getObjectRange(from, to int64) (*s3.GetObjectOutput, error) {
@@ -226,56 +238,77 @@ func (reader *s3Reader) Read(p []byte) (n int, err error) {
}

func (reader *s3Reader) reconnect() error {
failed := int32(0)
backoffSleepDuration := 0.25 * 1000 * time.Millisecond
sleepMultiplier := 1
failed := 0

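// Re-open the object from storageCursor, sleeping with jittered
// exponential backoff between attempts; give up after maxRetries failures.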
for {
reader.reconnectId++
object, err := reader.getObjectRange(reader.storageCursor, 0)
if err != nil {
failed += 1
tracelog.DebugLogger.Printf("s3Reader [%s] reconnect failed: %s", reader.id, err)
tracelog.DebugLogger.Printf("s3Reader [%s] reconnect failed [%d/%d]: %s",
reader.id, failed, reader.maxRetries, err)
if failed >= reader.maxRetries {
return errors.Wrap(err, "s3Reader Too much reconnecting retries")
return errors.Wrap(err, fmt.Sprintf("s3Reader [%s] too many reconnect retries", reader.id))
}
time.Sleep(time.Duration(sleepMultiplier) * backoffSleepDuration)
sleepMultiplier *= 2
sleepTime := reader.getIncrSleep(failed)
tracelog.DebugLogger.Printf("s3Reader [%s] sleep: %s", reader.id, sleepTime)
time.Sleep(sleepTime)
continue
}
failed = 0
if reader.lastBody != nil {
err = reader.lastBody.Close()
if err != nil {
return errors.Wrap(err, "s3Reader We have problems with closing prev connection, smth strange")
msg := fmt.Sprintf("s3Reader [%s] failed to close previous connection", reader.id)
tracelog.DebugLogger.Print(msg)
return errors.Wrap(err, msg)
}
}
reader.lastBody = object.Body
tracelog.DebugLogger.Printf("s3Reader [%s] reconnect succeeded", reader.id)
tracelog.DebugLogger.Printf("s3Reader [%s] reconnect #%d succeeded", reader.id, reader.reconnectId)
break
}
return nil
}

func (reader *s3Reader) getIncrSleep(retryCount int) time.Duration {
minDelay := reader.minRetryDelay
maxDelay := reader.maxRetryDelay
var delay time.Duration

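// actualRetryCount estimates the bit width of minDelay in nanoseconds;
// if shifting by retryCount more bits could overflow int64's 63 value
// bits, skip the shift and use a jittered half of maxDelay instead.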
actualRetryCount := int(math.Log2(float64(minDelay))) + 1
if actualRetryCount < 63-retryCount {
delay = time.Duration(1<<uint64(retryCount)) * getJitterDelay(minDelay)
if delay > maxDelay {
delay = getJitterDelay(maxDelay / 2)
}
} else {
delay = getJitterDelay(maxDelay / 2)
}
return delay
}

func (reader *s3Reader) Close() (err error) {
return reader.lastBody.Close()
}

func NewS3Reader(objectPath string, maxRetries int32, folder *Folder) *s3Reader {
func NewS3Reader(objectPath string, retriesCount int, folder *Folder,
minRetryDelay, maxRetryDelay time.Duration) *s3Reader {

S3BufferCounter++
reader := &s3Reader{objectPath: objectPath, maxRetries: maxRetries,
folder: folder, id: getId(objectPath, S3BufferCounter)}
reader := &s3Reader{objectPath: objectPath, maxRetries: retriesCount, id: getHash(objectPath, S3BufferCounter),
folder: folder, minRetryDelay: minRetryDelay, maxRetryDelay: maxRetryDelay}

tracelog.DebugLogger.Printf("Init s3reader id %s path %s", reader.id, objectPath)
tracelog.DebugLogger.Printf("Init s3reader hash %s path %s", reader.id, objectPath)
return reader
}

func getId(objectPath string, counter int) string {
func getHash(objectPath string, id int) string {
hash := fnv.New32a()
_, err := hash.Write([]byte(objectPath))
if err != nil {
tracelog.ErrorLogger.Println("Fatal, can't write buffer to hash")
return "<INVALID HASH>"
}
return fmt.Sprintf("%x_%d", hash.Sum32(), counter)
tracelog.ErrorLogger.FatalfOnError("Fatal, can't write buffer to hash: %v", err)

return fmt.Sprintf("%x_%d", hash.Sum32(), id)
}

func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, error) {
@@ -293,17 +326,23 @@ func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, error) {
return nil, errors.Wrapf(err, "failed to read object: '%s' from S3", objectPath)
}

rangeEnabled, maxRetries := folder.getReaderSettings()
rangeEnabled, maxRetries, minRetryDelay, maxRetryDelay := folder.getReaderSettings()

reader := object.Body
if rangeEnabled {
_ = object.Body.Close() // we don't need it anymore
reader = NewS3Reader(objectPath, int32(maxRetries), folder)
reader = NewS3Reader(objectPath, maxRetries, folder, minRetryDelay, maxRetryDelay)
}
return reader, nil
}

func (folder *Folder) getReaderSettings() (rangeEnabled bool, maxRetries int) {
// getJitterDelay returns a jittered delay for retry
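// The result is uniformly distributed in [duration, 2*duration).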
func getJitterDelay(duration time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(duration)) + int64(duration))
}

func (folder *Folder) getReaderSettings() (rangeEnabled bool, retriesCount int,
minRetryDelay, maxRetryDelay time.Duration) {
rangeEnabled = RangeBatchEnabledDefault
if rangeBatch, ok := folder.settings[RangeBatchEnabled]; ok {
if rangeBatch == "true" {
@@ -313,14 +352,21 @@ func (folder *Folder) getReaderSettings() (rangeEnabled bool, maxRetries int) {
}
}

maxRetries = RangeQueriesMaxRetriesDefault
retriesCount = RangeMaxRetries
if maxRetriesRaw, ok := folder.settings[RangeQueriesMaxRetries]; ok {
if maxRetriesInt, err := strconv.Atoi(maxRetriesRaw); err == nil {
maxRetries = maxRetriesInt
retriesCount = maxRetriesInt
}
}

return rangeEnabled, maxRetries
if minRetryDelay == 0 {
minRetryDelay = RangeQueryMinRetryDelay
}
if maxRetryDelay == 0 {
maxRetryDelay = RangeQueryMaxRetryDelay
}

return rangeEnabled, retriesCount, minRetryDelay, maxRetryDelay
}
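Net effect of getReaderSettings: ranged reads stay disabled unless the range-batch setting is the string "true"; the retry budget defaults to RangeMaxRetries (10) and can be overridden via S3_RANGE_MAX_RETRIES; and because nothing assigns the delay values before the zero checks, the 30 ms / 300 s defaults always apply for now.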

func (folder *Folder) GetSubFolder(subFolderRelativePath string) storage.Folder {
7 changes: 6 additions & 1 deletion pkg/storages/s3/session.go
@@ -91,7 +91,12 @@ func getDefaultConfig(settings map[string]string) *aws.Config {

if logLevel, ok := settings[LogLevel]; ok {
config = config.WithLogLevel(func(s string) aws.LogLevelType {
return aws.LogDebug
switch s {
case "DEVEL":
return aws.LogDebug
default:
return aws.LogOff
}
}(logLevel))
}

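Net effect in session.go: SDK request logging is now enabled only when the configured log-level value is "DEVEL"; any other value maps to aws.LogOff, where previously any configured value turned on aws.LogDebug. (In WAL-G this value typically comes from WALG_LOG_LEVEL, though that key sits outside this diff.)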
