Add s3 download by range (#1083)
* Add s3 download by range

* Add config add devel

* Full remake

* Small fixes

* Retrigger

* Fix exp backoff for s3reader

* Simplified debug log for s3Reader

* Extract s3reader

* go fmt changes

* Removed debug line

* Reuse last Body

* Fix config, add test for mysql with ranges

* Small rename

* Add comment, fix test, fix error msg

* Rename bucket

* Add devel, fix prefix for mysql test

* Fix docs
proggga committed Sep 16, 2021
1 parent 4bf695e commit fd4fd2c
Showing 10 changed files with 276 additions and 23 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -52,6 +52,7 @@ services:
&& mkdir -p /export/deletetargetbucket
&& mkdir -p /export/walpurgebucket
&& mkdir -p /export/mysqlfullxtrabackupbucket
&& mkdir -p /export/mysqlfullxtrabackupwithrangesbucket
&& mkdir -p /export/mysqlfullmysqldumpbucket
&& mkdir -p /export/mysqlbinlogpushfetchbucket
&& mkdir -p /export/mysqldeleteendtoendbucket
3 changes: 2 additions & 1 deletion docker/gp_tests/scripts/configs/common_config.json
@@ -7,4 +7,5 @@
"AWS_ENDPOINT": "http://s3:9000",
"PGDATABASE": "postgres",
"AWS_S3_FORCE_PATH_STYLE": "true",
"WALG_COMPRESSION_METHOD": "brotli"
"WALG_COMPRESSION_METHOD": "brotli",
"WALG_LOG_LEVEL": "DEVEL"
@@ -0,0 +1,37 @@
#!/bin/sh
set -e -x

. /usr/local/export_common.sh

export WALG_LOG_LEVEL=DEVEL
export WALE_S3_PREFIX=s3://mysqlfullxtrabackupwithrangesbucket
export WALG_S3_RANGE_BATCH_ENABLED=true


mysqld --initialize --init-file=/etc/mysql/init.sql

service mysql start

sysbench --table-size=10 prepare

sysbench --time=5 run

mysql -e 'FLUSH LOGS'

mysqldump sbtest > /tmp/dump_before_backup

wal-g backup-push

mysql_kill_and_clean_data

wal-g backup-fetch LATEST

chown -R mysql:mysql $MYSQLDATA

service mysql start || (cat /var/log/mysql/error.log && false)

mysql_set_gtid_purged

mysqldump sbtest > /tmp/dump_after_restore

diff /tmp/dump_before_backup /tmp/dump_after_restore
@@ -1,3 +1,4 @@
"WALE_S3_PREFIX": "s3://compressionreversebucket",
"WALG_DELTA_MAX_STEPS": "6",
"WALG_USE_WAL_DELTA": "true"
"WALG_USE_WAL_DELTA": "true",
"WALG_LOG_LEVEL": "DEVEL"
10 changes: 9 additions & 1 deletion docs/STORAGES.md
@@ -45,7 +45,15 @@ To configure AWS KMS key for client-side encryption and decryption. By default,

* `WALG_CSE_KMS_REGION`

To configure AWS KMS key region for client-side encryption and decryption (i.e., `eu-west-1`).
To configure AWS KMS key for client-side encryption and decryption. By default, no encryption is used. (AWS_REGION or WALG_CSE_KMS_REGION required to be set when using AWS KMS key client-side encryption)

* `WALG_S3_RANGE_BATCH_ENABLED`

Set to `true` to allow WAL-G, in case of network problems, to continue downloading from the point already downloaded by using an HTTP Range request. This option is useful when downloading large files that take more than a few hours.

* `WALG_S3_RANGE_MAX_RETRIES`

If `WALG_S3_RANGE_BATCH_ENABLED` is enabled, WAL-G will try to reconnect up to N times; the default is 10.

* `S3_USE_LIST_OBJECTS_V1`

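For illustration, below is a minimal standalone sketch (not part of this commit) of the resume mechanism these settings enable: after a broken download, the client re-requests the same object with an HTTP `Range: bytes=<offset>-` header and continues from the bytes it already has. The bucket name, object key, and offset are placeholders; credentials and region are assumed to come from the environment.

```go
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	svc := s3.New(session.Must(session.NewSession()))

	// Pretend the first 1 MiB was already downloaded before the connection broke.
	var downloaded int64 = 1 << 20

	out, err := svc.GetObject(&s3.GetObjectInput{
		Bucket: aws.String("mybucket"),                           // placeholder bucket
		Key:    aws.String("some/object/part_001.tar.br"),        // placeholder key
		Range:  aws.String(fmt.Sprintf("bytes=%d-", downloaded)), // resume from the cursor
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer out.Body.Close()

	n, _ := io.Copy(io.Discard, out.Body) // consume the remainder of the object
	fmt.Printf("downloaded %d more bytes\n", n)
}
```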
2 changes: 2 additions & 0 deletions internal/config.go
@@ -241,6 +241,8 @@ var (
"S3_ENDPOINT_SOURCE": true,
"S3_ENDPOINT_PORT": true,
"S3_USE_LIST_OBJECTS_V1": true,
"S3_RANGE_BATCH_ENABLED": true,
"S3_RANGE_MAX_RETRIES": true,

// Azure
"WALG_AZ_PREFIX": true,
72 changes: 55 additions & 17 deletions pkg/storages/s3/folder.go
@@ -5,6 +5,7 @@ import (
"path"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -37,6 +38,13 @@ const (
EndpointPortSetting = "S3_ENDPOINT_PORT"
LogLevel = "S3_LOG_LEVEL"
UseListObjectsV1 = "S3_USE_LIST_OBJECTS_V1"
RangeBatchEnabled = "S3_RANGE_BATCH_ENABLED"
RangeQueriesMaxRetries = "S3_RANGE_MAX_RETRIES"

RangeBatchEnabledDefault = false
RangeMaxRetriesDefault = 10
RangeQueryMinRetryDelay = 30 * time.Millisecond
RangeQueryMaxRetryDelay = 300 * time.Second
)

var (
@@ -61,18 +69,11 @@ var (
s3CertFile,
MaxPartSize,
UseListObjectsV1,
RangeBatchEnabled,
RangeQueriesMaxRetries,
}
)

func getFirstSettingOf(settings map[string]string, keys []string) string {
for _, key := range keys {
if value, ok := settings[key]; ok {
return value
}
}
return ""
}

func NewFolderError(err error, format string, args ...interface{}) storage.Error {
return storage.NewError(err, "S3", format, args...)
}
@@ -92,18 +93,19 @@ type Folder struct {
useListObjectsV1 bool
}

func NewFolder(uploader Uploader, s3API s3iface.S3API, bucket, path string, useListObjectsV1 bool) *Folder {
func NewFolder(uploader Uploader, s3API s3iface.S3API, settings map[string]string, bucket, path string, useListObjectsV1 bool) *Folder {
return &Folder{
uploader: uploader,
S3API: s3API,
settings: settings,
Bucket: aws.String(bucket),
Path: storage.AddDelimiterToPath(path),
useListObjectsV1: useListObjectsV1,
}
}

func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder, error) {
bucket, path, err := storage.GetPathFromPrefix(prefix)
bucket, storagePath, err := storage.GetPathFromPrefix(prefix)
if err != nil {
return nil, errors.Wrap(err, "failed to configure S3 path")
}
@@ -124,8 +126,7 @@ func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder,
}
}

folder := NewFolder(*uploader, client, bucket, path, useListObjectsV1)
folder.settings = settings
folder := NewFolder(*uploader, client, settings, bucket, storagePath, useListObjectsV1)

return folder, nil
}
@@ -183,12 +184,48 @@ func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, erro
}
return nil, errors.Wrapf(err, "failed to read object: '%s' from S3", objectPath)
}
return object.Body, nil

rangeEnabled, maxRetries, minRetryDelay, maxRetryDelay := folder.getReaderSettings()

reader := object.Body
if rangeEnabled {
reader = NewS3Reader(object.Body, objectPath, maxRetries, folder, minRetryDelay, maxRetryDelay)
}
return reader, nil
}

func (folder *Folder) getReaderSettings() (rangeEnabled bool, retriesCount int,
minRetryDelay, maxRetryDelay time.Duration) {
rangeEnabled = RangeBatchEnabledDefault
if rangeBatch, ok := folder.settings[RangeBatchEnabled]; ok {
if strings.TrimSpace(strings.ToLower(rangeBatch)) == "true" {
rangeEnabled = true
} else {
rangeEnabled = false
}
}

retriesCount = RangeMaxRetriesDefault
if maxRetriesRaw, ok := folder.settings[RangeQueriesMaxRetries]; ok {
if maxRetriesInt, err := strconv.Atoi(maxRetriesRaw); err == nil {
retriesCount = maxRetriesInt
}
}

if minRetryDelay == 0 {
minRetryDelay = RangeQueryMinRetryDelay
}
if maxRetryDelay == 0 {
maxRetryDelay = RangeQueryMaxRetryDelay
}

return rangeEnabled, retriesCount, minRetryDelay, maxRetryDelay
}

func (folder *Folder) GetSubFolder(subFolderRelativePath string) storage.Folder {
return NewFolder(folder.uploader, folder.S3API, *folder.Bucket,
subFolder := NewFolder(folder.uploader, folder.S3API, folder.settings, *folder.Bucket,
storage.JoinPath(folder.Path, subFolderRelativePath)+"/", folder.useListObjectsV1)
return subFolder
}

func (folder *Folder) GetPath() string {
@@ -198,8 +235,9 @@ func (folder *Folder) GetPath() string {
func (folder *Folder) ListFolder() (objects []storage.Object, subFolders []storage.Folder, err error) {
listFunc := func(commonPrefixes []*s3.CommonPrefix, contents []*s3.Object) {
for _, prefix := range commonPrefixes {
subFolders = append(subFolders, NewFolder(folder.uploader, folder.S3API, *folder.Bucket,
*prefix.Prefix, folder.useListObjectsV1))
subFolder := NewFolder(folder.uploader, folder.S3API, folder.settings, *folder.Bucket,
*prefix.Prefix, folder.useListObjectsV1)
subFolders = append(subFolders, subFolder)
}
for _, object := range contents {
// Some storages return root tar_partitions folder as a Key.
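The new `getReaderSettings` helper above falls back to the package defaults when a setting is missing or malformed. A minimal table-driven test sketch (not part of the commit; it assumes it lives in the same `s3` package so the unexported method and `settings` field are reachable):

```go
package s3

import (
	"testing"
)

func TestGetReaderSettingsDefaults(t *testing.T) {
	cases := []struct {
		name        string
		settings    map[string]string
		wantEnabled bool
		wantRetries int
	}{
		{"empty settings use defaults", map[string]string{}, false, 10},
		{"upper-case true is accepted", map[string]string{RangeBatchEnabled: "TRUE"}, true, 10},
		{"custom retry count", map[string]string{RangeBatchEnabled: "true", RangeQueriesMaxRetries: "3"}, true, 3},
		{"non-numeric retries fall back", map[string]string{RangeBatchEnabled: "true", RangeQueriesMaxRetries: "ten"}, true, 10},
	}
	for _, tc := range cases {
		folder := Folder{settings: tc.settings}
		enabled, retries, minDelay, maxDelay := folder.getReaderSettings()
		if enabled != tc.wantEnabled || retries != tc.wantRetries {
			t.Errorf("%s: got (%v, %d), want (%v, %d)", tc.name, enabled, retries, tc.wantEnabled, tc.wantRetries)
		}
		if minDelay != RangeQueryMinRetryDelay || maxDelay != RangeQueryMaxRetryDelay {
			t.Errorf("%s: unexpected retry delays %v/%v", tc.name, minDelay, maxDelay)
		}
	}
}
```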
165 changes: 165 additions & 0 deletions pkg/storages/s3/s3reader.go
@@ -0,0 +1,165 @@
package s3

import (
"fmt"
"hash/fnv"
"io"
"math"
"math/rand"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)

var (
DebugLogBufferCounter = 0
)

type s3Reader struct {
lastBody io.ReadCloser
folder *Folder
maxRetries int
retryNum int
objectPath string
storageCursor int64
maxRetryDelay time.Duration
minRetryDelay time.Duration
reconnectId int
logDebugId string // hash of the object path plus an instance counter; unique id used only for debug logging
}

func (reader *s3Reader) getObjectRange(from, to int64) (*s3.GetObjectOutput, error) {
bytesRange := fmt.Sprintf("bytes=%d-", from)
if to != 0 {
bytesRange += strconv.Itoa(int(to))
}
input := &s3.GetObjectInput{
Bucket: reader.folder.Bucket,
Key: aws.String(reader.objectPath),
Range: aws.String(bytesRange),
}
reader.debugLog("GetObject with range %s", bytesRange)
return reader.folder.S3API.GetObject(input)
}

func (reader *s3Reader) Read(p []byte) (n int, err error) {
reader.debugLog("Read to buffer [%d] bytes", len(p))
reconnect := false
if reader.lastBody == nil { // initial connect, if lastBody wasn't provided
reconnect = true
}
for {
if reconnect {
reconnect = false
connErr := reader.reconnect()
if connErr != nil {
reader.debugLog("reconnect failed %s", connErr)
return 0, connErr
}
}

n, err = reader.lastBody.Read(p)
reader.debugLog("read %d, err %s", n, err)
if err != nil && err != io.EOF {
reconnect = true
continue
}
reader.storageCursor += int64(n)
reader.debugLog("success read")
return n, err
}
}
func (reader *s3Reader) getDebugLogLine(format string, v ...interface{}) string {
prefix := fmt.Sprintf("s3Reader [%s] ", reader.logDebugId)
message := fmt.Sprintf(format, v...)
return prefix + message
}

func (reader *s3Reader) debugLog(format string, v ...interface{}) {
tracelog.DebugLogger.Print(reader.getDebugLogLine(format, v...))
}

func (reader *s3Reader) reconnect() error {
failed := 0

for {
reader.reconnectId++
object, err := reader.getObjectRange(reader.storageCursor, 0)
if err != nil {
failed += 1
reader.debugLog("reconnect failed [%d/%d]: %s", failed, reader.maxRetries, err)
if failed >= reader.maxRetries {
return errors.Wrap(err, reader.getDebugLogLine("Too many reconnect retries"))
}
sleepTime := reader.getIncrSleep(failed)
reader.debugLog("sleep: %s", sleepTime)
time.Sleep(sleepTime)
continue
}
failed = 0
if reader.lastBody != nil {
err = reader.lastBody.Close()
if err != nil {
msg := reader.getDebugLogLine("Failed to close the previous connection body")
tracelog.DebugLogger.Print(msg)
return errors.Wrap(err, msg)
}
}
reader.lastBody = object.Body
reader.debugLog("reconnect #%d succeeded", reader.reconnectId)
break
}
return nil
}

// This code is adapted from the AWS SDK's default retryer,
// vendor/github.com/aws/aws-sdk-go/aws/client/default_retryer.go,
// func (d DefaultRetryer) RetryRules(...) time.Duration.
// It calculates the sleep duration (exponential backoff with jitter).
func (reader *s3Reader) getIncrSleep(retryCount int) time.Duration {
minDelay := reader.minRetryDelay
maxDelay := reader.maxRetryDelay
var delay time.Duration

actualRetryCount := int(math.Log2(float64(minDelay))) + 1
if actualRetryCount < 63-retryCount {
delay = time.Duration(1<<uint64(retryCount)) * getJitterDelay(minDelay)
if delay > maxDelay {
delay = getJitterDelay(maxDelay / 2)
}
} else {
delay = getJitterDelay(maxDelay / 2)
}
return delay
}

func (reader *s3Reader) Close() (err error) {
return reader.lastBody.Close()
}

func NewS3Reader(body io.ReadCloser, objectPath string, retriesCount int, folder *Folder,
minRetryDelay, maxRetryDelay time.Duration) *s3Reader {

DebugLogBufferCounter++
reader := &s3Reader{lastBody: body, objectPath: objectPath, maxRetries: retriesCount, logDebugId: getHash(objectPath, DebugLogBufferCounter),
folder: folder, minRetryDelay: minRetryDelay, maxRetryDelay: maxRetryDelay}

reader.debugLog("Init s3reader path %s", objectPath)
return reader
}

func getHash(objectPath string, id int) string {
hash := fnv.New32a()
_, err := hash.Write([]byte(objectPath))
tracelog.ErrorLogger.FatalfOnError("Fatal, can't write buffer to hash %v", err)

return fmt.Sprintf("%x_%d", hash.Sum32(), id)
}

// getJitterDelay returns a jittered delay for retry
func getJitterDelay(duration time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(duration)) + int64(duration))
}
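The `getIncrSleep` logic above mirrors the SDK retryer: each failed reconnect roughly doubles a jittered base delay and is capped near half of the maximum. A standalone sketch (not part of the commit; the 63-bit overflow guard is omitted) printing how the delay grows under the commit's defaults of 30 ms and 300 s:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter returns a random duration in [d, 2d), like getJitterDelay above.
func jitter(d time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(d)) + int64(d))
}

func main() {
	minDelay := 30 * time.Millisecond // RangeQueryMinRetryDelay
	maxDelay := 300 * time.Second     // RangeQueryMaxRetryDelay

	for attempt := 1; attempt <= 10; attempt++ {
		delay := time.Duration(1<<uint(attempt)) * jitter(minDelay)
		if delay > maxDelay {
			delay = jitter(maxDelay / 2)
		}
		fmt.Printf("attempt %2d: sleep ~%v\n", attempt, delay)
	}
}
```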
2 changes: 1 addition & 1 deletion pkg/storages/s3/session.go
@@ -167,4 +167,4 @@ func getEndpointPort(settings map[string]string) string {
return port
}
return DefaultPort
}
}
