Skip to content

Commit

Permalink
S3 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
percona-csalguero committed Dec 20, 2018
1 parent 77118d4 commit 3a797b2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
6 changes: 3 additions & 3 deletions cli/pbm-agent/run-agents.sh
Expand Up @@ -40,9 +40,9 @@ run_agents() {
do
pidfile=/tmp/agent.${port}.pid
logfile=/tmp/agent.${port}.log
backupdir=/tmp/backup.${port}
rm -rf ${backupdir}
mkdir -p ${backupdir}
backupdir=percona-backup-mongodb-test-s3-streamer
#rm -rf ${backupdir}
#mkdir -p ${backupdir}

echo "./pbm-agent --mongodb-user=${TEST_MONGODB_USERNAME} \\"
echo " --mongodb-password=${TEST_MONGODB_PASSWORD} \\"
Expand Down
39 changes: 31 additions & 8 deletions grpc/client/client.go
Expand Up @@ -725,6 +725,7 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {

var sess *session.Session
var err error

switch msg.GetDestinationType() {
case pb.DestinationType_DESTINATION_TYPE_FILE:
fi, err := os.Stat(c.backupDir)
Expand Down Expand Up @@ -766,7 +767,7 @@ func (c *Client) processStartBackup(msg *pb.StartBackup) {
}

log.Debug("Starting oplog backup")
go c.runOplogBackup(msg)
go c.runOplogBackup(msg, sess)
// Wait until we have at least one document from the tailer to start the backup only after we have
// documents in the oplog tailer.
log.Debug("Waiting oplog first doc")
Expand Down Expand Up @@ -928,20 +929,20 @@ func (c *Client) runDBBackup(msg *pb.StartBackup, sess *session.Session) {
writers = append(writers, fw)
case pb.DestinationType_DESTINATION_TYPE_AWS:
svc := s3.New(sess)
exists, err := awsutils.BucketExists(svc, msg.GetDestinationDir())
exists, err := awsutils.BucketExists(svc, c.backupDir)
if err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot check if S3 bucket %q exists: %s", msg.GetDestinationDir(), err))
c.sendDBBackupFinishError(fmt.Errorf("cannot check if S3 bucket %q exists: %s", c.backupDir, err))
return
}
if !exists {
if err := awsutils.CreateBucket(svc, msg.GetDestinationDir()); err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot create s3 bucket %q: %s", msg.GetDestinationDir(), err))
c.sendDBBackupFinishError(fmt.Errorf("cannot create s3 bucket %q for dbBackup: %s", c.backupDir, err))
return
}
}
s3w, err := s3writer.Open(sess, msg.GetDestinationDir(), msg.GetDbBackupName())
s3w, err := s3writer.Open(sess, c.backupDir, msg.GetDbBackupName())
if err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot create s3 write for file %q: %s", msg.GetDbBackupName(), err))
c.sendDBBackupFinishError(fmt.Errorf("cannot create s3 write for file %q for dbBackup: %s", msg.GetDbBackupName(), err))
return
}
writers = append(writers, s3w)
Expand Down Expand Up @@ -1005,7 +1006,7 @@ func (c *Client) runDBBackup(msg *pb.StartBackup, sess *session.Session) {
return
}

c.setDBBackupRunning(false)
c.setOplogBackupRunning(false)

if dumpErr != nil {
c.sendDBBackupFinishError(fmt.Errorf("backup was cancelled"))
Expand All @@ -1017,10 +1018,12 @@ func (c *Client) runDBBackup(msg *pb.StartBackup, sess *session.Session) {
c.sendBackupFinishOK()
}

func (c *Client) runOplogBackup(msg *pb.StartBackup) {
func (c *Client) runOplogBackup(msg *pb.StartBackup, sess *session.Session) {
c.logger.Info("Starting oplog backup")
writers := []io.WriteCloser{}

log.Debugf("destination type: %v\n", msg.GetDestinationType())
log.Printf("destination type: %v\n", msg.GetDestinationType())
switch msg.GetDestinationType() {
case pb.DestinationType_DESTINATION_TYPE_FILE:
fw, err := os.Create(path.Join(c.backupDir, msg.GetOplogBackupName()))
Expand All @@ -1035,6 +1038,26 @@ func (c *Client) runOplogBackup(msg *pb.StartBackup) {
c.grpcClient.OplogBackupFinished(context.Background(), finishMsg)
}
writers = append(writers, fw)
case pb.DestinationType_DESTINATION_TYPE_AWS:
svc := s3.New(sess)
exists, err := awsutils.BucketExists(svc, c.backupDir)
if err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot check if S3 bucket %q exists: %s", c.backupDir, err))
return
}
if !exists {
if err := awsutils.CreateBucket(svc, c.backupDir); err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot create s3 bucket %q for oplogBackup: %s", c.backupDir, err))
return
}
}
log.Infof("oplog %v %v \n", c.backupDir, msg.GetOplogBackupName())
s3w, err := s3writer.Open(sess, c.backupDir, msg.GetOplogBackupName())
if err != nil {
c.sendDBBackupFinishError(fmt.Errorf("cannot create s3 write for file %q for oplogBackup: %s", msg.GetOplogBackupName(), err))
return
}
writers = append(writers, s3w)
}

switch msg.GetCypher() {
Expand Down
1 change: 1 addition & 0 deletions setenv.sh
Expand Up @@ -7,6 +7,7 @@ export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export GOCACHE=off
export GOLANG_DOCKERHUB_TAG=1.10-stretch
export AWS_REGION=us-west-2

export TEST_PSMDB_VERSION=latest
export TEST_MONGODB_ADMIN_USERNAME=admin
Expand Down

0 comments on commit 3a797b2

Please sign in to comment.