Skip to content

Commit

Permalink
fix: 修复cloudid同步问题
Browse files Browse the repository at this point in the history
  • Loading branch information
Qu Xuan committed Jul 3, 2020
1 parent 7e30803 commit 544c19f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 162 deletions.
96 changes: 9 additions & 87 deletions pkg/cloudid/models/cloudaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"database/sql"
"net/http"
"net/url"
"time"

"golang.org/x/net/http/httpproxy"

Expand Down Expand Up @@ -61,7 +60,6 @@ func init() {
),
}
CloudaccountManager.SetVirtualObject(CloudaccountManager)
isCloudacountSynced = false
}

type SCloudaccount struct {
Expand Down Expand Up @@ -195,9 +193,9 @@ func (self *SCloudaccount) removeCloudgroupcaches(ctx context.Context, userCred
return errors.Wrap(err, "GetCloudgroupcaches")
}
for i := range caches {
err = caches[i].Delete(ctx, userCred)
err = caches[i].RealDelete(ctx, userCred)
if err != nil {
return errors.Wrap(err, "caches[i].Delete")
return errors.Wrap(err, "caches[i].RealDelete")
}
}
return nil
Expand All @@ -219,7 +217,7 @@ func (self *SCloudaccount) syncRemoveCloudaccount(ctx context.Context, userCred
return errors.Wrap(err, "removeCloudgroupcaches")
}

return nil
return self.Delete(ctx, userCred)
}

func (self *SCloudaccount) syncRemoveClouduser(ctx context.Context, userCred mcclient.TokenCredential) error {
Expand All @@ -233,7 +231,7 @@ func (self *SCloudaccount) syncRemoveClouduser(ctx context.Context, userCred mcc
return errors.Wrapf(err, "RealDelete user %s(%s)", users[i].Name, users[i].Id)
}
}
return self.Delete(ctx, userCred)
return nil
}

func (manager *SCloudaccountManager) newFromICloudaccount(ctx context.Context, userCred mcclient.TokenCredential, account *SCloudaccount) (*SCloudaccount, error) {
Expand Down Expand Up @@ -282,15 +280,6 @@ func (manager *SCloudaccountManager) SyncCloudaccounts(ctx context.Context, user
result = account.syncCloudprovider(ctx, userCred)
log.Infof("sync cloudprovider for cloudaccount %s(%s) result: %s", account.Name, account.Id, result.Result())
}
isCloudacountSynced = true
}

// 避免第一次启动时,云账号列表为空,子账号及其他资源需要等待一个周期才能同步
func waitForSync(task string) {
for isCloudacountSynced == false {
log.Debugf("cloudaccount not sync try later do task %s", task)
time.Sleep(time.Second * 30)
}
}

func (self SCloudaccount) GetGlobalId() string {
Expand Down Expand Up @@ -425,27 +414,6 @@ func (account *SCloudDelegate) GetProvider() (cloudprovider.ICloudProvider, erro
})
}

func (manager *SCloudaccountManager) SyncCloudusers(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
waitForSync("SyncCloudusersTask")
accounts, err := manager.GetCloudaccounts()
if err != nil {
log.Errorf("GetLocalCloudaccounts: %v", err)
return
}
for i := range accounts {
factory, err := accounts[i].GetProviderFactory()
if err != nil {
continue
}
if factory.IsSupportClouduser() {
err = accounts[i].StartSyncCloudusersTask(ctx, userCred, "")
if err != nil {
log.Errorf("StartSyncCloudusersTask for account %s(%s) error: %v", accounts[i].Name, accounts[i].Provider, err)
}
}
}
}

func (self *SCloudaccount) StartSyncCloudusersTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
params := jsonutils.NewDict()
task, err := taskman.TaskManager.NewTask(ctx, "SyncCloudusersTask", self, userCred, params, parentTaskId, "", nil)
Expand All @@ -456,26 +424,6 @@ func (self *SCloudaccount) StartSyncCloudusersTask(ctx context.Context, userCred
return nil
}

func (self *SCloudaccount) StartSyncCloudgroupsTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
params := jsonutils.NewDict()
task, err := taskman.TaskManager.NewTask(ctx, "SyncCloudgroupsTask", self, userCred, params, parentTaskId, "", nil)
if err != nil {
return errors.Wrap(err, "NewTask")
}
task.ScheduleRun(nil)
return nil
}

func (self *SCloudaccount) StartSyncCloudpoliciesTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
params := jsonutils.NewDict()
task, err := taskman.TaskManager.NewTask(ctx, "SyncCloudpoliciesTask", self, userCred, params, parentTaskId, "", nil)
if err != nil {
return errors.Wrap(err, "NewTask")
}
task.ScheduleRun(nil)
return nil
}

func (self *SCloudaccount) getCloudusers() ([]SClouduser, error) {
users := []SClouduser{}
q := ClouduserManager.Query().Equals("cloudaccount_id", self.Id)
Expand Down Expand Up @@ -728,49 +676,23 @@ func (manager *SCloudaccountManager) GetSupportCreateCloudgroupAccounts() ([]SCl
return accounts, nil
}

func (manager *SCloudaccountManager) SyncCloudpolicies(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
waitForSync("SyncCloudpoliciesTask")
func (manager *SCloudaccountManager) SyncCloudidResources(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
accounts, err := manager.GetCloudaccounts()
if err != nil {
log.Errorf("GetCloudaccounts error: %v", err)
return
}
for i := range accounts {
err = accounts[i].StartSyncCloudpolicyTask(ctx, userCred, "")
if err != nil {
log.Errorf("StartSyncCloudpolicyTask for account %s(%s) error: %v", accounts[i].Name, accounts[i].Provider, err)
}
}
}

func (self *SCloudaccount) StartSyncCloudpolicyTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
params := jsonutils.NewDict()
task, err := taskman.TaskManager.NewTask(ctx, "SyncCloudpoliciesTask", self, userCred, params, parentTaskId, "", nil)
if err != nil {
return errors.Wrap(err, "NewTask")
}
task.ScheduleRun(nil)
return nil
}

func (manager *SCloudaccountManager) SyncCloudgroups(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
waitForSync("SyncCloudgroupsTask")
accounts, err := manager.GetSupportCreateCloudgroupAccounts()
if err != nil {
log.Errorf("GetSupportCreateCloudgroupAccounts error: %v", err)
return
}
for i := range accounts {
err = accounts[i].StartSyncCloudgroupcacheTask(ctx, userCred, "")
err = accounts[i].StartSyncCloudIdResourcesTask(ctx, userCred, "")
if err != nil {
log.Errorf("StartSyncCloudgroupcacheTask for account %s(%s) error: %v", accounts[i].Name, accounts[i].Provider, err)
log.Errorf("StartSyncCloudIdResourcesTask for account %s(%s) error: %v", accounts[i].Name, accounts[i].Provider, err)
}
}
}

func (self *SCloudaccount) StartSyncCloudgroupcacheTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
func (self *SCloudaccount) StartSyncCloudIdResourcesTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
params := jsonutils.NewDict()
task, err := taskman.TaskManager.NewTask(ctx, "SyncCloudgroupcachesTask", self, userCred, params, parentTaskId, "", nil)
task, err := taskman.TaskManager.NewTask(ctx, "SyncCloudIdResourcesTask", self, userCred, params, parentTaskId, "", nil)
if err != nil {
return errors.Wrap(err, "NewTask")
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/cloudid/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ type SCloudIdOptions struct {
common_options.CommonOptions
common_options.DBOptions

CloudaccountSyncIntervalMinutes int `help:"frequency to sync region cloudaccount task" default:"3"`
CloudpolicySyncIntervalHours int `help:"frequency to sync region cloudpolicy task" default:"12"`
CloudgroupSyncIntervalHours int `help:"frequency to sync region cloudgrouptask" default:"3"`
ClouduserSyncIntervalHours int `help:"frequency to sync clouduser task" default:"7"`
CloudaccountSyncIntervalMinutes int `help:"frequency to sync region cloudaccount task" default:"3"`
CloudIdResourceSyncIntervalHours int `help:"frequency to sync region cloudpolicy task" default:"3"`
}

var (
Expand Down
4 changes: 1 addition & 3 deletions pkg/cloudid/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ func StartService() {
if !opts.IsSlaveNode {
cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount)
cron.AddJobAtIntervalsWithStartRun("SyncCloudaccounts", time.Duration(opts.CloudaccountSyncIntervalMinutes)*time.Minute, models.CloudaccountManager.SyncCloudaccounts, true)
cron.AddJobAtIntervalsWithStartRun("SyncCloudpolicies", time.Duration(opts.CloudpolicySyncIntervalHours)*time.Hour, models.CloudaccountManager.SyncCloudpolicies, true)
cron.AddJobAtIntervalsWithStartRun("SyncCloudgroups", time.Duration(opts.CloudgroupSyncIntervalHours)*time.Hour, models.CloudaccountManager.SyncCloudgroups, true)
cron.AddJobAtIntervalsWithStartRun("SyncCloudusersTask", time.Duration(opts.ClouduserSyncIntervalHours)*time.Hour, models.CloudaccountManager.SyncCloudusers, true)
cron.AddJobAtIntervalsWithStartRun("SyncCloudIdResources", time.Duration(opts.CloudIdResourceSyncIntervalHours)*time.Hour, models.CloudaccountManager.SyncCloudidResources, true)
cron.Start()
defer cron.Stop()
}
Expand Down
60 changes: 0 additions & 60 deletions pkg/cloudid/tasks/sync_cloudgroupcaches_task.go

This file was deleted.

33 changes: 25 additions & 8 deletions pkg/cloudid/tasks/sync_cloudpolicies_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ import (
"yunion.io/x/onecloud/pkg/cloudid/models"
)

type SyncCloudpoliciesTask struct {
type SyncCloudIdResourcesTask struct {
taskman.STask
}

func init() {
taskman.RegisterTask(SyncCloudpoliciesTask{})
taskman.RegisterTask(SyncCloudIdResourcesTask{})
}

func (self *SyncCloudpoliciesTask) taskFailed(ctx context.Context, cloudaccount *models.SCloudaccount, err error) {
func (self *SyncCloudIdResourcesTask) taskFailed(ctx context.Context, cloudaccount *models.SCloudaccount, err error) {
self.SetStageFailed(ctx, err.Error())
}

func (self *SyncCloudpoliciesTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) {
func (self *SyncCloudIdResourcesTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) {
account := obj.(*models.SCloudaccount)

provider, err := account.GetProvider()
Expand All @@ -49,12 +49,29 @@ func (self *SyncCloudpoliciesTask) OnInit(ctx context.Context, obj db.IStandalon

policy, err := provider.GetISystemCloudpolicies()
if err != nil {
self.taskFailed(ctx, account, errors.Wrapf(err, "GetISystemCloudpolicies for %s(%s)", account.Name, account.Provider))
return
log.Errorf("GetISystemCloudpolicies for %s(%s) failed: %v", account.Name, account.Provider, err)
} else {
result := account.SyncCloudpolicies(ctx, self.GetUserCred(), policy)
log.Infof("Sync policies for %s(%s) result: %s", account.Name, account.Provider, result.Result())
}

groups, err := provider.GetICloudgroups()
if err != nil {
log.Errorf("GetICloudgroups for %s(%s) failed: %v", account.Name, account.Provider, err)
} else {
result := account.SyncCloudgroupcaches(ctx, self.GetUserCred(), groups)
log.Infof("Sync groups for %s(%s) result: %s", account.Name, account.Provider, result.Result())
}

result := account.SyncCloudpolicies(ctx, self.GetUserCred(), policy)
log.Infof("Sync policies for %s(%s) result: %s", account.Name, account.Provider, result.Result())
self.SetStage("OnSyncCloudusersComplete", nil)
account.StartSyncCloudusersTask(ctx, self.GetUserCred(), self.GetParentId())
self.SetStageComplete(ctx, nil)
}

func (self *SyncCloudIdResourcesTask) OnSyncCloudusersComplete(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
self.SetStageComplete(ctx, nil)
}

func (self *SyncCloudIdResourcesTask) OnClouduserSyncCompleteFailed(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
self.SetStageFailed(ctx, data.String())
}

0 comments on commit 544c19f

Please sign in to comment.