Skip to content

Commit

Permalink
fix: rds备份恢复优化
Browse files Browse the repository at this point in the history
  • Loading branch information
Qu Xuan committed Jul 18, 2020
1 parent 9b1ea9f commit ecb7249
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 46 deletions.
12 changes: 9 additions & 3 deletions pkg/apis/compute/dbinstance.go
Expand Up @@ -178,9 +178,15 @@ type SDBInstanceChangeConfigInput struct {
type SDBInstanceRecoveryConfigInput struct {
apis.Meta

DBInstancebackup string
DBInstancebackupId string `json:"dbinstancebackup_id"`
Databases map[string]string `json:"databases,allowempty"`
// swagger:ignore
DBInstancebackup string `json:"dbinstancebackup" "yunion:deprecated-by":"dbinstancebackup_id"`

// 备份Id
DBInstancebackupId string `json:"dbinstancebackup_id"`

// 数据库信息, 例如 {"src":"dest"} 是将备份中的src数据库恢复到目标实例的dest数据库中
// example: {"sdb1":"ddb1"}
Databases map[string]string `json:"databases,allowempty"`
}

type DBInstanceListInput struct {
Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/compute/geo_input.go
Expand Up @@ -64,6 +64,23 @@ type ZonalFilterListBase struct {
OrderByZone string `json:"order_by_zone"`
}

func (input ZonalFilterListBase) ZoneList() []string {
zoneStr := input.Zone
if len(zoneStr) > 0 {
input.Zones = append(input.Zones, zoneStr)
}
return input.Zones
}

func (input ZonalFilterListBase) FirstZone() string {
if len(input.Zone) > 0 {
return input.Zone
}
if len(input.Zones) > 0 {
return input.Zones[0]
}
return ""
}
func (input ZonalFilterListInput) ZoneList() []string {
zoneStr := input.Zone
if len(zoneStr) > 0 {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cloudprovider/dbinstance.go
Expand Up @@ -88,6 +88,7 @@ type SDBInstanceBackupCreateConfig struct {
}

type SDBInstanceRecoveryConfig struct {
BackupId string
Databases map[string]string
BackupId string
Databases map[string]string
OriginDBInstanceExternalId string
}
33 changes: 22 additions & 11 deletions pkg/compute/models/dbinstances.go
Expand Up @@ -687,22 +687,19 @@ func (self *SDBInstance) AllowPerformRecovery(ctx context.Context, userCred mccl
return self.IsOwner(userCred) || db.IsAdminAllowPerform(userCred, self, "recovery")
}

func (self *SDBInstance) PerformRecovery(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
func (self *SDBInstance) PerformRecovery(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.SDBInstanceRecoveryConfigInput) (jsonutils.JSONObject, error) {
if !utils.IsInStringArray(self.Status, []string{api.DBINSTANCE_RUNNING}) {
return nil, httperrors.NewInvalidStatusError("Cannot do recovery dbinstance in status %s required status %s", self.Status, api.DBINSTANCE_RUNNING)
}

params := data.(*jsonutils.JSONDict)
backupV := validators.NewModelIdOrNameValidator("dbinstancebackup", "dbinstancebackup", userCred)
err := backupV.Validate(params)
if err != nil {
return nil, err
}
input := &api.SDBInstanceRecoveryConfigInput{}
err = params.Unmarshal(input)
_backup, err := DBInstanceBackupManager.FetchByIdOrName(userCred, input.DBInstancebackupId)
if err != nil {
return nil, httperrors.NewInputParameterError("Failed to unmarshal input config: %v", err)
if errors.Cause(err) == sql.ErrNoRows {
return nil, httperrors.NewResourceNotFoundError2("dbinstancebackup", input.DBInstancebackupId)
}
return nil, httperrors.NewGeneralError(err)
}
input.DBInstancebackupId = _backup.GetId()

databases, err := self.GetDBInstanceDatabases()
if err != nil {
Expand All @@ -714,7 +711,7 @@ func (self *SDBInstance) PerformRecovery(ctx context.Context, userCred mcclient.
dbDatabases = append(dbDatabases, database.Name)
}

backup := backupV.Model.(*SDBInstanceBackup)
backup := _backup.(*SDBInstanceBackup)
for src, dest := range input.Databases {
if len(dest) == 0 {
dest = src
Expand All @@ -737,6 +734,20 @@ func (self *SDBInstance) PerformRecovery(ctx context.Context, userCred mcclient.
return nil, httperrors.NewInputParameterError("backup and instance not in same cloudregion")
}

if len(backup.Engine) > 0 && backup.Engine != self.Engine {
return nil, httperrors.NewInputParameterError("can not recover data from diff rds engine")
}

driver, err := self.GetRegionDriver()
if err != nil {
return nil, httperrors.NewGeneralError(err)
}

err = driver.ValidateDBInstanceRecovery(ctx, userCred, self, backup, input)
if err != nil {
return nil, err
}

return nil, self.StartDBInstanceRecoveryTask(ctx, userCred, input.JSON(input), "")
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/compute/models/regiondrivers.go
Expand Up @@ -162,6 +162,8 @@ type IDBInstanceDriver interface {

InitDBInstanceUser(ctx context.Context, dbinstance *SDBInstance, task taskman.ITask, desc *cloudprovider.SManagedDBInstanceCreateConfig) error
IsDBInstanceNeedSecgroup() bool

ValidateDBInstanceRecovery(ctx context.Context, userCred mcclient.TokenCredential, instance *SDBInstance, backup *SDBInstanceBackup, input api.SDBInstanceRecoveryConfigInput) error
}

type IElasticcacheDriver interface {
Expand Down
37 changes: 18 additions & 19 deletions pkg/compute/models/wireresource.go
Expand Up @@ -174,26 +174,25 @@ func (manager *SWireResourceBaseManager) ListItemFilter(
return nil, errors.Wrap(err, "SVpcResourceBaseManager.ListItemFilter")
}

if len(query.Zone) > 0 || len(query.Zones) > 0 {
if len(query.ZoneList()) > 0 {
region := &SCloudregion{}
if len(query.Zone) > 0 {
sq := ZoneManager.Query().SubQuery()
q := CloudregionManager.Query()
q = q.Join(sq, sqlchemy.Equals(sq.Field("cloudregion_id"), q.Field("id"))).Filter(sqlchemy.OR(
sqlchemy.Equals(sq.Field("id"), query.Zone),
sqlchemy.Equals(sq.Field("name"), query.Zone),
))
count, err := q.CountWithError()
if err != nil {
return nil, errors.Wrap(err, "CountWithError")
}
if count < 1 {
return nil, httperrors.NewResourceNotFoundError2("zone", query.Zone)
}
err = q.First(region)
if err != nil {
return nil, errors.Wrap(err, "q.First")
}
firstZone := query.FirstZone()
sq := ZoneManager.Query().SubQuery()
q := CloudregionManager.Query()
q = q.Join(sq, sqlchemy.Equals(sq.Field("cloudregion_id"), q.Field("id"))).Filter(sqlchemy.OR(
sqlchemy.Equals(sq.Field("id"), firstZone),
sqlchemy.Equals(sq.Field("name"), firstZone),
))
count, err := q.CountWithError()
if err != nil {
return nil, errors.Wrap(err, "CountWithError")
}
if count < 1 {
return nil, httperrors.NewResourceNotFoundError2("zone", firstZone)
}
err = q.First(region)
if err != nil {
return nil, errors.Wrap(err, "q.First")
}
if utils.IsInStringArray(region.Provider, api.REGIONAL_NETWORK_PROVIDERS) {
vpcQ := VpcManager.Query().SubQuery()
Expand Down
4 changes: 4 additions & 0 deletions pkg/compute/regiondrivers/aliyun.go
Expand Up @@ -975,6 +975,10 @@ func (self *SAliyunRegionDriver) IsSecurityGroupBelongVpc() bool {
return true
}

func (self *SAliyunRegionDriver) ValidateDBInstanceRecovery(ctx context.Context, userCred mcclient.TokenCredential, instance *models.SDBInstance, backup *models.SDBInstanceBackup, input api.SDBInstanceRecoveryConfigInput) error {
return nil
}

func (self *SAliyunRegionDriver) ValidateCreateDBInstanceData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, input api.DBInstanceCreateInput, skus []models.SDBInstanceSku, network *models.SNetwork) (api.DBInstanceCreateInput, error) {
if input.BillingType == billing_api.BILLING_TYPE_PREPAID && len(input.MasterInstanceId) > 0 {
return input, httperrors.NewInputParameterError("slave dbinstance not support prepaid billing type")
Expand Down
5 changes: 5 additions & 0 deletions pkg/compute/regiondrivers/base.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/secrules"

api "yunion.io/x/onecloud/pkg/apis/compute"
Expand Down Expand Up @@ -336,6 +337,10 @@ func (self *SBaseRegionDriver) IsSupportKeepDBInstanceManualBackup() bool {
return false
}

func (self *SBaseRegionDriver) ValidateDBInstanceRecovery(ctx context.Context, userCred mcclient.TokenCredential, instance *models.SDBInstance, backup *models.SDBInstanceBackup, input api.SDBInstanceRecoveryConfigInput) error {
return errors.Wrap(cloudprovider.ErrNotImplemented, "ValidateDBInstanceRecovery")
}

func (self *SBaseRegionDriver) IsSupportedDBInstance() bool {
return false
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/compute/regiondrivers/google.go
Expand Up @@ -184,6 +184,10 @@ func (self *SGoogleRegionDriver) IsSupportedDBInstance() bool {
return true
}

func (self *SGoogleRegionDriver) ValidateDBInstanceRecovery(ctx context.Context, userCred mcclient.TokenCredential, instance *models.SDBInstance, backup *models.SDBInstanceBackup, input api.SDBInstanceRecoveryConfigInput) error {
return nil
}

func (self *SGoogleRegionDriver) ValidateCreateDBInstanceData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, input api.DBInstanceCreateInput, skus []models.SDBInstanceSku, network *models.SNetwork) (api.DBInstanceCreateInput, error) {
if input.BillingType == billing_api.BILLING_TYPE_PREPAID {
return input, httperrors.NewInputParameterError("Google dbinstance not support prepaid billing type")
Expand Down
22 changes: 22 additions & 0 deletions pkg/compute/regiondrivers/huawei.go
Expand Up @@ -2275,6 +2275,28 @@ func (self *SHuaWeiRegionDriver) ValidateDBInstanceAccountPrivilege(ctx context.
return nil
}

// https://support.huaweicloud.com/api-rds/rds_09_0009.html
func (self *SHuaWeiRegionDriver) ValidateDBInstanceRecovery(ctx context.Context, userCred mcclient.TokenCredential, instance *models.SDBInstance, backup *models.SDBInstanceBackup, input api.SDBInstanceRecoveryConfigInput) error {
if backup.Engine == api.DBINSTANCE_TYPE_POSTGRESQL {
return httperrors.NewNotSupportedError("%s not support recovery", backup.Engine)
}
if backup.DBInstanceId == instance.Id && instance.Engine != api.DBINSTANCE_TYPE_SQLSERVER {
return httperrors.NewNotSupportedError("Huawei %s rds not support recovery from it self rds backup", instance.Engine)
}
if len(input.Databases) > 0 {
if instance.Engine != api.DBINSTANCE_TYPE_SQLSERVER {
return httperrors.NewInputParameterError("Huawei only %s engine support databases recovery", instance.Engine)
}
invalidDbs := []string{"rdsadmin", "master", "msdb", "tempdb", "model"}
for _, db := range input.Databases {
if utils.IsInStringArray(strings.ToLower(db), invalidDbs) {
return httperrors.NewInputParameterError("New databases name can not be one of %s", invalidDbs)
}
}
}
return nil
}

func validatorSlaveZones(ownerId mcclient.IIdentityProvider, data *jsonutils.JSONDict, optional bool) error {
s, err := data.GetString("slave_zones")
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/compute/tasks/dbinstance_recovery_task.go
Expand Up @@ -60,29 +60,37 @@ func (self *DBInstanceRecoveryTask) OnInit(ctx context.Context, obj db.IStandalo
return
}

backup, err := instance.GetDBInstanceBackup(input.DBInstancebackupId)
_backup, err := models.DBInstanceBackupManager.FetchById(input.DBInstancebackupId)
if err != nil {
self.taskFailed(ctx, instance, errors.Wrapf(err, "instance.GetDBInstanceBackup(%s)", input.DBInstancebackupId))
return
}

backup := _backup.(*models.SDBInstanceBackup)
origin, _ := backup.GetDBInstance()

conf := &cloudprovider.SDBInstanceRecoveryConfig{
BackupId: backup.ExternalId,
Databases: input.Databases,
}
if origin != nil {
conf.OriginDBInstanceExternalId = origin.ExternalId
}

err = iRds.RecoveryFromBackup(conf)
if err != nil {
self.taskFailed(ctx, instance, errors.Wrap(err, "iRds.RecoveryFromBackup"))
return
}

err = cloudprovider.WaitStatus(iRds, api.DBINSTANCE_RUNNING, time.Second*10, time.Second*40)
err = cloudprovider.WaitStatus(iRds, api.DBINSTANCE_RUNNING, time.Second*10, time.Minute*40)
if err != nil {
self.taskFailed(ctx, instance, errors.Wrap(err, "cloudprovider.WaitStatus(running)"))
return
}

db.OpsLog.LogEvent(instance, db.ACT_RESTORE, nil, self.GetUserCred())
logclient.AddActionLogWithStartable(self, instance, logclient.ACT_RESTORE, backup, self.UserCred, true)
instance.SetStatus(self.UserCred, api.DBINSTANCE_RUNNING, "")
self.SetStageComplete(ctx, nil)
}
26 changes: 17 additions & 9 deletions pkg/multicloud/huawei/dbinstance.go
Expand Up @@ -717,19 +717,27 @@ func (region *SRegion) ChangeDBInstanceConfig(instanceId string, instanceType st
}

func (rds *SDBInstance) RecoveryFromBackup(conf *cloudprovider.SDBInstanceRecoveryConfig) error {
return rds.region.RecoveryDBInstanceFromBackup(rds.Id, conf.BackupId, conf.Databases)
if len(conf.OriginDBInstanceExternalId) == 0 {
conf.OriginDBInstanceExternalId = rds.Id
}
return rds.region.RecoveryDBInstanceFromBackup(rds.Id, conf.OriginDBInstanceExternalId, conf.BackupId, conf.Databases)
}

func (region *SRegion) RecoveryDBInstanceFromBackup(instanceId string, backupId string, databases map[string]string) error {
func (region *SRegion) RecoveryDBInstanceFromBackup(target, origin string, backupId string, databases map[string]string) error {
source := map[string]interface{}{
"type": "backup",
"backup_id": backupId,
}
if len(origin) > 0 {
source["instance_id"] = origin
}
if len(databases) > 0 {
source["database_name"] = databases
}
params := map[string]interface{}{
"source": map[string]interface{}{
"instance_id": instanceId,
"type": "backup",
"backup_id": backupId,
"database_name": databases,
},
"source": source,
"target": map[string]string{
"instance_id": instanceId,
"instance_id": target,
},
}
_, err := region.ecsClient.DBInstance.PerformAction("", "recovery", jsonutils.Marshal(params))
Expand Down

0 comments on commit ecb7249

Please sign in to comment.