
Add s3 download by range #1083

Merged
merged 17 commits on Sep 16, 2021
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -52,6 +52,7 @@ services:
&& mkdir -p /export/deletetargetbucket
&& mkdir -p /export/walpurgebucket
&& mkdir -p /export/mysqlfullxtrabackupbucket
&& mkdir -p /export/mysqlfullxtrabackupwithrangesbucket
&& mkdir -p /export/mysqlfullmysqldumpbucket
&& mkdir -p /export/mysqlbinlogpushfetchbucket
&& mkdir -p /export/mysqldeleteendtoendbucket
3 changes: 2 additions & 1 deletion docker/gp_tests/scripts/configs/common_config.json
@@ -7,4 +7,5 @@
"AWS_ENDPOINT": "http://s3:9000",
"PGDATABASE": "postgres",
"AWS_S3_FORCE_PATH_STYLE": "true",
"WALG_COMPRESSION_METHOD": "brotli"
"WALG_COMPRESSION_METHOD": "brotli",
"WALG_LOG_LEVEL": "DEVEL"
@@ -0,0 +1,37 @@
#!/bin/sh
set -e -x

. /usr/local/export_common.sh

export WALG_LOG_LEVEL=DEVEL
export WALE_S3_PREFIX=s3://mysqlfullxtrabackupwithrangesbucket
export WALG_S3_RANGE_BATCH_ENABLED=true


mysqld --initialize --init-file=/etc/mysql/init.sql

service mysql start

sysbench --table-size=10 prepare

sysbench --time=5 run

mysql -e 'FLUSH LOGS'

mysqldump sbtest > /tmp/dump_before_backup

wal-g backup-push

mysql_kill_and_clean_data

wal-g backup-fetch LATEST

chown -R mysql:mysql $MYSQLDATA

service mysql start || (cat /var/log/mysql/error.log && false)

mysql_set_gtid_purged

mysqldump sbtest > /tmp/dump_after_restore

diff /tmp/dump_before_backup /tmp/dump_after_restore
@@ -1,3 +1,4 @@
"WALE_S3_PREFIX": "s3://compressionreversebucket",
"WALG_DELTA_MAX_STEPS": "6",
"WALG_USE_WAL_DELTA": "true"
"WALG_USE_WAL_DELTA": "true",
"WALG_LOG_LEVEL": "DEVEL"
10 changes: 9 additions & 1 deletion docs/STORAGES.md
@@ -45,7 +45,15 @@ To configure AWS KMS key for client-side encryption and decryption. By default,

* `WALG_CSE_KMS_REGION`

To configure AWS KMS key region for client-side encryption and decryption (i.e., `eu-west-1`).
To configure the AWS KMS key region for client-side encryption and decryption. By default, no encryption is used. (`AWS_REGION` or `WALG_CSE_KMS_REGION` must be set when using AWS KMS client-side encryption.)

* `WALG_S3_RANGE_BATCH_ENABLED`

Set to `true` to allow wal-g, when network problems interrupt a transfer, to continue downloading from the point already fetched by using HTTP Range requests. This option is useful when downloading large files takes more than a few hours.

* `WALG_S3_RANGE_MAX_RETRIES`

If `WALG_S3_RANGE_BATCH_ENABLED` is enabled, wal-g will try to reconnect up to this many times (10 by default); a short usage sketch follows below.

* `S3_USE_LIST_OBJECTS_V1`

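Both options are plain wal-g environment variables. A minimal usage sketch (illustrative values, not part of this diff), following the style of the test script above:

export WALG_S3_RANGE_BATCH_ENABLED=true    # resume interrupted downloads via HTTP Range requests
export WALG_S3_RANGE_MAX_RETRIES=20        # allow up to 20 reconnect attempts (the default is 10)
wal-g backup-fetch LATEST
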
2 changes: 2 additions & 0 deletions internal/config.go
@@ -241,6 +241,8 @@ var (
"S3_ENDPOINT_SOURCE": true,
"S3_ENDPOINT_PORT": true,
"S3_USE_LIST_OBJECTS_V1": true,
"S3_RANGE_BATCH_ENABLED": true,
"S3_RANGE_MAX_RETRIES": true,

// Azure
"WALG_AZ_PREFIX": true,
72 changes: 55 additions & 17 deletions pkg/storages/s3/folder.go
@@ -5,6 +5,7 @@ import (
"path"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -37,6 +38,13 @@
EndpointPortSetting = "S3_ENDPOINT_PORT"
LogLevel = "S3_LOG_LEVEL"
UseListObjectsV1 = "S3_USE_LIST_OBJECTS_V1"
RangeBatchEnabled = "S3_RANGE_BATCH_ENABLED"
RangeQueriesMaxRetries = "S3_RANGE_MAX_RETRIES"

RangeBatchEnabledDefault = false
RangeMaxRetriesDefault = 10
RangeQueryMinRetryDelay = 30 * time.Millisecond
RangeQueryMaxRetryDelay = 300 * time.Second
)

var (
@@ -61,18 +69,11 @@
s3CertFile,
MaxPartSize,
UseListObjectsV1,
RangeBatchEnabled,
RangeQueriesMaxRetries,
}
)

func getFirstSettingOf(settings map[string]string, keys []string) string {
for _, key := range keys {
if value, ok := settings[key]; ok {
return value
}
}
return ""
}

func NewFolderError(err error, format string, args ...interface{}) storage.Error {
return storage.NewError(err, "S3", format, args...)
}
@@ -92,18 +93,19 @@ type Folder struct {
useListObjectsV1 bool
}

func NewFolder(uploader Uploader, s3API s3iface.S3API, bucket, path string, useListObjectsV1 bool) *Folder {
func NewFolder(uploader Uploader, s3API s3iface.S3API, settings map[string]string, bucket, path string, useListObjectsV1 bool) *Folder {
return &Folder{
uploader: uploader,
S3API: s3API,
settings: settings,
Bucket: aws.String(bucket),
Path: storage.AddDelimiterToPath(path),
useListObjectsV1: useListObjectsV1,
}
}

func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder, error) {
bucket, path, err := storage.GetPathFromPrefix(prefix)
bucket, storagePath, err := storage.GetPathFromPrefix(prefix)
if err != nil {
return nil, errors.Wrap(err, "failed to configure S3 path")
}
@@ -124,8 +126,7 @@ func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder,
}
}

folder := NewFolder(*uploader, client, bucket, path, useListObjectsV1)
folder.settings = settings
folder := NewFolder(*uploader, client, settings, bucket, storagePath, useListObjectsV1)

return folder, nil
}
@@ -183,12 +184,48 @@ func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, erro
}
return nil, errors.Wrapf(err, "failed to read object: '%s' from S3", objectPath)
}
return object.Body, nil

rangeEnabled, maxRetries, minRetryDelay, maxRetryDelay := folder.getReaderSettings()

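// If range batching is enabled, wrap the S3 body so an interrupted download can be
// resumed with HTTP Range requests; otherwise the raw body is returned unchanged.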
reader := object.Body
if rangeEnabled {
reader = NewS3Reader(object.Body, objectPath, maxRetries, folder, minRetryDelay, maxRetryDelay)
}
return reader, nil
}

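// getReaderSettings resolves the range-batch options from the folder's settings map,
// falling back to the package defaults for anything unset or unparsable.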
func (folder *Folder) getReaderSettings() (rangeEnabled bool, retriesCount int,
minRetryDelay, maxRetryDelay time.Duration) {
rangeEnabled = RangeBatchEnabledDefault
if rangeBatch, ok := folder.settings[RangeBatchEnabled]; ok {
if strings.TrimSpace(strings.ToLower(rangeBatch)) == "true" {
rangeEnabled = true
} else {
rangeEnabled = false
}
}

retriesCount = RangeMaxRetriesDefault
if maxRetriesRaw, ok := folder.settings[RangeQueriesMaxRetries]; ok {
if maxRetriesInt, err := strconv.Atoi(maxRetriesRaw); err == nil {
retriesCount = maxRetriesInt
}
}

if minRetryDelay == 0 {
minRetryDelay = RangeQueryMinRetryDelay
}
if maxRetryDelay == 0 {
maxRetryDelay = RangeQueryMaxRetryDelay
}

return rangeEnabled, retriesCount, minRetryDelay, maxRetryDelay
}

func (folder *Folder) GetSubFolder(subFolderRelativePath string) storage.Folder {
return NewFolder(folder.uploader, folder.S3API, *folder.Bucket,
subFolder := NewFolder(folder.uploader, folder.S3API, folder.settings, *folder.Bucket,
storage.JoinPath(folder.Path, subFolderRelativePath)+"/", folder.useListObjectsV1)
return subFolder
}

func (folder *Folder) GetPath() string {
@@ -198,8 +235,9 @@ func (folder *Folder) GetPath() string {
func (folder *Folder) ListFolder() (objects []storage.Object, subFolders []storage.Folder, err error) {
listFunc := func(commonPrefixes []*s3.CommonPrefix, contents []*s3.Object) {
for _, prefix := range commonPrefixes {
subFolders = append(subFolders, NewFolder(folder.uploader, folder.S3API, *folder.Bucket,
*prefix.Prefix, folder.useListObjectsV1))
subFolder := NewFolder(folder.uploader, folder.S3API, folder.settings, *folder.Bucket,
*prefix.Prefix, folder.useListObjectsV1)
subFolders = append(subFolders, subFolder)
}
for _, object := range contents {
// Some storages return root tar_partitions folder as a Key.
165 changes: 165 additions & 0 deletions pkg/storages/s3/s3reader.go
@@ -0,0 +1,165 @@
package s3

import (
"fmt"
"hash/fnv"
"io"
"math"
"math/rand"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)

var (
DebugLogBufferCounter = 0
)

type s3Reader struct {
lastBody io.ReadCloser
folder *Folder
maxRetries int
retryNum int
objectPath string
storageCursor int64
maxRetryDelay time.Duration
minRetryDelay time.Duration
reconnectId int
logDebugId string // hash of the object path plus an instance counter; unique id used only for debug logging
}

func (reader *s3Reader) getObjectRange(from, to int64) (*s3.GetObjectOutput, error) {
bytesRange := fmt.Sprintf("bytes=%d-", from)
if to != 0 {
bytesRange += strconv.Itoa(int(to))
}
input := &s3.GetObjectInput{
Bucket: reader.folder.Bucket,
Key: aws.String(reader.objectPath),
Range: aws.String(bytesRange),
}
reader.debugLog("GetObject with range %s", bytesRange)
return reader.folder.S3API.GetObject(input)
}

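// Read reads from the current S3 body; on any error other than io.EOF it reconnects with a
// ranged GetObject starting at storageCursor, so the download resumes near the failure point
// instead of restarting from the beginning of the object.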
func (reader *s3Reader) Read(p []byte) (n int, err error) {
reader.debugLog("Read to buffer [%d] bytes", len(p))
reconnect := false
if reader.lastBody == nil { // initial connect, if lastBody wasn't provided
reconnect = true
}
for {
if reconnect {
reconnect = false
connErr := reader.reconnect()
if connErr != nil {
reader.debugLog("reconnect failed %s", connErr)
return 0, connErr
}
}

n, err = reader.lastBody.Read(p)
reader.debugLog("read %d, err %s", n, err)
if err != nil && err != io.EOF {
reconnect = true
continue
}
reader.storageCursor += int64(n)
reader.debugLog("success read")
return n, err
}
}
func (reader *s3Reader) getDebugLogLine(format string, v ...interface{}) string {
prefix := fmt.Sprintf("s3Reader [%s] ", reader.logDebugId)
message := fmt.Sprintf(format, v...)
return prefix + message
}

func (reader *s3Reader) debugLog(format string, v ...interface{}) {
tracelog.DebugLogger.Print(reader.getDebugLogLine(format, v...))
}

func (reader *s3Reader) reconnect() error {
failed := 0

for {
reader.reconnectId++
object, err := reader.getObjectRange(reader.storageCursor, 0)
if err != nil {
failed += 1
reader.debugLog("reconnect failed [%d/%d]: %s", failed, reader.maxRetries, err)
if failed >= reader.maxRetries {
return errors.Wrap(err, reader.getDebugLogLine("Too much reconnecting retries"))
}
sleepTime := reader.getIncrSleep(failed)
reader.debugLog("sleep: %s", sleepTime)
time.Sleep(sleepTime)
continue
}
failed = 0
if reader.lastBody != nil {
err = reader.lastBody.Close()
if err != nil {
msg := reader.getDebugLogLine("failed to close the previous connection")
tracelog.DebugLogger.Print(msg)
return errors.Wrap(err, msg)
}
}
reader.lastBody = object.Body
reader.debugLog("reconnect #%d succeeded", reader.reconnectId)
break
}
return nil
}

// This logic is adapted from the AWS SDK retryer (vendor/github.com/aws/aws-sdk-go/aws/client/default_retryer.go,
// func (d DefaultRetryer) RetryRules( .. ) time.Duration).
// It calculates the sleep duration using exponential backoff with jitter.
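// Example with the default minRetryDelay of 30ms: for retryCount = 3, getJitterDelay(30ms)
// returns a value in [30ms, 60ms), so delay = 8 * that value, i.e. somewhere in [240ms, 480ms);
// if the result ever exceeded maxRetryDelay it would be re-jittered around maxRetryDelay/2.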
func (reader *s3Reader) getIncrSleep(retryCount int) time.Duration {
minDelay := reader.minRetryDelay
maxDelay := reader.maxRetryDelay
var delay time.Duration

actualRetryCount := int(math.Log2(float64(minDelay))) + 1
if actualRetryCount < 63-retryCount {
delay = time.Duration(1<<uint64(retryCount)) * getJitterDelay(minDelay)
if delay > maxDelay {
delay = getJitterDelay(maxDelay / 2)
}
} else {
delay = getJitterDelay(maxDelay / 2)
}
return delay
}

func (reader *s3Reader) Close() (err error) {
return reader.lastBody.Close()
}

func NewS3Reader(body io.ReadCloser, objectPath string, retriesCount int, folder *Folder,
minRetryDelay, maxRetryDelay time.Duration) *s3Reader {

DebugLogBufferCounter++
reader := &s3Reader{lastBody: body, objectPath: objectPath, maxRetries: retriesCount, logDebugId: getHash(objectPath, DebugLogBufferCounter),
folder: folder, minRetryDelay: minRetryDelay, maxRetryDelay: maxRetryDelay}

reader.debugLog("Init s3reader path %s", objectPath)
return reader
}

func getHash(objectPath string, id int) string {
hash := fnv.New32a()
_, err := hash.Write([]byte(objectPath))
tracelog.ErrorLogger.FatalfOnError("Fatal, can't write buffer to hash %v", err)

return fmt.Sprintf("%x_%d", hash.Sum32(), id)
}

// getJitterDelay returns a jittered delay for retry
func getJitterDelay(duration time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(duration)) + int64(duration))
}
2 changes: 1 addition & 1 deletion pkg/storages/s3/session.go
@@ -167,4 +167,4 @@ func getEndpointPort(settings map[string]string) string {
return port
}
return DefaultPort
}
}