From 85b566eb038481787b02f0c5b052dbf62e97a16c Mon Sep 17 00:00:00 2001
From: Andrew Pogrebnoi
Date: Fri, 3 Feb 2023 12:10:26 +0200
Subject: [PATCH] PBM-875: concurrent download (#776)

Each object can be downloaded concurrently in chunks. If the download of a
chunk fails, it will be retried a certain number of times before returning
an error.

It starts with the number of workers equal to the concurrency setting. Each
worker takes a task with the needed object range (chunk), downloads it into
a part (span) of its memory buffer (arena), and returns an io.ReadCloser
with the content of the span. It then takes the next free span to download
the next chunk. The consumer closing the io.ReadCloser marks the respective
span as free for reuse. A pool of arenas is created along with the `Download`
object and reused for each subsequently downloaded object.

Although the object's chunks can be downloaded concurrently, they have to be
streamed to the consumer sequentially (objects are usually compressed, hence
the consumer can't be an io.Seeker). Therefore, if a downloaded span's range
is out of order (the preceding chunks aren't downloaded yet), it is added to
a heap structure (`chunksQueue`) and waits for its turn to be passed to the
consumer.

The max size of the buffer would be `arenaSize * concurrency`, where
`arenaSize` is `spanSize * spansInArena`. That doesn't mean all of this size
will be allocated, as some of the span slots may remain unused.

A download arena (a byte slice) is split into spans (represented by `dspan`)
whose size should be equal to the download chunk size. `dspan` implements the
io.Writer and io.ReadCloser interfaces. Close() marks the span as free to use
(download another chunk). The free/busy span list is managed via a lock-free
bitmap index.

New config options:
```
restore:
...
// NumDownloadWorkers sets the number of goroutines that request chunks
// during the download. By default, it's set to GOMAXPROCS.
// Default: num of CPU cores
numDownloadWorkers int
// MaxDownloadBufferMb sets the max size of the in-memory buffer that is used
// to download files from the storage.
// Default: 0 - `numDownloadWorkers * downloadChunkMb * 16` maxDownloadBufferMb int // Default: 32 (32Mb) downloadChunkMb int ``` Initial benchmarks show up to 7.5x improvements in the restore speed: ``` Instances Backup Size Concurrency Span Mb Restore time PBM-875_concurrent_download_buf branch i3en.xlarge (4vCPU,16Gb RAM) 500Gb 4 32 45min i3en.xlarge (4vCPU,16Gb RAM) 500Gb 6 32 33min i3en.xlarge (4vCPU,16Gb RAM) 500Gb 8 32 32min i3en.xlarge (4vCPU,16Gb RAM) 500Gb 4 64 42min i3en.xlarge (4vCPU,16Gb RAM) 500Gb 4 128 40min main branch (v2.0.3) i3en.xlarge (4vCPU,16Gb RAM) 500Gb - - 240 min ``` --- cli/restore.go | 12 +- e2e-tests/docker/docker-compose-rs.yaml | 32 +- e2e-tests/docker/docker-compose-single.yaml | 1 + e2e-tests/docker/docker-compose.yaml | 2 + pbm/config.go | 13 +- pbm/restore.go | 6 + pbm/restore/physical.go | 85 ++- pbm/rsync.go | 32 +- pbm/storage/s3/download.go | 583 ++++++++++++++++++++ pbm/storage/s3/s3.go | 216 +------- 10 files changed, 746 insertions(+), 236 deletions(-) create mode 100644 pbm/storage/s3/download.go diff --git a/cli/restore.go b/cli/restore.go index d3c4a7c7e..0c855f58f 100644 --- a/cli/restore.go +++ b/cli/restore.go @@ -124,7 +124,8 @@ func runRestore(cn *pbm.PBM, o *restoreOpts, outf outFormat) (fmt.Stringer, erro func waitRestore(cn *pbm.PBM, m *pbm.RestoreMeta) error { ep, _ := cn.GetEpoch() - stg, err := cn.GetStorage(cn.Logger().NewEvent(string(pbm.CmdRestore), m.Backup, m.OPID, ep.TS())) + l := cn.Logger().NewEvent(string(pbm.CmdRestore), m.Backup, m.OPID, ep.TS()) + stg, err := cn.GetStorage(l) if err != nil { return errors.Wrap(err, "get storage") } @@ -137,7 +138,7 @@ func waitRestore(cn *pbm.PBM, m *pbm.RestoreMeta) error { getMeta := cn.GetRestoreMeta if m.Type == pbm.PhysicalBackup || m.Type == pbm.IncrementalBackup { getMeta = func(name string) (*pbm.RestoreMeta, error) { - return pbm.GetPhysRestoreMeta(name, stg) + return pbm.GetPhysRestoreMeta(name, stg, l) } } @@ -237,7 +238,7 @@ func restore(cn *pbm.PBM, bcpName string, nss []string, rsMapping map[string]str } fn = func(name string) (*pbm.RestoreMeta, error) { - return pbm.GetPhysRestoreMeta(name, stg) + return pbm.GetPhysRestoreMeta(name, stg, cn.Logger().NewEvent(string(pbm.CmdRestore), bcpName, "", ep.TS())) } ctx, cancel = context.WithTimeout(context.Background(), waitPhysRestoreStart) } else { @@ -426,12 +427,13 @@ func describeRestore(cn *pbm.PBM, o descrRestoreOpts) (fmt.Stringer, error) { return nil, errors.Wrap(err, "unable to unmarshal config file") } - stg, err := pbm.Storage(cfg, log.New(nil, "cli", "").NewEvent("", "", "", primitive.Timestamp{})) + l := log.New(nil, "cli", "").NewEvent("", "", "", primitive.Timestamp{}) + stg, err := pbm.Storage(cfg, l) if err != nil { return nil, errors.Wrap(err, "get storage") } - meta, err = pbm.GetPhysRestoreMeta(o.restore, stg) + meta, err = pbm.GetPhysRestoreMeta(o.restore, stg, l) if err != nil && meta == nil { return nil, errors.Wrap(err, "get restore meta") } diff --git a/e2e-tests/docker/docker-compose-rs.yaml b/e2e-tests/docker/docker-compose-rs.yaml index d5c2f43b5..cc9d4d73a 100644 --- a/e2e-tests/docker/docker-compose-rs.yaml +++ b/e2e-tests/docker/docker-compose-rs.yaml @@ -15,6 +15,27 @@ services: - /var/run/docker.sock:/var/run/docker.sock - ./conf:/etc/pbm - ./backups:/opt/backups + agent-cli: + container_name: "pbmagent_cli" + user: "1001" + labels: + - "com.percona.pbm.app=agent" + - "com.percona.pbm.agent.rs=cli" + environment: + - "PBM_MONGODB_URI=mongodb://${BACKUP_USER:-bcp}:${MONGO_PASS:-test1234}@rs101:27017" 
+ build: + labels: + - "com.percona.pbm.app=agent" + dockerfile: ./e2e-tests/docker/pbm-agent/Dockerfile + context: ../.. + args: + - MONGODB_VERSION=${MONGODB_VERSION:-4.2} + - MONGODB_IMAGE=${MONGODB_IMAGE:-percona/percona-server-mongodb} + volumes: + - ./conf:/etc/pbm + - ./backups:/opt/backups + cap_add: + - NET_ADMIN rs101: build: @@ -31,6 +52,7 @@ services: - MONGO_USER=dba - BACKUP_USER=bcp - MONGO_PASS=test1234 + - MONGODB_VERSION=${MONGODB_VERSION:-4.2} command: mongod --replSet rs1 --port 27017 --storageEngine wiredTiger --keyFile /opt/keyFile --wiredTigerCacheSizeGB 1 volumes: - data-rs101:/data/db @@ -63,6 +85,8 @@ services: - data-rs103:/data/db agent-rs101: container_name: "pbmagent_rs101" + ports: + - "6061:6060" user: "1001" labels: - "com.percona.pbm.app=agent" @@ -86,6 +110,8 @@ services: - NET_ADMIN agent-rs102: container_name: "pbmagent_rs102" + ports: + - "6062:6060" user: "1001" labels: - "com.percona.pbm.app=agent" @@ -108,6 +134,8 @@ services: - data-rs102:/data/db agent-rs103: container_name: "pbmagent_rs103" + ports: + - "6063:6060" user: "1001" labels: - "com.percona.pbm.app=agent" @@ -132,8 +160,8 @@ services: minio: image: minio/minio:RELEASE.2022-08-08T18-34-09Z hostname: minio - # ports: - # - "9000:9000" + ports: + - "9001:9000" volumes: - backups:/backups environment: diff --git a/e2e-tests/docker/docker-compose-single.yaml b/e2e-tests/docker/docker-compose-single.yaml index 0279ac588..6d9d53e96 100644 --- a/e2e-tests/docker/docker-compose-single.yaml +++ b/e2e-tests/docker/docker-compose-single.yaml @@ -32,6 +32,7 @@ services: - BACKUP_USER=bcp - MONGO_PASS=test1234 - SINGLE_NODE=true + - MONGODB_VERSION=${MONGODB_VERSION:-4.2} command: mongod --replSet rs1 --port 27017 --storageEngine wiredTiger --keyFile /opt/keyFile --wiredTigerCacheSizeGB 1 volumes: - data-rs101:/data/db diff --git a/e2e-tests/docker/docker-compose.yaml b/e2e-tests/docker/docker-compose.yaml index af3c9710b..eda152b44 100644 --- a/e2e-tests/docker/docker-compose.yaml +++ b/e2e-tests/docker/docker-compose.yaml @@ -32,6 +32,7 @@ services: context: ../.. args: - MONGODB_VERSION=${MONGODB_VERSION:-4.2} + - MONGODB_IMAGE=${MONGODB_IMAGE:-percona/percona-server-mongodb} volumes: - ./conf:/etc/pbm - ./backups:/opt/backups @@ -54,6 +55,7 @@ services: - MONGO_USER=dba - BACKUP_USER=bcp - MONGO_PASS=test1234 + - MONGODB_VERSION=${MONGODB_VERSION:-4.2} command: mongod --configsvr --dbpath /data/db --replSet cfg --bind_ip_all --port 27017 --keyFile /opt/keyFile --storageEngine wiredTiger --wiredTigerCacheSizeGB 1 volumes: - ./scripts/start.sh:/opt/start.sh diff --git a/pbm/config.go b/pbm/config.go index 624628132..65f668110 100644 --- a/pbm/config.go +++ b/pbm/config.go @@ -122,8 +122,19 @@ func (s *StorageConf) Path() string { // RestoreConf is config options for the restore type RestoreConf struct { - BatchSize int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"` // num of documents to buffer + // Logical restore + // + // num of documents to buffer + BatchSize int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"` NumInsertionWorkers int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"` + + // NumDownloadWorkers sets the num of goroutine would be requesting chunks + // during the download. By default, it's set to GOMAXPROCS. 
+ NumDownloadWorkers int `bson:"numDownloadWorkers" json:"numDownloadWorkers,omitempty" yaml:"numDownloadWorkers,omitempty"` + // MaxDownloadBufferMb sets the max size of the in-memory buffer that is used + // to download files from the storage. + MaxDownloadBufferMb int `bson:"maxDownloadBufferMb" json:"maxDownloadBufferMb,omitempty" yaml:"maxDownloadBufferMb,omitempty"` + DownloadChunkMb int `bson:"downloadChunkMb" json:"downloadChunkMb,omitempty" yaml:"downloadChunkMb,omitempty"` } type BackupConf struct { diff --git a/pbm/restore.go b/pbm/restore.go index 9cac9db6a..f1a7eb17c 100644 --- a/pbm/restore.go +++ b/pbm/restore.go @@ -5,6 +5,7 @@ import ( "sort" "time" + "github.com/percona/percona-backup-mongodb/pbm/storage/s3" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -29,6 +30,11 @@ type RestoreMeta struct { Conditions Conditions `bson:"conditions" json:"conditions"` Type BackupType `bson:"type" json:"type"` Leader string `bson:"l,omitempty" json:"l,omitempty"` + Stat *RestoreStat `bson:"stat,omitempty" json:"stat,omitempty"` +} + +type RestoreStat struct { + Download map[string]map[string]s3.DownloadStat `bson:"download,omitempty" json:"download,omitempty"` } type RestoreReplset struct { diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 9a69e6476..f3b9302d6 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -30,6 +30,7 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/compress" "github.com/percona/percona-backup-mongodb/pbm/log" "github.com/percona/percona-backup-mongodb/pbm/storage" + "github.com/percona/percona-backup-mongodb/pbm/storage/s3" ) const ( @@ -64,12 +65,15 @@ type PhysRestore struct { bcp *pbm.BackupMeta files []files + confOpts pbm.RestoreConf + // path to files on a storage the node will sync its - // state with the rest of the cluster - syncPathNode string - syncPathRS string - syncPathCluster string - syncPathPeers map[string]struct{} + // state with the resto of the cluster + syncPathNode string + syncPathNodeStat string + syncPathRS string + syncPathCluster string + syncPathPeers map[string]struct{} // Shards to participate in restore. Num of shards in bcp could // be less than in the cluster and this is ok. // @@ -608,10 +612,14 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, progress |= restoreStared l.Info("copying backup data") - err = r.copyFiles() + dstat, err := r.copyFiles() if err != nil { return errors.Wrap(err, "copy files") } + err = r.writeStat(dstat) + if err != nil { + r.log.Warning("write download stat: %v", err) + } l.Info("preparing data") err = r.prepareData() @@ -651,6 +659,25 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, return nil } +func (r *PhysRestore) writeStat(stat any) error { + d := struct { + D any `json:"d"` + }{ + D: stat, + } + b, err := json.Marshal(d) + if err != nil { + return errors.Wrap(err, "marshal") + } + + err = r.stg.Save(r.syncPathNodeStat, bytes.NewBuffer(b), -1) + if err != nil { + return errors.Wrap(err, "write") + } + + return nil +} + func (r *PhysRestore) dumpMeta(meta *pbm.RestoreMeta, s pbm.Status, msg string) error { name := fmt.Sprintf("%s/%s.json", pbm.PhysRestoresDir, meta.Name) _, err := r.stg.FileStat(name) @@ -667,7 +694,7 @@ func (r *PhysRestore) dumpMeta(meta *pbm.RestoreMeta, s pbm.Status, msg string) // The meta generated here is more for debugging porpuses (just in case). 
// `pbm status` and `resync` will always rebuild it from agents' reports // not relying solely on this file. - condsm, err := pbm.GetPhysRestoreMeta(meta.Name, r.stg) + condsm, err := pbm.GetPhysRestoreMeta(meta.Name, r.stg, r.log) if err == nil { meta.Replsets = condsm.Replsets meta.Status = condsm.Status @@ -699,7 +726,18 @@ func (r *PhysRestore) dumpMeta(meta *pbm.RestoreMeta, s pbm.Status, msg string) return nil } -func (r *PhysRestore) copyFiles() error { +func (r *PhysRestore) copyFiles() (stat *s3.DownloadStat, err error) { + readFn := r.stg.SourceReader + if t, ok := r.stg.(*s3.S3); ok { + d := t.NewDownload(r.confOpts.NumDownloadWorkers, r.confOpts.MaxDownloadBufferMb, r.confOpts.DownloadChunkMb) + readFn = d.SourceReader + defer func() { + s := d.Stat() + stat = &s + r.log.Debug("download stat: %s", s) + }() + } + cpbuf := make([]byte, 32*1024) for i := len(r.files) - 1; i >= 0; i-- { set := r.files[i] for _, f := range set.Data { @@ -711,46 +749,46 @@ func (r *PhysRestore) copyFiles() error { err := os.MkdirAll(filepath.Dir(dst), os.ModeDir|0o700) if err != nil { - return errors.Wrapf(err, "create path %s", filepath.Dir(dst)) + return stat, errors.Wrapf(err, "create path %s", filepath.Dir(dst)) } r.log.Info("copy <%s> to <%s>", src, dst) - sr, err := r.stg.SourceReader(src) + sr, err := readFn(src) if err != nil { - return errors.Wrapf(err, "create source reader for <%s>", src) + return stat, errors.Wrapf(err, "create source reader for <%s>", src) } defer sr.Close() data, err := compress.Decompress(sr, set.Cmpr) if err != nil { - return errors.Wrapf(err, "decompress object %s", src) + return stat, errors.Wrapf(err, "decompress object %s", src) } defer data.Close() fw, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE, f.Fmode) if err != nil { - return errors.Wrapf(err, "create/open destination file <%s>", dst) + return stat, errors.Wrapf(err, "create/open destination file <%s>", dst) } defer fw.Close() if f.Off != 0 { _, err := fw.Seek(f.Off, io.SeekStart) if err != nil { - return errors.Wrapf(err, "set file offset <%s>|%d", dst, f.Off) + return stat, errors.Wrapf(err, "set file offset <%s>|%d", dst, f.Off) } } - _, err = io.Copy(fw, data) + _, err = io.CopyBuffer(fw, data, cpbuf) if err != nil { - return errors.Wrapf(err, "copy file <%s>", dst) + return stat, errors.Wrapf(err, "copy file <%s>", dst) } if f.Size != 0 { err = fw.Truncate(f.Size) if err != nil { - return errors.Wrapf(err, "truncate file <%s>|%d", dst, f.Size) + return stat, errors.Wrapf(err, "truncate file <%s>|%d", dst, f.Size) } } } } - return nil + return stat, nil } func (r *PhysRestore) prepareData() error { @@ -1035,11 +1073,19 @@ func (r *PhysRestore) startMongo(opts ...string) error { const hbFrameSec = 60 * 2 func (r *PhysRestore) init(name string, opid pbm.OPID, l *log.Event) (err error) { - r.stg, err = r.cn.GetStorage(l) + var cfg pbm.Config + cfg, err = r.cn.GetConfig() + if err != nil { + return errors.Wrap(err, "get pbm config") + } + + r.stg, err = pbm.Storage(cfg, l) if err != nil { return errors.Wrap(err, "get storage") } + r.confOpts = cfg.Restore + r.log = l r.name = name @@ -1048,6 +1094,7 @@ func (r *PhysRestore) init(name string, opid pbm.OPID, l *log.Event) (err error) r.startTS = time.Now().Unix() r.syncPathNode = fmt.Sprintf("%s/%s/rs.%s/node.%s", pbm.PhysRestoresDir, r.name, r.rsConf.ID, r.nodeInfo.Me) + r.syncPathNodeStat = fmt.Sprintf("%s/%s/rs.%s/stat.%s", pbm.PhysRestoresDir, r.name, r.rsConf.ID, r.nodeInfo.Me) r.syncPathRS = fmt.Sprintf("%s/%s/rs.%s/rs", pbm.PhysRestoresDir, r.name, 
r.rsConf.ID) r.syncPathCluster = fmt.Sprintf("%s/%s/cluster", pbm.PhysRestoresDir, r.name) r.syncPathPeers = make(map[string]struct{}) diff --git a/pbm/rsync.go b/pbm/rsync.go index 64280a1c1..823a70575 100644 --- a/pbm/rsync.go +++ b/pbm/rsync.go @@ -18,6 +18,7 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/archive" "github.com/percona/percona-backup-mongodb/pbm/log" "github.com/percona/percona-backup-mongodb/pbm/storage" + "github.com/percona/percona-backup-mongodb/pbm/storage/s3" "github.com/percona/percona-backup-mongodb/version" ) @@ -48,7 +49,7 @@ func (p *PBM) ResyncStorage(l *log.Event) error { l.Debug("got physical restores list: %v", len(rstrs)) for _, rs := range rstrs { rname := strings.TrimSuffix(rs.Name, ".json") - rmeta, err := GetPhysRestoreMeta(rname, stg) + rmeta, err := GetPhysRestoreMeta(rname, stg, l) if err != nil { l.Error("get meta for restore %s: %v", rs.Name, err) if rmeta == nil { @@ -236,7 +237,7 @@ func (p *PBM) moveCollection(coll, as string) error { return errors.Wrap(err, "remove current data") } -func GetPhysRestoreMeta(restore string, stg storage.Storage) (rmeta *RestoreMeta, err error) { +func GetPhysRestoreMeta(restore string, stg storage.Storage, l *log.Event) (rmeta *RestoreMeta, err error) { mjson := filepath.Join(PhysRestoresDir, restore) + ".json" _, err = stg.FileStat(mjson) if err != nil && err != storage.ErrNotExist { @@ -255,7 +256,7 @@ func GetPhysRestoreMeta(restore string, stg storage.Storage) (rmeta *RestoreMeta } } - condsm, err := ParsePhysRestoreStatus(restore, stg) + condsm, err := ParsePhysRestoreStatus(restore, stg, l) if err != nil { return rmeta, errors.Wrap(err, "parse physical restore status") } @@ -275,6 +276,7 @@ func GetPhysRestoreMeta(restore string, stg storage.Storage) (rmeta *RestoreMeta rmeta.Hb = condsm.Hb rmeta.Conditions = condsm.Conditions rmeta.Type = PhysicalBackup + rmeta.Stat = condsm.Stat return rmeta, err } @@ -282,7 +284,7 @@ func GetPhysRestoreMeta(restore string, stg storage.Storage) (rmeta *RestoreMeta // ParsePhysRestoreStatus parses phys restore's sync files and creates RestoreMeta. 
// // On files format, see comments for *PhysRestore.toState() in pbm/restore/physical.go -func ParsePhysRestoreStatus(restore string, stg storage.Storage) (*RestoreMeta, error) { +func ParsePhysRestoreStatus(restore string, stg storage.Storage, l *log.Event) (*RestoreMeta, error) { rfiles, err := stg.List(PhysRestoresDir+"/"+restore, "") if err != nil { return nil, errors.Wrap(err, "get files") @@ -359,6 +361,28 @@ func ParsePhysRestoreStatus(restore string, stg storage.Storage) (*RestoreMeta, rs.rs.LastTransitionTS = l.Timestamp rs.rs.Error = l.Error } + case "stat": + src, err := stg.SourceReader(filepath.Join(PhysRestoresDir, restore, f.Name)) + if err != nil { + l.Error("get stat file %s: %v", f.Name, err) + break + } + if meta.Stat == nil { + meta.Stat = &RestoreStat{Download: make(map[string]map[string]s3.DownloadStat)} + } + st := struct { + D s3.DownloadStat `json:"d"` + }{} + err = json.NewDecoder(src).Decode(&st) + if err != nil { + l.Error("unmarshal stat file %s: %v", f.Name, err) + break + } + if _, ok := meta.Stat.Download[rsName]; !ok { + meta.Stat.Download[rsName] = make(map[string]s3.DownloadStat) + } + nName := strings.Join(p[1:], ".") + meta.Stat.Download[rsName][nName] = st.D } rss[rsName] = rs diff --git a/pbm/storage/s3/download.go b/pbm/storage/s3/download.go new file mode 100644 index 000000000..c603f701a --- /dev/null +++ b/pbm/storage/s3/download.go @@ -0,0 +1,583 @@ +package s3 + +import ( + "container/heap" + "crypto/md5" + "encoding/base64" + "fmt" + "io" + "net/http" + "path" + "runtime" + "sync/atomic" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/pkg/errors" + + "github.com/percona/percona-backup-mongodb/pbm/log" +) + +// Downloading objects from the storage. +// +// Each object can be downloaded concurrently in chunks. If a download of a +// chunk has failed it will be retried a certain amount of time before +// returning with an error. +// It starts with the number of workers equal to the concurrency setting. Each +// worker takes a task with a needed object range (chunk) and downloads it into +// a part (span) of its memory buffer (arena). Returns an io.ReaderCloser +// object with the content of the span. And gets a next free span to download +// the next chunk. +// The consumer closing io.ReaderCloser marks the respective span as free reuse. +// An arenas pool is created with the `Download` object and reused for every next +// downloaded object. +// Although the object's chunks can be downloaded concurrently, they should be +// streamed to the consumer sequentially (objects usually are compressed, hence +// the consumer can't be an oi.Seeker). Therefore if a downloaded span's range +// is out of order (preceding chunks aren't downloaded yet) it is added to the +// heap structure (`chunksQueue`) and waits for its queue to be passed to +// the consumer. +// The max size the buffer of would be `arenaSize * concurrency`. Where +// `arenaSize` is `spanSize * spansInArena`. It doesn't mean all of this size +// would be allocated as some of the span slots may remain unused. 
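+//
+// For example, with the default 32Mb span size (downloadChunkMb) and 8 spans
+// per arena, a concurrency of 4 gives a 256Mb arena per worker and caps the
+// total download buffer at 4 * 256Mb = 1Gb. The numbers here are purely
+// illustrative.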
+ +const ( + downloadChuckSizeDefault = 8 << 20 + downloadRetries = 10 + + ccSpanDefault = 32 << 20 + arenaSpans = 8 // an amount of spans in arena +) + +type DownloadStat struct { + Arenas []ArenaStat `bson:"a" json:"a"` + Concurrency int `bson:"cc" json:"cc"` + ArenaSize int `bson:"arSize" json:"arSize"` + SpansNum int `bson:"spanNum" json:"spanNum"` + SpanSize int `bson:"spanSize" json:"spanSize"` + BufSize int `bson:"bufSize" json:"bufSize"` +} + +func (s DownloadStat) String() string { + return fmt.Sprintf("buf %d, arena %d, span %d, spanNum %d, cc %d, %v", + s.BufSize, s.ArenaSize, s.SpanSize, s.SpansNum, s.Concurrency, s.Arenas) +} + +// Download is used to concurrently download objects from the storage. +type Download struct { + s3 *S3 + + arenas []*arena // mem buffer for downloads + spanSize int + cc int // download concurrency + + stat DownloadStat +} + +func (s *S3) NewDownload(cc, bufSizeMb, spanSizeMb int) *Download { + arenaSize, spanSize, cc := downloadOpts(cc, bufSizeMb, spanSizeMb) + s.log.Debug("download max buf %d (arena %d, span %d, concurrency %d)", arenaSize*cc, arenaSize, spanSize, cc) + + arenas := []*arena{} + for i := 0; i < cc; i++ { + arenas = append(arenas, newArena(arenaSize, spanSize)) + } + + return &Download{ + s3: s, + arenas: arenas, + spanSize: spanSize, + cc: cc, + + stat: DownloadStat{ + Concurrency: cc, + ArenaSize: arenaSize, + SpansNum: arenaSize / spanSize, + SpanSize: spanSize, + BufSize: arenaSize * cc, + }, + } +} + +// assume we need more spans in arena above this number of CPUs used +const lowCPU = 8 + +// Adjust download options. We go from spanSize. But if bufMaxMb is +// set, it will be a hard limit on total memory. +func downloadOpts(cc, bufMaxMb, spanSizeMb int) (arenaSize, span, c int) { + if cc == 0 { + cc = runtime.GOMAXPROCS(0) + } + + // broad assumption that increased amount of concurrency may lead to + // extra contention hence need in more spans in arena + spans := arenaSpans + if cc > lowCPU { + spans *= 2 + } + + spanSize := spanSizeMb << 20 + if spanSize == 0 { + spanSize = ccSpanDefault + } + + bufSize := bufMaxMb << 20 + if bufSize == 0 || spanSize*spans*cc <= bufSize { + return spanSize * spans, spanSize, cc + } + + // download buffer can't be smaller than spanSize + if bufSize < spanSize { + spanSize = bufSize + } + + // shrink coucurrency if bufSize too small + if bufSize/cc < spanSize { + cc = bufSize / spanSize + } + + return spanSize * (bufSize / cc / spanSize), spanSize, cc +} + +func (d *Download) SourceReader(name string) (io.ReadCloser, error) { + return d.s3.sourceReader(name, d.arenas, d.cc, d.spanSize) +} + +func (d *Download) Stat() DownloadStat { + d.stat.Arenas = []ArenaStat{} + for _, a := range d.arenas { + d.stat.Arenas = append(d.stat.Arenas, a.stat) + } + + return d.stat +} + +func (s *S3) SourceReader(name string) (io.ReadCloser, error) { + return s.d.SourceReader(name) +} + +type errGetObj error + +// requests an object in chunks and retries if download has failed +type partReader struct { + fname string + fsize int64 // a total size of object (file) to download + written int64 + chunkSize int64 + + getSess func() (*s3.S3, error) + l *log.Event + opts *Conf + buf []byte // preallocated buf for io.Copy + + taskq chan chunkMeta + resultq chan chunk + errc chan error + close chan struct{} +} + +func (s *S3) newPartReader(fname string, fsize int64, chunkSize int) *partReader { + return &partReader{ + l: s.log, + buf: make([]byte, 32*1024), + opts: &s.opts, + fname: fname, + fsize: fsize, + chunkSize: 
int64(chunkSize), + getSess: func() (*s3.S3, error) { + sess, err := s.s3session() + if err != nil { + return nil, err + } + sess.Client.Config.HTTPClient.Timeout = time.Second * 60 + return sess, nil + }, + } +} + +type chunkMeta struct { + start int64 + end int64 +} + +type chunk struct { + r io.ReadCloser + meta chunkMeta +} + +// a queue (heap) for out-of-order chunks +type chunksQueue []*chunk + +func (b chunksQueue) Len() int { return len(b) } +func (b chunksQueue) Less(i, j int) bool { return b[i].meta.start < b[j].meta.start } +func (b chunksQueue) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b *chunksQueue) Push(x any) { *b = append(*b, x.(*chunk)) } +func (b *chunksQueue) Pop() any { + old := *b + n := len(old) + x := old[n-1] + *b = old[0 : n-1] + return x +} + +func (s *S3) sourceReader(fname string, arenas []*arena, cc, downloadChuckSize int) (io.ReadCloser, error) { + if cc < 1 { + return nil, errors.Errorf("num of workers shuld be at least 1 (got %d)", cc) + } + if len(arenas) < cc { + return nil, errors.Errorf("num of arenas (%d) less then workers (%d)", len(arenas), cc) + } + + fstat, err := s.FileStat(fname) + if err != nil { + return nil, errors.Wrap(err, "get file stat") + } + + r, w := io.Pipe() + + go func() { + pr := s.newPartReader(fname, fstat.Size, downloadChuckSize) + + pr.Run(cc, arenas) + + exitErr := io.EOF + defer func() { + w.CloseWithError(exitErr) + pr.Reset() + }() + + cqueue := &chunksQueue{} + heap.Init(cqueue) + + for { + select { + case rs := <-pr.resultq: + // Although chunks are requested concurrently they must be written sequentially + // to the destination as it is not necessary a file (decompress, mongorestore etc.). + // If it is not its turn (previous chunks weren't written yet) the chunk will be + // added to the buffer to wait. If the buffer grows too much the scheduling of new + // chunks will be paused for buffer to be handled. 
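+				// For instance, if the worker for bytes [2*chunkSize, 3*chunkSize-1]
+				// finishes before the one for [chunkSize, 2*chunkSize-1], that chunk is
+				// parked on the heap and written only after the earlier range has been
+				// copied to the pipe.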
+ if rs.meta.start != pr.written { + heap.Push(cqueue, &rs) + continue + } + + err := pr.writeChunk(&rs, w, downloadRetries) + if err != nil { + exitErr = errors.Wrapf(err, "SourceReader: copy bytes %d-%d from resoponse", rs.meta.start, rs.meta.end) + return + } + + // check if we can send something from the buffer + for len(*cqueue) > 0 && []*chunk(*cqueue)[0].meta.start == pr.written { + r := heap.Pop(cqueue).(*chunk) + err := pr.writeChunk(r, w, downloadRetries) + if err != nil { + exitErr = errors.Wrapf(err, "SourceReader: copy bytes %d-%d from resoponse buffer", r.meta.start, r.meta.end) + return + } + } + + // we've read all bytes in the object + if pr.written >= pr.fsize { + return + } + + case err := <-pr.errc: + exitErr = errors.Wrapf(err, "SourceReader: download '%s/%s'", s.opts.Bucket, fname) + return + } + } + }() + + return r, nil +} + +func (pr *partReader) Run(concurrency int, arenas []*arena) { + pr.taskq = make(chan chunkMeta, concurrency) + pr.resultq = make(chan chunk) + pr.errc = make(chan error) + pr.close = make(chan struct{}) + + // schedule chunks for download + go func() { + for sent := int64(0); sent <= pr.fsize; { + select { + case <-pr.close: + return + case pr.taskq <- chunkMeta{sent, sent + pr.chunkSize - 1}: + sent += pr.chunkSize + } + } + }() + + for i := 0; i < concurrency; i++ { + go pr.worker(arenas[i]) + } +} + +func (pr *partReader) Reset() { + close(pr.close) +} + +func (pr *partReader) writeChunk(r *chunk, to io.Writer, retry int) error { + if r == nil || r.r == nil { + return nil + } + + b, err := io.CopyBuffer(to, r.r, pr.buf) + pr.written += b + r.r.Close() + + return err +} + +func (pr *partReader) worker(buf *arena) { + sess, err := pr.getSess() + if err != nil { + pr.errc <- errors.Wrap(err, "create session") + return + } + + for { + select { + case ch := <-pr.taskq: + r, err := pr.retryChunk(buf, sess, ch.start, ch.end, downloadRetries) + if err != nil { + pr.errc <- err + return + } + + pr.resultq <- chunk{r: r, meta: ch} + + case <-pr.close: + return + } + } +} + +func (pr *partReader) retryChunk(buf *arena, s *s3.S3, start, end int64, retries int) (r io.ReadCloser, err error) { + for i := 0; i < retries; i++ { + r, err = pr.tryChunk(buf, s, start, end) + if err == nil { + return r, nil + } + + pr.l.Warning("retryChunk got %v, try to reconnect in %v", err, time.Second*time.Duration(i)) + time.Sleep(time.Second * time.Duration(i)) + s, err = pr.getSess() + if err != nil { + pr.l.Warning("recreate session err: %v", err) + continue + } + pr.l.Info("session recreated, resuming download") + } + + return nil, err +} + +func (pr *partReader) tryChunk(buf *arena, s *s3.S3, start, end int64) (r io.ReadCloser, err error) { + // just quickly retry w/o new session in case of fail. + // more sophisticated retry on a caller side. 
+ const retry = 2 + for i := 0; i < retry; i++ { + r, err = pr.getChunk(buf, s, start, end) + + if err == nil || err == io.EOF { + return r, nil + } + switch err.(type) { + case errGetObj: + return r, err + } + + pr.l.Warning("failed to download chunk %d-%d", start, end) + } + + return nil, errors.Wrapf(err, "failed to download chunk %d-%d (of %d) after %d retries", start, end, pr.fsize, retry) +} + +func (pr *partReader) getChunk(buf *arena, s *s3.S3, start, end int64) (io.ReadCloser, error) { + getObjOpts := &s3.GetObjectInput{ + Bucket: aws.String(pr.opts.Bucket), + Key: aws.String(path.Join(pr.opts.Prefix, pr.fname)), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", start, end)), + } + + sse := pr.opts.ServerSideEncryption + if sse != nil && sse.SseCustomerAlgorithm != "" { + getObjOpts.SSECustomerAlgorithm = aws.String(sse.SseCustomerAlgorithm) + decodedKey, err := base64.StdEncoding.DecodeString(sse.SseCustomerKey) + getObjOpts.SSECustomerKey = aws.String(string(decodedKey[:])) + if err != nil { + return nil, errors.Wrap(err, "SseCustomerAlgorithm specified with invalid SseCustomerKey") + } + keyMD5 := md5.Sum(decodedKey[:]) + getObjOpts.SSECustomerKeyMD5 = aws.String(base64.StdEncoding.EncodeToString(keyMD5[:])) + } + + s3obj, err := s.GetObject(getObjOpts) + if err != nil { + // if object size is undefined, we would read + // until HTTP code 416 (Requested Range Not Satisfiable) + var er awserr.RequestFailure + if errors.As(err, &er) && er.StatusCode() == http.StatusRequestedRangeNotSatisfiable { + return nil, io.EOF + } + pr.l.Warning("errGetObj Err: %v", err) + return nil, errGetObj(err) + } + defer s3obj.Body.Close() + + if sse != nil { + if sse.SseAlgorithm == s3.ServerSideEncryptionAwsKms { + s3obj.ServerSideEncryption = aws.String(sse.SseAlgorithm) + s3obj.SSEKMSKeyId = aws.String(sse.KmsKeyID) + } else if sse.SseCustomerAlgorithm != "" { + s3obj.SSECustomerAlgorithm = aws.String(sse.SseCustomerAlgorithm) + decodedKey, _ := base64.StdEncoding.DecodeString(sse.SseCustomerKey) + // We don't pass in the key in this case, just the MD5 hash of the key + // for verification + // s3obj.SSECustomerKey = aws.String(string(decodedKey[:])) + keyMD5 := md5.Sum(decodedKey[:]) + s3obj.SSECustomerKeyMD5 = aws.String(base64.StdEncoding.EncodeToString(keyMD5[:])) + } + } + + ch := buf.getSpan() + _, err = io.CopyBuffer(ch, s3obj.Body, buf.cpbuf) + if err != nil { + ch.Close() + return nil, errors.Wrap(err, "copy") + } + return ch, nil +} + +// Download arena (bytes slice) is split into spans (represented by `dpsan`) +// whose size should be equal to download chunks. `dspan` implements io.Wrire +// and io.ReaderCloser interface. Close() marks the span as free to use +// (download another chunk). +// Free/busy spans list is managed via lock-free bitmap index. 
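+//
+// For example, with 8 spans a freeindex of 0b00000101 means spans 0 and 2 are
+// currently busy: getSpan() would pick span 1 (the first zero bit) and set it,
+// and Close() on that span would mark it free again.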
+type arena struct { + buf []byte + spansize int + spanBitCnt uint64 + freeindex atomic.Uint64 // free slots bitmap + + stat ArenaStat + + cpbuf []byte // preallocated buffer for io.Copy +} + +type ArenaStat struct { + // the max amount of span was occupied simultaneously + MaxSpan int `bson:"MaxSpan" json:"MaxSpan"` + // how many times getSpan() was waiting for the free span + WaitCnt int `bson:"WaitCnt" json:"WaitCnt"` +} + +func newArena(size, spansize int) *arena { + snum := size / spansize + + size = spansize * snum + return &arena{ + buf: make([]byte, size), + spansize: spansize, + spanBitCnt: 1<<(size/spansize) - 1, + cpbuf: make([]byte, 32*1024), + } +} + +func (b *arena) getSpan() *dspan { + var w bool + for { + m := b.freeindex.Load() + if m >= b.spanBitCnt { + // write stat on contention - no free spans now + if !w { + b.stat.WaitCnt++ + w = true + } + + continue + } + i := firstzero(m) + + if i+1 > b.stat.MaxSpan { + b.stat.MaxSpan = i + 1 + } + + if b.freeindex.CompareAndSwap(m, m^uint64(1)<> 1) & m1 + x = (x & m2) + ((x >> 2) & m2) + x = (x + (x >> 4)) & m4 + return int((x * h01) >> 56) +} + +type dspan struct { + rp int // current read pos in the arena + wp int // current write pos in the arena + high int // high bound index of span in the arena + + slot int // slot number in the arena + arena *arena // link to the arena +} + +func (s *dspan) Write(p []byte) (n int, err error) { + n = copy(s.arena.buf[s.wp:s.high], p) + + s.wp += n + return n, nil +} + +func (s *dspan) Read(p []byte) (n int, err error) { + n = copy(p, s.arena.buf[s.rp:s.wp]) + s.rp += n + + if s.rp == s.wp { + return n, io.EOF + } + + return n, nil +} + +func (s *dspan) Close() error { + s.arena.putSpan(s) + return nil +} diff --git a/pbm/storage/s3/s3.go b/pbm/storage/s3/s3.go index d8c0cf3a3..383780314 100644 --- a/pbm/storage/s3/s3.go +++ b/pbm/storage/s3/s3.go @@ -11,7 +11,6 @@ import ( "os" "path" "runtime" - "strconv" "strings" "time" @@ -40,9 +39,6 @@ const ( GCSEndpointURL = "storage.googleapis.com" defaultS3Region = "us-east-1" - - downloadChuckSize = 10 << 20 // 10Mb - downloadRetries = 10 ) type Conf struct { @@ -223,6 +219,8 @@ type S3 struct { opts Conf log *log.Event s3s *s3.S3 + + d *Download // default downloader for small files } func New(opts Conf, l *log.Event) (*S3, error) { @@ -241,6 +239,13 @@ func New(opts Conf, l *log.Event) (*S3, error) { return nil, errors.Wrap(err, "AWS session") } + s.d = &Download{ + s3: s, + arenas: []*arena{newArena(downloadChuckSizeDefault, downloadChuckSizeDefault)}, + spanSize: downloadChuckSizeDefault, + cc: 1, + } + return s, nil } @@ -401,14 +406,13 @@ func (s *S3) List(prefix, suffix string) ([]storage.FileInfo, error) { return true }) if err != nil { - return nil, errors.Wrap(err, "get backup list") + return nil, err } return files, nil } func (s *S3) Copy(src, dst string) error { - copyOpts := &s3.CopyObjectInput{ Bucket: aws.String(s.opts.Bucket), CopySource: aws.String(path.Join(s.opts.Bucket, s.opts.Prefix, src)), @@ -480,204 +484,6 @@ func (s *S3) FileStat(name string) (inf storage.FileInfo, err error) { return inf, nil } -type ( - errGetObj error - errReadObj error -) - -type partReader struct { - fname string - sess *s3.S3 - l *log.Event - opts *Conf - n int64 - tsize int64 - buf []byte -} - -func (s *S3) newPartReader(fname string) *partReader { - return &partReader{ - l: s.log, - buf: make([]byte, downloadChuckSize), - opts: &s.opts, - fname: fname, - tsize: -2, - } -} - -func (pr *partReader) setSession(s *s3.S3) { - 
s.Client.Config.HTTPClient.Timeout = time.Second * 60 - pr.sess = s -} - -func (pr *partReader) tryNext(w io.Writer) (n int64, err error) { - for i := 0; i < downloadRetries; i++ { - n, err = pr.writeNext(w) - - if err == nil || err == io.EOF { - return n, err - } - - switch err.(type) { - case errGetObj: - return n, err - } - - pr.l.Warning("failed to download chunk %d-%d", pr.n, pr.n+downloadChuckSize-1) - } - - return 0, errors.Wrapf(err, "failed to download chunk %d-%d (of %d) after %d retries", pr.n, pr.n+downloadChuckSize-1, pr.tsize, downloadRetries) -} - -func (pr *partReader) writeNext(w io.Writer) (n int64, err error) { - getObjOpts := &s3.GetObjectInput{ - Bucket: aws.String(pr.opts.Bucket), - Key: aws.String(path.Join(pr.opts.Prefix, pr.fname)), - Range: aws.String(fmt.Sprintf("bytes=%d-%d", pr.n, pr.n+downloadChuckSize-1)), - } - - sse := pr.opts.ServerSideEncryption - if sse != nil && sse.SseCustomerAlgorithm != "" { - getObjOpts.SSECustomerAlgorithm = aws.String(sse.SseCustomerAlgorithm) - decodedKey, err := base64.StdEncoding.DecodeString(sse.SseCustomerKey) - getObjOpts.SSECustomerKey = aws.String(string(decodedKey[:])) - if err != nil { - return 0, errors.Wrap(err, "SseCustomerAlgorithm specified with invalid SseCustomerKey") - } - keyMD5 := md5.Sum(decodedKey[:]) - getObjOpts.SSECustomerKeyMD5 = aws.String(base64.StdEncoding.EncodeToString(keyMD5[:])) - } - - s3obj, err := pr.sess.GetObject(getObjOpts) - if err != nil { - // if object size is undefined, we would read - // until HTTP code 416 (Requested Range Not Satisfiable) - var er awserr.RequestFailure - if errors.As(err, &er) && er.StatusCode() == http.StatusRequestedRangeNotSatisfiable { - return 0, io.EOF - } - pr.l.Warning("errGetObj Err: %v", err) - return 0, errGetObj(err) - } - if pr.tsize == -2 { - pr.setSize(s3obj) - } - - if sse != nil { - if sse.SseAlgorithm == s3.ServerSideEncryptionAwsKms { - s3obj.ServerSideEncryption = aws.String(sse.SseAlgorithm) - s3obj.SSEKMSKeyId = aws.String(sse.KmsKeyID) - } else if sse.SseCustomerAlgorithm != "" { - s3obj.SSECustomerAlgorithm = aws.String(sse.SseCustomerAlgorithm) - decodedKey, _ := base64.StdEncoding.DecodeString(sse.SseCustomerKey) - // We don't pass in the key in this case, just the MD5 hash of the key - // for verification - // s3obj.SSECustomerKey = aws.String(string(decodedKey[:])) - keyMD5 := md5.Sum(decodedKey[:]) - s3obj.SSECustomerKeyMD5 = aws.String(base64.StdEncoding.EncodeToString(keyMD5[:])) - } - } - - n, err = io.CopyBuffer(w, s3obj.Body, pr.buf) - s3obj.Body.Close() - - pr.n += n - - // we don't care about the error if we've read the entire object - if pr.tsize >= 0 && pr.n >= pr.tsize { - return 0, io.EOF - } - - // The last chunk during the PITR restore usually won't be read fully - // (high chances that the targeted time will be in the middle of the chunk) - // so in this case the reader (oplog.Apply) will close the pipe once reaching the - // targeted time. 
- if err != nil && errors.Is(err, io.ErrClosedPipe) { - return n, nil - } - - if err != nil { - pr.l.Warning("io.copy: %v", err) - return n, errReadObj(err) - } - - return n, nil -} - -func (pr *partReader) setSize(o *s3.GetObjectOutput) { - pr.tsize = -1 - if o.ContentRange == nil { - if o.ContentLength != nil { - pr.tsize = *o.ContentLength - } - return - } - - rng := strings.Split(*o.ContentRange, "/") - if len(rng) < 2 || rng[1] == "*" { - return - } - - size, err := strconv.ParseInt(rng[1], 10, 64) - if err != nil { - pr.l.Warning("unable to parse object size from %s: %v", rng[1], err) - return - } - - pr.tsize = size -} - -// SourceReader reads object with the given name from S3 -// and pipes its data to the returned io.ReadCloser. -// -// It uses partReader to download the object by chunks (`downloadChuckSize`). -// In case of error, it would retry get the next bytes up to `downloadRetries` times. -// If it fails to do so or connection error happened, it recreates the session -// and tries again up to `downloadRetries` times. -func (s *S3) SourceReader(name string) (io.ReadCloser, error) { - pr := s.newPartReader(name) - pr.setSession(s.s3s) - - r, w := io.Pipe() - - go func() { - defer w.Close() - - var err error - Loop: - for { - for i := 0; i < downloadRetries; i++ { - _, err = pr.tryNext(w) - if err == nil { - continue Loop - } - if err == io.EOF { - return - } - if errors.Is(err, io.ErrClosedPipe) { - s.log.Info("reader closed pipe, stopping download") - return - } - - s.log.Warning("got %v, try to reconnect in %v", err, time.Second*time.Duration(i+1)) - time.Sleep(time.Second * time.Duration(i+1)) - s3s, err := s.s3session() - if err != nil { - s.log.Warning("recreate session") - continue - } - pr.setSession(s3s) - s.log.Info("session recreated, resuming download") - } - s.log.Error("download '%s/%s' file from S3: %v", s.opts.Bucket, name, err) - w.CloseWithError(errors.Wrapf(err, "download '%s/%s'", s.opts.Bucket, name)) - return - } - }() - - return r, nil -} - // Delete deletes given file. // It returns storage.ErrNotExist if a file isn't exists func (s *S3) Delete(name string) error { @@ -746,7 +552,7 @@ func (s *S3) session() (*session.Session, error) { Client: ec2metadata.New(awsSession), }) - httpClient := http.DefaultClient + httpClient := &http.Client{} if s.opts.InsecureSkipTLSVerify { httpClient = &http.Client{ Transport: &http.Transport{
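For reference, the new options could be set in the PBM configuration like this
(the values are illustrative, not tuned recommendations):

```
restore:
  numDownloadWorkers: 4
  maxDownloadBufferMb: 0
  downloadChunkMb: 32
```

With `maxDownloadBufferMb: 0` the buffer limit is derived from the number of
download workers and the chunk size, as described in the commit message above.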