Skip to content

Commit

Permalink
fix: 避免仅同步一个region时,其余region状态变为准备中
Browse files Browse the repository at this point in the history
  • Loading branch information
Qu Xuan committed May 18, 2020
1 parent bbeb6be commit 33c2b5f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 17 deletions.
6 changes: 3 additions & 3 deletions pkg/compute/models/cloudaccounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,13 @@ func (self *SCloudaccount) StartSyncCloudProviderInfoTask(ctx context.Context, u
log.Errorf("CloudAccountSyncInfoTask newTask error %s", err)
return err
}
self.markStartSync(userCred)
self.markStartSync(userCred, syncRange)
db.OpsLog.LogEvent(self, db.ACT_SYNC_HOST_START, "", userCred)
task.ScheduleRun(nil)
return nil
}

func (self *SCloudaccount) markStartSync(userCred mcclient.TokenCredential) error {
func (self *SCloudaccount) markStartSync(userCred mcclient.TokenCredential, syncRange *SSyncRange) error {
_, err := db.Update(self, func() error {
self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_QUEUED
return nil
Expand All @@ -705,7 +705,7 @@ func (self *SCloudaccount) markStartSync(userCred mcclient.TokenCredential) erro
providers := self.GetCloudproviders()
for i := range providers {
if providers[i].GetEnabled() {
err := providers[i].markStartingSync(userCred)
err := providers[i].markStartingSync(userCred, syncRange)
if err != nil {
return errors.Wrap(err, "providers.markStartSync")
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/compute/models/cloudproviderregions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/compare"
"yunion.io/x/pkg/util/timeutils"
"yunion.io/x/pkg/utils"
"yunion.io/x/sqlchemy"

api "yunion.io/x/onecloud/pkg/apis/compute"
Expand Down Expand Up @@ -260,17 +261,23 @@ func (manager *SCloudproviderregionManager) FetchByIdsOrCreate(providerId string
return cpr
}

func (self *SCloudproviderregion) markStartingSync(userCred mcclient.TokenCredential) error {
func (self *SCloudproviderregion) markStartingSync(userCred mcclient.TokenCredential, syncRange *SSyncRange) error {
if !self.Enabled {
return fmt.Errorf("Cloudprovider(%s)region(%s) disabled", self.CloudproviderId, self.CloudregionId)
}
_, err := db.Update(self, func() error {
self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_QUEUING
return nil
})
if err != nil {
log.Errorf("Failed to markStartingSync error: %v", err)
return err
regionIds := []string{}
if syncRange != nil {
regionIds, _ = syncRange.GetRegionIds()
}
if syncRange == nil || len(regionIds) == 0 || utils.IsInStringArray(self.CloudregionId, regionIds) {
_, err := db.Update(self, func() error {
self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_QUEUING
return nil
})
if err != nil {
log.Errorf("Failed to markStartingSync error: %v", err)
return err
}
}
return nil
}
Expand Down
51 changes: 45 additions & 6 deletions pkg/compute/models/cloudproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,44 @@ type SSyncRange struct {
Host []string
}

func (sr *SSyncRange) GetRegionIds() ([]string, error) {
regionIds := []string{}
if len(sr.Host) == 0 && len(sr.Zone) == 0 && len(sr.Region) == 0 {
return regionIds, nil
}
hostQ := HostManager.Query().SubQuery()
hosts := hostQ.Query().Filter(sqlchemy.OR(
sqlchemy.In(hostQ.Field("id"), sr.Host),
sqlchemy.In(hostQ.Field("name"), sr.Host),
)).SubQuery()
zoneQ := ZoneManager.Query().SubQuery()
zones := zoneQ.Query().Filter(sqlchemy.OR(
sqlchemy.In(zoneQ.Field("id"), sr.Zone),
sqlchemy.In(zoneQ.Field("name"), sr.Zone),
sqlchemy.In(zoneQ.Field("id"), hosts.Query(hosts.Field("zone_id")).SubQuery()),
)).SubQuery()
regionQ := CloudregionManager.Query().SubQuery()
q := regionQ.Query(regionQ.Field("id")).Filter(sqlchemy.OR(
sqlchemy.In(regionQ.Field("id"), sr.Region),
sqlchemy.In(regionQ.Field("name"), sr.Region),
sqlchemy.In(regionQ.Field("id"), zones.Query(zones.Field("cloudregion_id")).SubQuery()),
))
rows, err := q.Rows()
if err != nil {
return nil, errors.Wrap(err, "q.Rows")
}
defer rows.Close()
for rows.Next() {
var regionId string
err = rows.Scan(&regionId)
if err != nil {
return nil, errors.Wrap(err, "rows.Scan")
}
regionIds = append(regionIds, regionId)
}
return regionIds, nil
}

func (sr *SSyncRange) NeedSyncInfo() bool {
if sr.FullSync {
return true
Expand Down Expand Up @@ -563,7 +601,7 @@ func (self *SCloudprovider) StartSyncCloudProviderInfoTask(ctx context.Context,
cloudaccount.markAutoSync(userCred)
cloudaccount.MarkSyncing(userCred)
}
self.markStartSync(userCred)
self.markStartSync(userCred, syncRange)
db.OpsLog.LogEvent(self, db.ACT_SYNC_HOST_START, "", userCred)
task.ScheduleRun(nil)
return nil
Expand Down Expand Up @@ -634,7 +672,7 @@ func (self *SCloudprovider) PerformChangeProject(ctx context.Context, userCred m
return nil, self.StartSyncCloudProviderInfoTask(ctx, userCred, &SSyncRange{FullSync: true, DeepSync: true}, "")
}

func (self *SCloudprovider) markStartingSync(userCred mcclient.TokenCredential) error {
func (self *SCloudprovider) markStartingSync(userCred mcclient.TokenCredential, syncRange *SSyncRange) error {
_, err := db.Update(self, func() error {
self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_QUEUING
return nil
Expand All @@ -646,7 +684,7 @@ func (self *SCloudprovider) markStartingSync(userCred mcclient.TokenCredential)
cprs := self.GetCloudproviderRegions()
for i := range cprs {
if cprs[i].Enabled {
err := cprs[i].markStartingSync(userCred)
err := cprs[i].markStartingSync(userCred, syncRange)
if err != nil {
return errors.Wrap(err, "cprs[i].markStartingSync")
}
Expand All @@ -655,7 +693,7 @@ func (self *SCloudprovider) markStartingSync(userCred mcclient.TokenCredential)
return nil
}

func (self *SCloudprovider) markStartSync(userCred mcclient.TokenCredential) error {
func (self *SCloudprovider) markStartSync(userCred mcclient.TokenCredential, syncRange *SSyncRange) error {
_, err := db.Update(self, func() error {
self.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_QUEUED
return nil
Expand All @@ -667,7 +705,7 @@ func (self *SCloudprovider) markStartSync(userCred mcclient.TokenCredential) err
cprs := self.GetCloudproviderRegions()
for i := range cprs {
if cprs[i].Enabled {
err := cprs[i].markStartingSync(userCred)
err := cprs[i].markStartingSync(userCred, syncRange)
if err != nil {
return errors.Wrap(err, "cprs[i].markStartingSync")
}
Expand Down Expand Up @@ -1242,9 +1280,10 @@ func (provider *SCloudprovider) resetAutoSync() {
func (provider *SCloudprovider) syncCloudproviderRegions(ctx context.Context, userCred mcclient.TokenCredential, syncRange SSyncRange, wg *sync.WaitGroup, autoSync bool) {
provider.markSyncing(userCred)
cprs := provider.GetCloudproviderRegions()
regionIds, _ := syncRange.GetRegionIds()
syncCnt := 0
for i := range cprs {
if cprs[i].Enabled && cprs[i].CanSync() && (!autoSync || cprs[i].needAutoSync()) {
if cprs[i].Enabled && cprs[i].CanSync() && (!autoSync || cprs[i].needAutoSync()) && (len(regionIds) == 0 || utils.IsInStringArray(cprs[i].CloudregionId, regionIds)) {
syncCnt += 1
var waitChan chan bool = nil
if wg != nil {
Expand Down

0 comments on commit 33c2b5f

Please sign in to comment.