PBM-875: concurrent download (#776)
Each object can be downloaded concurrently in chunks. If the download of a
chunk fails, it is retried a certain number of times before returning
with an error.
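A minimal sketch of such a per-chunk retry loop is below; the helper name
(`getChunk`), the retry count, and the linear backoff are illustrative
assumptions, not PBM's actual implementation:

```
package download

import (
	"fmt"
	"time"
)

// getChunk fetches the byte range [start, start+size) of an object via
// fetch, retrying a fixed number of times before giving up with an error.
func getChunk(fetch func(start, size int64) ([]byte, error), start, size int64) ([]byte, error) {
	const retries = 5
	var err error
	for i := 0; i < retries; i++ {
		var b []byte
		b, err = fetch(start, size)
		if err == nil {
			return b, nil
		}
		time.Sleep(time.Second * time.Duration(i+1)) // simple linear backoff
	}
	return nil, fmt.Errorf("download chunk at offset %d: %d retries exceeded: %w", start, retries, err)
}
```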
The download starts with a number of workers equal to the concurrency setting.
Each worker takes a task for a needed object range (chunk), downloads it into
a part (span) of its memory buffer (arena), and returns an io.ReadCloser
with the content of the span. It then gets the next free span to download
the next chunk.
When the consumer closes the io.ReadCloser, the respective span is marked free for reuse.
A pool of arenas is created with the `Download` object and reused for each
subsequently downloaded object.
Although an object's chunks can be downloaded concurrently, they have to be
streamed to the consumer sequentially (objects are usually compressed, hence
the consumer can't be an io.Seeker). Therefore, if a downloaded span's range
is out of order (the preceding chunks aren't downloaded yet), it is added to a
heap structure (`chunksQueue`) and waits its turn to be passed to the
consumer, as sketched below.
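Here is a rough sketch of that reordering step using the standard
container/heap package; the type names mirror the description above, but the
exact fields and the fixed-chunk-size assumption are illustrative, not PBM's
actual code:

```
package download

import "container/heap"

type chunk struct {
	off  int64 // offset of this chunk within the object
	data []byte
}

// chunksQueue is a min-heap of chunks ordered by their object offset.
type chunksQueue []*chunk

func (q chunksQueue) Len() int           { return len(q) }
func (q chunksQueue) Less(i, j int) bool { return q[i].off < q[j].off }
func (q chunksQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *chunksQueue) Push(x any)        { *q = append(*q, x.(*chunk)) }
func (q *chunksQueue) Pop() any {
	old := *q
	c := old[len(old)-1]
	*q = old[:len(old)-1]
	return c
}

// deliverInOrder reads chunks arriving in arbitrary order and emits their
// data strictly by offset, parking early arrivals in the heap.
func deliverInOrder(in <-chan *chunk, out chan<- []byte, chunkSize int64) {
	q := &chunksQueue{}
	var next int64 // offset the consumer expects next
	for c := range in {
		heap.Push(q, c)
		// flush every chunk that is now in order
		for q.Len() > 0 && (*q)[0].off == next {
			out <- heap.Pop(q).(*chunk).data
			next += chunkSize
		}
	}
	close(out)
}
```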
The max size of the buffer would be `arenaSize * concurrency`, where
`arenaSize` is `spanSize * spansInArena`. This doesn't mean all of that memory
will be allocated, as some of the span slots may remain unused. For example,
with concurrency 4, a 32Mb span, and 16 spans per arena, the cap is
4 * (32Mb * 16) = 2Gb.

The download arena (a byte slice) is split into spans (represented by `dspan`)
whose size equals the download chunk size. `dspan` implements the io.Writer
and io.ReadCloser interfaces. Close() marks the span as free to use
(to download another chunk).
The free/busy span list is managed via a lock-free bitmap index, sketched below.
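A rough sketch of such a bitmap allocator built on sync/atomic; the field and
method names here are assumptions for illustration, not PBM's actual API:

```
package download

import "sync/atomic"

// arena is a download buffer whose span slots are tracked by a bitmap:
// bit i is 1 while span i is busy. Assumes the span count is a multiple of 64.
type arena struct {
	buf     []byte
	freemap []atomic.Uint64
}

// acquireSpan claims the first free span via compare-and-swap and returns
// its index, or -1 if every span is busy. No mutex is taken.
func (a *arena) acquireSpan() int {
	for w := range a.freemap {
		for {
			old := a.freemap[w].Load()
			if old == ^uint64(0) {
				break // all 64 spans in this word are busy
			}
			bit := uint(0)
			for old&(1<<bit) != 0 {
				bit++
			}
			if a.freemap[w].CompareAndSwap(old, old|(1<<bit)) {
				return w*64 + int(bit)
			}
			// lost a race with another worker; reload and retry
		}
	}
	return -1
}

// releaseSpan marks span i free again; this is what a span's Close()
// would call so the slot can host the next chunk.
func (a *arena) releaseSpan(i int) {
	w, bit := i/64, uint(i%64)
	for {
		old := a.freemap[w].Load()
		if a.freemap[w].CompareAndSwap(old, old&^(1<<bit)) {
			return
		}
	}
}
```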

New config options:
```
restore:
 ...
	// NumDownloadWorkers sets the number of goroutines that request chunks
	// during the download. By default, it's set to GOMAXPROCS.
	// Default: num of CPU cores
	numDownloadWorkers int
	// MaxDownloadBufferMb sets the max size of the in-memory buffer that is used
	// to download files from the storage.
	// Default: 0 - derived as `numDownloadWorkers * downloadChunkMb * 16`
	maxDownloadBufferMb int
	// Default: 32 (32Mb)
	downloadChunkMb int
```
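For reference, here is how the new options would look in a PBM configuration
file (the YAML keys come from the struct tags added in pbm/config.go below;
the values are only an example):

```
restore:
  numDownloadWorkers: 8
  maxDownloadBufferMb: 0   # 0 = derived as numDownloadWorkers * downloadChunkMb * 16
  downloadChunkMb: 32
```

Such a file can then be applied with, e.g., `pbm config --file <config.yaml>`.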

Initial benchmarks show up to a 7.5x improvement in restore speed:
```
Instance                       Backup size   Concurrency   Span Mb   Restore time

PBM-875_concurrent_download_buf branch:
i3en.xlarge (4vCPU, 16Gb RAM)  500Gb         4             32        45 min
i3en.xlarge (4vCPU, 16Gb RAM)  500Gb         6             32        33 min
i3en.xlarge (4vCPU, 16Gb RAM)  500Gb         8             32        32 min
i3en.xlarge (4vCPU, 16Gb RAM)  500Gb         4             64        42 min
i3en.xlarge (4vCPU, 16Gb RAM)  500Gb         4             128       40 min

main branch (v2.0.3):
i3en.xlarge (4vCPU, 16Gb RAM)  500Gb         -             -         240 min
```
dAdAbird committed Feb 3, 2023
1 parent fb77366 commit 85b566e
Showing 10 changed files with 746 additions and 236 deletions.
12 changes: 7 additions & 5 deletions cli/restore.go
@@ -124,7 +124,8 @@ func runRestore(cn *pbm.PBM, o *restoreOpts, outf outFormat) (fmt.Stringer, erro

func waitRestore(cn *pbm.PBM, m *pbm.RestoreMeta) error {
ep, _ := cn.GetEpoch()
stg, err := cn.GetStorage(cn.Logger().NewEvent(string(pbm.CmdRestore), m.Backup, m.OPID, ep.TS()))
l := cn.Logger().NewEvent(string(pbm.CmdRestore), m.Backup, m.OPID, ep.TS())
stg, err := cn.GetStorage(l)
if err != nil {
return errors.Wrap(err, "get storage")
}
@@ -137,7 +138,7 @@ func waitRestore(cn *pbm.PBM, m *pbm.RestoreMeta) error {
getMeta := cn.GetRestoreMeta
if m.Type == pbm.PhysicalBackup || m.Type == pbm.IncrementalBackup {
getMeta = func(name string) (*pbm.RestoreMeta, error) {
return pbm.GetPhysRestoreMeta(name, stg)
return pbm.GetPhysRestoreMeta(name, stg, l)
}
}

@@ -237,7 +238,7 @@ func restore(cn *pbm.PBM, bcpName string, nss []string, rsMapping map[string]str
}

fn = func(name string) (*pbm.RestoreMeta, error) {
return pbm.GetPhysRestoreMeta(name, stg)
return pbm.GetPhysRestoreMeta(name, stg, cn.Logger().NewEvent(string(pbm.CmdRestore), bcpName, "", ep.TS()))
}
ctx, cancel = context.WithTimeout(context.Background(), waitPhysRestoreStart)
} else {
@@ -426,12 +427,13 @@ func describeRestore(cn *pbm.PBM, o descrRestoreOpts) (fmt.Stringer, error) {
return nil, errors.Wrap(err, "unable to unmarshal config file")
}

stg, err := pbm.Storage(cfg, log.New(nil, "cli", "").NewEvent("", "", "", primitive.Timestamp{}))
l := log.New(nil, "cli", "").NewEvent("", "", "", primitive.Timestamp{})
stg, err := pbm.Storage(cfg, l)
if err != nil {
return nil, errors.Wrap(err, "get storage")
}

meta, err = pbm.GetPhysRestoreMeta(o.restore, stg)
meta, err = pbm.GetPhysRestoreMeta(o.restore, stg, l)
if err != nil && meta == nil {
return nil, errors.Wrap(err, "get restore meta")
}
32 changes: 30 additions & 2 deletions e2e-tests/docker/docker-compose-rs.yaml
@@ -15,6 +15,27 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
- ./conf:/etc/pbm
- ./backups:/opt/backups
agent-cli:
container_name: "pbmagent_cli"
user: "1001"
labels:
- "com.percona.pbm.app=agent"
- "com.percona.pbm.agent.rs=cli"
environment:
- "PBM_MONGODB_URI=mongodb://${BACKUP_USER:-bcp}:${MONGO_PASS:-test1234}@rs101:27017"
build:
labels:
- "com.percona.pbm.app=agent"
dockerfile: ./e2e-tests/docker/pbm-agent/Dockerfile
context: ../..
args:
- MONGODB_VERSION=${MONGODB_VERSION:-4.2}
- MONGODB_IMAGE=${MONGODB_IMAGE:-percona/percona-server-mongodb}
volumes:
- ./conf:/etc/pbm
- ./backups:/opt/backups
cap_add:
- NET_ADMIN

rs101:
build:
@@ -31,6 +52,7 @@ services:
- MONGO_USER=dba
- BACKUP_USER=bcp
- MONGO_PASS=test1234
- MONGODB_VERSION=${MONGODB_VERSION:-4.2}
command: mongod --replSet rs1 --port 27017 --storageEngine wiredTiger --keyFile /opt/keyFile --wiredTigerCacheSizeGB 1
volumes:
- data-rs101:/data/db
@@ -63,6 +85,8 @@ services:
- data-rs103:/data/db
agent-rs101:
container_name: "pbmagent_rs101"
ports:
- "6061:6060"
user: "1001"
labels:
- "com.percona.pbm.app=agent"
@@ -86,6 +110,8 @@ services:
- NET_ADMIN
agent-rs102:
container_name: "pbmagent_rs102"
ports:
- "6062:6060"
user: "1001"
labels:
- "com.percona.pbm.app=agent"
@@ -108,6 +134,8 @@ services:
- data-rs102:/data/db
agent-rs103:
container_name: "pbmagent_rs103"
ports:
- "6063:6060"
user: "1001"
labels:
- "com.percona.pbm.app=agent"
@@ -132,8 +160,8 @@ services:
minio:
image: minio/minio:RELEASE.2022-08-08T18-34-09Z
hostname: minio
# ports:
# - "9000:9000"
ports:
- "9001:9000"
volumes:
- backups:/backups
environment:
1 change: 1 addition & 0 deletions e2e-tests/docker/docker-compose-single.yaml
@@ -32,6 +32,7 @@ services:
- BACKUP_USER=bcp
- MONGO_PASS=test1234
- SINGLE_NODE=true
- MONGODB_VERSION=${MONGODB_VERSION:-4.2}
command: mongod --replSet rs1 --port 27017 --storageEngine wiredTiger --keyFile /opt/keyFile --wiredTigerCacheSizeGB 1
volumes:
- data-rs101:/data/db
2 changes: 2 additions & 0 deletions e2e-tests/docker/docker-compose.yaml
@@ -32,6 +32,7 @@ services:
context: ../..
args:
- MONGODB_VERSION=${MONGODB_VERSION:-4.2}
- MONGODB_IMAGE=${MONGODB_IMAGE:-percona/percona-server-mongodb}
volumes:
- ./conf:/etc/pbm
- ./backups:/opt/backups
@@ -54,6 +55,7 @@ services:
- MONGO_USER=dba
- BACKUP_USER=bcp
- MONGO_PASS=test1234
- MONGODB_VERSION=${MONGODB_VERSION:-4.2}
command: mongod --configsvr --dbpath /data/db --replSet cfg --bind_ip_all --port 27017 --keyFile /opt/keyFile --storageEngine wiredTiger --wiredTigerCacheSizeGB 1
volumes:
- ./scripts/start.sh:/opt/start.sh
13 changes: 12 additions & 1 deletion pbm/config.go
@@ -122,8 +122,19 @@ func (s *StorageConf) Path() string {

// RestoreConf is config options for the restore
type RestoreConf struct {
BatchSize int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"` // num of documents to buffer
// Logical restore
//
// num of documents to buffer
BatchSize int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
NumInsertionWorkers int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`

// NumDownloadWorkers sets the number of goroutines that request chunks
// during the download. By default, it's set to GOMAXPROCS.
NumDownloadWorkers int `bson:"numDownloadWorkers" json:"numDownloadWorkers,omitempty" yaml:"numDownloadWorkers,omitempty"`
// MaxDownloadBufferMb sets the max size of the in-memory buffer that is used
// to download files from the storage.
MaxDownloadBufferMb int `bson:"maxDownloadBufferMb" json:"maxDownloadBufferMb,omitempty" yaml:"maxDownloadBufferMb,omitempty"`
DownloadChunkMb int `bson:"downloadChunkMb" json:"downloadChunkMb,omitempty" yaml:"downloadChunkMb,omitempty"`
}

type BackupConf struct {
6 changes: 6 additions & 0 deletions pbm/restore.go
@@ -5,6 +5,7 @@ import (
"sort"
"time"

"github.com/percona/percona-backup-mongodb/pbm/storage/s3"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -29,6 +30,11 @@ type RestoreMeta struct {
Conditions Conditions `bson:"conditions" json:"conditions"`
Type BackupType `bson:"type" json:"type"`
Leader string `bson:"l,omitempty" json:"l,omitempty"`
Stat *RestoreStat `bson:"stat,omitempty" json:"stat,omitempty"`
}

type RestoreStat struct {
Download map[string]map[string]s3.DownloadStat `bson:"download,omitempty" json:"download,omitempty"`
}

type RestoreReplset struct {
85 changes: 66 additions & 19 deletions pbm/restore/physical.go
@@ -30,6 +30,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/compress"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/storage"
"github.com/percona/percona-backup-mongodb/pbm/storage/s3"
)

const (
@@ -64,12 +65,15 @@ type PhysRestore struct {
bcp *pbm.BackupMeta
files []files

confOpts pbm.RestoreConf

// path to files on a storage the node will sync its
// state with the rest of the cluster
syncPathNode string
syncPathRS string
syncPathCluster string
syncPathPeers map[string]struct{}
// state with the rest of the cluster
syncPathNode string
syncPathNodeStat string
syncPathRS string
syncPathCluster string
syncPathPeers map[string]struct{}
// Shards to participate in restore. Num of shards in bcp could
// be less than in the cluster and this is ok.
//
@@ -608,10 +612,14 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event,
progress |= restoreStared

l.Info("copying backup data")
err = r.copyFiles()
dstat, err := r.copyFiles()
if err != nil {
return errors.Wrap(err, "copy files")
}
err = r.writeStat(dstat)
if err != nil {
r.log.Warning("write download stat: %v", err)
}

l.Info("preparing data")
err = r.prepareData()
@@ -651,6 +659,25 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event,
return nil
}

func (r *PhysRestore) writeStat(stat any) error {
d := struct {
D any `json:"d"`
}{
D: stat,
}
b, err := json.Marshal(d)
if err != nil {
return errors.Wrap(err, "marshal")
}

err = r.stg.Save(r.syncPathNodeStat, bytes.NewBuffer(b), -1)
if err != nil {
return errors.Wrap(err, "write")
}

return nil
}

func (r *PhysRestore) dumpMeta(meta *pbm.RestoreMeta, s pbm.Status, msg string) error {
name := fmt.Sprintf("%s/%s.json", pbm.PhysRestoresDir, meta.Name)
_, err := r.stg.FileStat(name)
@@ -667,7 +694,7 @@ func (r *PhysRestore) dumpMeta(meta *pbm.RestoreMeta, s pbm.Status, msg string)
// The meta generated here is more for debugging purposes (just in case).
// `pbm status` and `resync` will always rebuild it from agents' reports
// not relying solely on this file.
condsm, err := pbm.GetPhysRestoreMeta(meta.Name, r.stg)
condsm, err := pbm.GetPhysRestoreMeta(meta.Name, r.stg, r.log)
if err == nil {
meta.Replsets = condsm.Replsets
meta.Status = condsm.Status
@@ -699,7 +726,18 @@ func (r *PhysRestore) dumpMeta(meta *pbm.RestoreMeta, s pbm.Status, msg string)
return nil
}

func (r *PhysRestore) copyFiles() error {
func (r *PhysRestore) copyFiles() (stat *s3.DownloadStat, err error) {
readFn := r.stg.SourceReader
if t, ok := r.stg.(*s3.S3); ok {
d := t.NewDownload(r.confOpts.NumDownloadWorkers, r.confOpts.MaxDownloadBufferMb, r.confOpts.DownloadChunkMb)
readFn = d.SourceReader
defer func() {
s := d.Stat()
stat = &s
r.log.Debug("download stat: %s", s)
}()
}
cpbuf := make([]byte, 32*1024)
for i := len(r.files) - 1; i >= 0; i-- {
set := r.files[i]
for _, f := range set.Data {
@@ -711,46 +749,46 @@ func (r *PhysRestore) copyFiles() error {

err := os.MkdirAll(filepath.Dir(dst), os.ModeDir|0o700)
if err != nil {
return errors.Wrapf(err, "create path %s", filepath.Dir(dst))
return stat, errors.Wrapf(err, "create path %s", filepath.Dir(dst))
}

r.log.Info("copy <%s> to <%s>", src, dst)
sr, err := r.stg.SourceReader(src)
sr, err := readFn(src)
if err != nil {
return errors.Wrapf(err, "create source reader for <%s>", src)
return stat, errors.Wrapf(err, "create source reader for <%s>", src)
}
defer sr.Close()

data, err := compress.Decompress(sr, set.Cmpr)
if err != nil {
return errors.Wrapf(err, "decompress object %s", src)
return stat, errors.Wrapf(err, "decompress object %s", src)
}
defer data.Close()

fw, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE, f.Fmode)
if err != nil {
return errors.Wrapf(err, "create/open destination file <%s>", dst)
return stat, errors.Wrapf(err, "create/open destination file <%s>", dst)
}
defer fw.Close()
if f.Off != 0 {
_, err := fw.Seek(f.Off, io.SeekStart)
if err != nil {
return errors.Wrapf(err, "set file offset <%s>|%d", dst, f.Off)
return stat, errors.Wrapf(err, "set file offset <%s>|%d", dst, f.Off)
}
}
_, err = io.Copy(fw, data)
_, err = io.CopyBuffer(fw, data, cpbuf)
if err != nil {
return errors.Wrapf(err, "copy file <%s>", dst)
return stat, errors.Wrapf(err, "copy file <%s>", dst)
}
if f.Size != 0 {
err = fw.Truncate(f.Size)
if err != nil {
return errors.Wrapf(err, "truncate file <%s>|%d", dst, f.Size)
return stat, errors.Wrapf(err, "truncate file <%s>|%d", dst, f.Size)
}
}
}
}
return nil
return stat, nil
}

func (r *PhysRestore) prepareData() error {
@@ -1035,11 +1073,19 @@ func (r *PhysRestore) startMongo(opts ...string) error {
const hbFrameSec = 60 * 2

func (r *PhysRestore) init(name string, opid pbm.OPID, l *log.Event) (err error) {
r.stg, err = r.cn.GetStorage(l)
var cfg pbm.Config
cfg, err = r.cn.GetConfig()
if err != nil {
return errors.Wrap(err, "get pbm config")
}

r.stg, err = pbm.Storage(cfg, l)
if err != nil {
return errors.Wrap(err, "get storage")
}

r.confOpts = cfg.Restore

r.log = l

r.name = name
@@ -1048,6 +1094,7 @@ func (r *PhysRestore) init(name string, opid pbm.OPID, l *log.Event) (err error)
r.startTS = time.Now().Unix()

r.syncPathNode = fmt.Sprintf("%s/%s/rs.%s/node.%s", pbm.PhysRestoresDir, r.name, r.rsConf.ID, r.nodeInfo.Me)
r.syncPathNodeStat = fmt.Sprintf("%s/%s/rs.%s/stat.%s", pbm.PhysRestoresDir, r.name, r.rsConf.ID, r.nodeInfo.Me)
r.syncPathRS = fmt.Sprintf("%s/%s/rs.%s/rs", pbm.PhysRestoresDir, r.name, r.rsConf.ID)
r.syncPathCluster = fmt.Sprintf("%s/%s/cluster", pbm.PhysRestoresDir, r.name)
r.syncPathPeers = make(map[string]struct{})
