Skip to content

Commit

Permalink
refactor ITableSpec signature with context
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi committed Jun 8, 2020
1 parent 95a58cc commit 9f4d4a4
Show file tree
Hide file tree
Showing 140 changed files with 290 additions and 273 deletions.
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/db_dispatcher.go
Expand Up @@ -1167,7 +1167,7 @@ func _doCreateItem(
if err != nil {
return nil, httperrors.NewGeneralError(err)
}
err = manager.TableSpec().InsertOrUpdate(model)
err = manager.TableSpec().InsertOrUpdate(ctx, model)
if err != nil {
return nil, httperrors.NewGeneralError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/metadata.go
Expand Up @@ -514,7 +514,7 @@ func (manager *SMetadataManager) SetValues(ctx context.Context, obj IModel, stor
}

if len(record.Id) == 0 {
err = manager.TableSpec().InsertOrUpdate(&newRecord)
err = manager.TableSpec().InsertOrUpdate(ctx, &newRecord)
} else {
rV, rD := record.Value, record.Deleted
_, err = Update(&record, func() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/opslog.go
Expand Up @@ -399,7 +399,7 @@ func (manager *SOpsLogManager) LogEvent(model IModel, action string, notes inter
}
}

err := manager.TableSpec().Insert(&opslog)
err := manager.TableSpec().Insert(context.Background(), &opslog)
if err != nil {
log.Errorf("fail to insert opslog: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/proxy/proxysetting.go
Expand Up @@ -228,7 +228,7 @@ func (man *SProxySettingManager) InitializeData() error {
ps.Description = "Connect directly"
ps.IsPublic = true
ps.PublicScope = string(rbacutils.ScopeSystem)
if err := man.TableSpec().Insert(ps); err != nil {
if err := man.TableSpec().Insert(context.Background(), ps); err != nil {
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudcommon/db/quotas/models.go
Expand Up @@ -155,7 +155,7 @@ func (manager *SQuotaBaseManager) getQuotasInternal(ctx context.Context, keys IQ
}

func (manager *SQuotaBaseManager) setQuotaInternal(ctx context.Context, userCred mcclient.TokenCredential, quota IQuota) error {
err := manager.TableSpec().InsertOrUpdate(quota)
err := manager.TableSpec().InsertOrUpdate(ctx, quota)
if err != nil {
return errors.Wrap(err, "InsertOrUpdate")
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (manager *SQuotaBaseManager) InitializeData() error {
if quota.IsEmpty() {
quota.FetchSystemQuota()
}
err = manager.TableSpec().Insert(quota)
err = manager.TableSpec().Insert(context.Background(), quota)
if err != nil {
log.Errorf("%s insert error %s", manager.KeywordPlural(), err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/sharedresource.go
Expand Up @@ -205,7 +205,7 @@ func (manager *SSharedResourceManager) shareToTarget(
sharedResource.ResourceId = model.GetId()
sharedResource.TargetProjectId = targetId
sharedResource.TargetType = targetType
if insetErr := SharedResourceManager.TableSpec().Insert(sharedResource); insetErr != nil {
if insetErr := SharedResourceManager.TableSpec().Insert(ctx, sharedResource); insetErr != nil {
return nil, httperrors.NewInternalServerError("Insert shared resource failed %s", insetErr)
}
}
Expand Down
40 changes: 24 additions & 16 deletions pkg/cloudcommon/db/tablespec.go
Expand Up @@ -30,9 +30,9 @@ import (
type ITableSpec interface {
Name() string
DataType() reflect.Type
Insert(dt interface{}) error
InsertOrUpdate(dt interface{}) error
Update(dt interface{}, doUpdate func() error) (sqlchemy.UpdateDiffs, error)
Insert(ctx context.Context, dt interface{}) error
InsertOrUpdate(ctx context.Context, dt interface{}) error
Update(ctx context.Context, dt interface{}, doUpdate func() error) (sqlchemy.UpdateDiffs, error)
Instance() *sqlchemy.STable
ColumnSpec(name string) sqlchemy.IColumnSpec
PrimaryColumns() []sqlchemy.IColumnSpec
Expand Down Expand Up @@ -87,23 +87,23 @@ func (ts *sTableSpec) isMarkDeleted(dt interface{}) (bool, error) {
return obj.GetDeleted(), nil
}

func (ts *sTableSpec) Insert(dt interface{}) error {
func (ts *sTableSpec) Insert(ctx context.Context, dt interface{}) error {
if err := ts.STableSpec.Insert(dt); err != nil {
return err
}
ts.inform(dt, informer.Create)
ts.inform(ctx, dt, informer.Create)
return nil
}

func (ts *sTableSpec) InsertOrUpdate(dt interface{}) error {
func (ts *sTableSpec) InsertOrUpdate(ctx context.Context, dt interface{}) error {
if err := ts.STableSpec.InsertOrUpdate(dt); err != nil {
return err
}
ts.inform(dt, informer.Create)
ts.inform(ctx, dt, informer.Create)
return nil
}

func (ts *sTableSpec) Update(dt interface{}, doUpdate func() error) (sqlchemy.UpdateDiffs, error) {
func (ts *sTableSpec) Update(ctx context.Context, dt interface{}, doUpdate func() error) (sqlchemy.UpdateDiffs, error) {
oldObj := jsonutils.Marshal(dt)
diffs, err := ts.STableSpec.Update(dt, doUpdate)
if err != nil {
Expand All @@ -118,36 +118,44 @@ func (ts *sTableSpec) Update(dt interface{}, doUpdate func() error) (sqlchemy.Up
return nil, errors.Wrap(err, "check is mark deleted")
}
if isDeleted {
ts.inform(dt, informer.Delete)
ts.inform(ctx, dt, informer.Delete)
} else {
ts.informUpdate(dt, oldObj.(*jsonutils.JSONDict))
ts.informUpdate(ctx, dt, oldObj.(*jsonutils.JSONDict))
}
return diffs, nil
}

func (ts *sTableSpec) inform(dt interface{}, f func(ctx context.Context, obj *informer.ModelObject) error) {
func (ts *sTableSpec) inform(ctx context.Context, dt interface{}, f func(ctx context.Context, obj *informer.ModelObject) error) {
nf := func() {
obj, err := ts.newInformerModel(dt)
if err != nil {
log.Warningf("newInformerModel error: %v", err)
return
}
if err := f(context.Background(), obj); err != nil {
log.Errorf("call informer func error: %v", err)
if err := f(ctx, obj); err != nil {
if errors.Cause(err) == informer.ErrBackendNotInit {
log.V(4).Warningf("informer backend not init")
} else {
log.Errorf("call informer func error: %v", err)
}
}
}
nopanic.Run(nf)
}

func (ts *sTableSpec) informUpdate(dt interface{}, oldObj *jsonutils.JSONDict) {
func (ts *sTableSpec) informUpdate(ctx context.Context, dt interface{}, oldObj *jsonutils.JSONDict) {
nf := func() {
obj, err := ts.newInformerModel(dt)
if err != nil {
log.Warningf("newInformerModel error: %v", err)
return
}
if err := informer.Update(context.Background(), obj, oldObj); err != nil {
log.Errorf("call informer update func error: %v", err)
if err := informer.Update(ctx, obj, oldObj); err != nil {
if errors.Cause(err) == informer.ErrBackendNotInit {
log.V(4).Warningf("informer backend not init")
} else {
log.Errorf("call informer update func error: %v", err)
}
}
}
nopanic.Run(nf)
Expand Down
10 changes: 5 additions & 5 deletions pkg/cloudcommon/db/taskman/tasks.go
Expand Up @@ -263,15 +263,15 @@ func (manager *STaskManager) NewTask(
Params: data,
Stage: TASK_INIT_STAGE,
}
err := manager.TableSpec().Insert(&task)
err := manager.TableSpec().Insert(ctx, &task)
if err != nil {
log.Errorf("Task insert error %s", err)
return nil, err
}
parentTask := task.GetParentTask()
if parentTask != nil {
st := SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
err := SubTaskManager.TableSpec().Insert(&st)
err := SubTaskManager.TableSpec().Insert(ctx, &st)
if err != nil {
log.Errorf("Subtask insert error %s", err)
return nil, err
Expand Down Expand Up @@ -311,14 +311,14 @@ func (manager *STaskManager) NewParallelTask(
Params: data,
Stage: TASK_INIT_STAGE,
}
err := manager.TableSpec().Insert(&task)
err := manager.TableSpec().Insert(ctx, &task)
if err != nil {
log.Errorf("Task insert error %s", err)
return nil, err
}
for _, obj := range objs {
to := STaskObject{TaskId: task.Id, ObjId: obj.GetId()}
err := TaskObjectManager.TableSpec().Insert(&to)
err := TaskObjectManager.TableSpec().Insert(ctx, &to)
if err != nil {
log.Errorf("Taskobject insert error %s", err)
return nil, err
Expand All @@ -327,7 +327,7 @@ func (manager *STaskManager) NewParallelTask(
parentTask := task.GetParentTask()
if parentTask != nil {
st := SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
err := SubTaskManager.TableSpec().Insert(&st)
err := SubTaskManager.TableSpec().Insert(ctx, &st)
if err != nil {
log.Errorf("Subtask insert error %s", err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/tenantcache.go
Expand Up @@ -336,7 +336,7 @@ func (manager *STenantCacheManager) Save(ctx context.Context, idStr string, name
obj.Domain = domain
obj.DomainId = domainId
obj.LastCheck = now
err = manager.TableSpec().InsertOrUpdate(obj)
err = manager.TableSpec().InsertOrUpdate(ctx, obj)
if err != nil {
return nil, errors.Wrap(err, "InsertOrUpdate")
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/update.go
Expand Up @@ -23,7 +23,7 @@ import (
)

func Update(model IModel, updateFunc func() error) (sqlchemy.UpdateDiffs, error) {
return model.GetModelManager().TableSpec().Update(model, updateFunc)
return model.GetModelManager().TableSpec().Update(context.Background(), model, updateFunc)
}

func UpdateWithLock(ctx context.Context, model IModel, updateFunc func() error) (sqlchemy.UpdateDiffs, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/db/usercache.go
Expand Up @@ -172,7 +172,7 @@ func (manager *SUserCacheManager) Save(ctx context.Context, idStr string, name s
obj.Domain = domain
obj.DomainId = domainId
obj.LastCheck = time.Now().UTC()
err = manager.TableSpec().InsertOrUpdate(obj)
err = manager.TableSpec().InsertOrUpdate(ctx, obj)
if err != nil {
return nil, err
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/informer/informer.go
Expand Up @@ -55,7 +55,7 @@ func Set(be IInformerBackend) {

func GetDefaultBackend() IInformerBackend {
if defaultBackend == nil {
log.Warningf("default informer backend is not init")
log.V(10).Warningf("default informer backend is not init")
}
return defaultBackend
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudevent/models/cloudevents.go
Expand Up @@ -148,7 +148,7 @@ func (manager *SCloudeventManager) SyncCloudevent(ctx context.Context, userCred

event.CreatedAt = iEvent.GetCreatedAt()
event.SetModelManager(manager, event)
err := manager.TableSpec().Insert(event)
err := manager.TableSpec().Insert(ctx, event)
if err != nil {
log.Errorf("failed to insert event: %s for cloudprovider: %s(%s) error: %v", jsonutils.Marshal(event).PrettyString(), cloudprovider.Name, cloudprovider.Id, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudevent/models/cloudproviders.go
Expand Up @@ -206,7 +206,7 @@ func (self *SCloudprovider) SetLastSyncTimeAt(userCred mcclient.TokenCredential,

func (manager *SCloudproviderManager) newFromRegionProvider(ctx context.Context, userCred mcclient.TokenCredential, cloudprovider SCloudprovider) error {
cloudprovider.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_IDLE
return manager.TableSpec().Insert(&cloudprovider)
return manager.TableSpec().Insert(ctx, &cloudprovider)
}

func (manager *SCloudproviderManager) syncCloudeventTask(ctx context.Context, userCred mcclient.TokenCredential) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cloudnet/models/ifaces.go
Expand Up @@ -170,7 +170,7 @@ func (iface *SIface) addOrUpdatePeer(ctx context.Context, userCred mcclient.Toke
PersistentKeepalive: persistentKeepalive,
}
ifacePeer.Name = fmt.Sprintf("%s-%s", iface.Name, peerIface.Name)
err := IfacePeerManager.TableSpec().Insert(ifacePeer)
err := IfacePeerManager.TableSpec().Insert(ctx, ifacePeer)
return err
}
_, err = db.Update(ifacePeer, func() error {
Expand Down Expand Up @@ -396,7 +396,7 @@ func (man *SIfaceManager) addWireguardIface(ctx context.Context, userCred mcclie
}

iface.SetModelManager(man, iface)
err := man.TableSpec().Insert(iface)
err := man.TableSpec().Insert(ctx, iface)
if err != nil {
return nil, err
}
Expand All @@ -419,7 +419,7 @@ func (man *SIfaceManager) addIface(ctx context.Context, userCred mcclient.TokenC
Ifname: ifname,
}
iface.SetModelManager(man, iface)
err := man.TableSpec().Insert(iface)
err := man.TableSpec().Insert(ctx, iface)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudnet/models/meshnetwork_members.go
Expand Up @@ -147,6 +147,6 @@ func (man *SMeshNetworkMemberManager) addMember(ctx context.Context, userCred mc
}
member.SetModelManager(man, member)
member.Name = fmt.Sprintf("%s-%s", mn.Name, router.Name)
man.TableSpec().Insert(member)
man.TableSpec().Insert(ctx, member)
return member, nil
}
2 changes: 1 addition & 1 deletion pkg/cloudnet/models/rules.go
Expand Up @@ -380,7 +380,7 @@ func (man *SRuleManager) addRules(ctx context.Context, userCred mcclient.TokenCr
}

func (man *SRuleManager) addRule(ctx context.Context, userCred mcclient.TokenCredential, rule *SRule) error {
return man.TableSpec().Insert(rule)
return man.TableSpec().Insert(ctx, rule)
}

func (rule *SRule) firewalldRule() (*firewalld.Rule, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/buckets.go
Expand Up @@ -213,7 +213,7 @@ func (manager *SBucketManager) newFromCloudBucket(

bucket.IsEmulated = false

err = manager.TableSpec().Insert(&bucket)
err = manager.TableSpec().Insert(ctx, &bucket)
if err != nil {
return nil, errors.Wrap(err, "Insert")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/models/cachedimages.go
Expand Up @@ -220,7 +220,7 @@ func (manager *SCachedimageManager) cacheGlanceImageInfo(ctx context.Context, us
imageCache.Info = info
imageCache.LastSync = timeutils.UtcNow()

err = manager.TableSpec().Insert(&imageCache)
err = manager.TableSpec().Insert(ctx, &imageCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -490,7 +490,7 @@ func (manager *SCachedimageManager) newFromCloudImage(ctx context.Context, userC
cachedImage.ImageType = image.GetImageType()
cachedImage.ExternalId = image.GetGlobalId()

err = manager.TableSpec().Insert(&cachedImage)
err = manager.TableSpec().Insert(ctx, &cachedImage)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/models/cloudaccounts.go
Expand Up @@ -906,7 +906,7 @@ func (self *SCloudaccount) importSubAccount(ctx context.Context, userCred mcclie

newCloudprovider.SetModelManager(CloudproviderManager, &newCloudprovider)

err = CloudproviderManager.TableSpec().Insert(&newCloudprovider)
err = CloudproviderManager.TableSpec().Insert(ctx, &newCloudprovider)
if err != nil {
return nil, err
} else {
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func migrateCloudprovider(cloudprovider *SCloudprovider) error {
account.Name = providerName
account.Status = cloudprovider.Status

err := CloudaccountManager.TableSpec().Insert(&account)
err := CloudaccountManager.TableSpec().Insert(context.Background(), &account)
if err != nil {
log.Errorf("Insert Account error: %v", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/cloudprovider_quotas.go
Expand Up @@ -283,7 +283,7 @@ func (manager *SCloudproviderQuotaManager) newFromCloudQuota(ctx context.Context
quota.CloudregionId = region.Id
}

return manager.TableSpec().Insert(&quota)
return manager.TableSpec().Insert(ctx, &quota)
}

func (manager *SCloudproviderQuotaManager) ListItemExportKeys(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/cloudprovidercapacities.go
Expand Up @@ -73,7 +73,7 @@ func (manager *SCloudproviderCapabilityManager) setRegionCapabilities(ctx contex

for _, capability := range added {
cpc.Capability = capability
err := manager.TableSpec().InsertOrUpdate(&cpc)
err := manager.TableSpec().InsertOrUpdate(ctx, &cpc)
if err != nil {
return errors.Wrap(err, "manager.TableSpec().InsertOrUpdate")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/cloudproviderregions.go
Expand Up @@ -253,7 +253,7 @@ func (manager *SCloudproviderregionManager) FetchByIdsOrCreate(providerId string
cpr.Enabled = true
cpr.SyncStatus = api.CLOUD_PROVIDER_SYNC_STATUS_IDLE

err := manager.TableSpec().Insert(cpr)
err := manager.TableSpec().Insert(context.Background(), cpr)
if err != nil {
log.Errorf("insert fail %s", err)
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/cloudproviders.go
Expand Up @@ -1053,7 +1053,7 @@ func (manager *SCloudproviderManager) migrateVCenterInfo(vc *SVCenter) error {
cp.LastSync = vc.LastSync
cp.Provider = api.CLOUD_PROVIDER_VMWARE

return manager.TableSpec().Insert(&cp)
return manager.TableSpec().Insert(context.TODO(), &cp)
}

// 云订阅列表
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/models/cloudregions.go
Expand Up @@ -483,7 +483,7 @@ func (manager *SCloudregionManager) newFromCloudRegion(ctx context.Context, user
region.ManagerId = provider.Id
}

err = manager.TableSpec().Insert(&region)
err = manager.TableSpec().Insert(ctx, &region)
if err != nil {
log.Errorf("newFromCloudRegion fail %s", err)
return nil, err
Expand Down Expand Up @@ -549,7 +549,7 @@ func (manager *SCloudregionManager) InitializeData() error {
defRegion.Description = "Default Region"
defRegion.Status = api.CLOUD_REGION_STATUS_INSERVER
defRegion.Provider = api.CLOUD_PROVIDER_ONECLOUD
err := manager.TableSpec().Insert(&defRegion)
err := manager.TableSpec().Insert(context.TODO(), &defRegion)
if err != nil {
return errors.Wrap(err, "insert default region")
}
Expand Down

0 comments on commit 9f4d4a4

Please sign in to comment.