Skip to content

Commit

Permalink
fix: manager_id资源隔离获取
Browse files Browse the repository at this point in the history
  • Loading branch information
Qu Xuan committed Jul 2, 2020
1 parent 029233a commit 66fc71c
Show file tree
Hide file tree
Showing 32 changed files with 323 additions and 123 deletions.
7 changes: 7 additions & 0 deletions pkg/cloudcommon/db/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,14 @@ func SetExternalId(model IExternalizedModel, userCred mcclient.TokenCredential,
}

func FetchByExternalId(manager IModelManager, idStr string) (IExternalizedModel, error) {
return FetchByExternalIdAndManagerId(manager, idStr, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q
})
}

func FetchByExternalIdAndManagerId(manager IModelManager, idStr string, filter func(q *sqlchemy.SQuery) *sqlchemy.SQuery) (IExternalizedModel, error) {
q := manager.Query().Equals("external_id", idStr)
q = filter(q)
count, err := q.CountWithError()
if err != nil {
return nil, err
Expand Down
13 changes: 11 additions & 2 deletions pkg/compute/guestdrivers/managedvirtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/osprofile"
"yunion.io/x/pkg/utils"
"yunion.io/x/sqlchemy"

billing_api "yunion.io/x/onecloud/pkg/apis/billing"
api "yunion.io/x/onecloud/pkg/apis/compute"
Expand Down Expand Up @@ -449,7 +450,9 @@ func (self *SManagedVirtualizedGuestDriver) RemoteDeployGuestForCreate(ctx conte
}

if hostId := iVM.GetIHostId(); len(hostId) > 0 {
host, err := db.FetchByExternalId(models.HostManager, hostId)
host, err := db.FetchByExternalIdAndManagerId(models.HostManager, hostId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", host.ManagerId)
})
if err != nil {
log.Warningf("failed to found new hostId(%s) for ivm %s(%s) error: %v", hostId, guest.Name, guest.Id, err)
} else if host.GetId() != guest.HostId {
Expand Down Expand Up @@ -886,7 +889,13 @@ func (self *SManagedVirtualizedGuestDriver) OnGuestDeployTaskDataReceived(ctx co
}

if len(diskInfo[i].StorageExternalId) > 0 {
storage, err := db.FetchByExternalId(models.StorageManager, diskInfo[i].StorageExternalId)
storage, err := db.FetchByExternalIdAndManagerId(models.StorageManager, diskInfo[i].StorageExternalId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
host := guest.GetHost()
if host != nil {
return q.Equals("manager_id", host.ManagerId)
}
return q
})
if err != nil {
log.Warningf("failed to found storage by externalId %s error: %v", diskInfo[i].StorageExternalId, err)
} else if disk.StorageId != storage.GetId() {
Expand Down
15 changes: 13 additions & 2 deletions pkg/compute/guestdrivers/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/utils"
"yunion.io/x/sqlchemy"

api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
Expand Down Expand Up @@ -374,7 +375,12 @@ func (self *SOpenStackGuestDriver) RequestMigrate(ctx context.Context, guest *mo
if hostExternalId == "" {
return nil, errors.Wrap(fmt.Errorf("empty hostExternalId"), "iVM.GetIHostId()")
}
iHost, err := db.FetchByExternalId(models.HostManager, hostExternalId)
iHost, err := db.FetchByExternalIdAndManagerId(models.HostManager, hostExternalId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
if host := guest.GetHost(); host != nil {
return q.Equals("manager_id", host.ManagerId)
}
return q
})
if err != nil {
return nil, errors.Wrapf(err, "db.FetchByExternalId(models.HostManager,%s)", hostExternalId)
}
Expand Down Expand Up @@ -414,7 +420,12 @@ func (self *SOpenStackGuestDriver) RequestLiveMigrate(ctx context.Context, guest
if hostExternalId == "" {
return nil, errors.Wrap(fmt.Errorf("empty hostExternalId"), "iVM.GetIHostId()")
}
iHost, err := db.FetchByExternalId(models.HostManager, hostExternalId)
iHost, err := db.FetchByExternalIdAndManagerId(models.HostManager, hostExternalId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
if host := guest.GetHost(); host != nil {
return q.Equals("manager_id", host.ManagerId)
}
return q
})
if err != nil {
return nil, errors.Wrapf(err, "db.FetchByExternalId(models.HostManager,%s)", hostExternalId)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/cloudproviderregions.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (self *SCloudproviderregion) GetAccount() *SCloudaccount {
func (self *SCloudproviderregion) GetRegion() *SCloudregion {
regionObj, err := CloudregionManager.FetchById(self.CloudregionId)
if err != nil {
log.Errorf("CloudproviderManager.FetchById fail %s", err)
log.Errorf("CloudregionManager.FetchById(%s) fail %s", self.CloudregionId, err)
return nil
}
return regionObj.(*SCloudregion)
Expand Down
23 changes: 18 additions & 5 deletions pkg/compute/models/cloudregions.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,28 @@ func (self *SCloudregion) getGuestCountInternal(increment bool) (int, error) {
return query.CountWithError()
}

func (self *SCloudregion) GetVpcCount() (int, error) {
func (self *SCloudregion) GetVpcQuery() *sqlchemy.SQuery {
vpcs := VpcManager.Query()
if self.Id == api.DEFAULT_REGION_ID {
return vpcs.Filter(sqlchemy.OR(sqlchemy.IsNull(vpcs.Field("cloudregion_id")),
sqlchemy.IsEmpty(vpcs.Field("cloudregion_id")),
sqlchemy.Equals(vpcs.Field("cloudregion_id"), self.Id))).CountWithError()
} else {
return vpcs.Equals("cloudregion_id", self.Id).CountWithError()
sqlchemy.Equals(vpcs.Field("cloudregion_id"), self.Id)))
}
return vpcs.Equals("cloudregion_id", self.Id)
}

func (self *SCloudregion) GetVpcCount() (int, error) {
return self.GetVpcQuery().CountWithError()
}

func (self *SCloudregion) GetVpcs() ([]SVpc, error) {
vpcs := []SVpc{}
q := self.GetVpcQuery()
err := db.FetchModelObjects(VpcManager, q, &vpcs)
if err != nil {
return nil, errors.Wrap(err, "db.FetchModelObjects")
}
return vpcs, nil
}

func (self *SCloudregion) GetDriver() IRegionDriver {
Expand Down Expand Up @@ -497,7 +510,7 @@ func (self *SCloudregion) AllowPerformDefaultVpc(ctx context.Context, userCred m
}

func (self *SCloudregion) PerformDefaultVpc(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
vpcs, err := VpcManager.getVpcsByRegion(self, nil)
vpcs, err := self.GetVpcs()
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/compute/models/dbinstance_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,9 @@ func (self *SDBInstanceBackup) SyncWithCloudDBInstanceBackup(

if dbinstanceId := extBackup.GetDBInstanceId(); len(dbinstanceId) > 0 {
//有可能云上删除了实例,未删除备份
_instance, err := db.FetchByExternalId(DBInstanceManager, dbinstanceId)
_instance, err := db.FetchByExternalIdAndManagerId(DBInstanceManager, dbinstanceId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err == sql.ErrNoRows {
self.DBInstanceId = ""
}
Expand Down Expand Up @@ -493,7 +495,9 @@ func (manager *SDBInstanceBackupManager) newFromCloudDBInstanceBackup(
backup.ExternalId = extBackup.GetGlobalId()

if dbinstanceId := extBackup.GetDBInstanceId(); len(dbinstanceId) > 0 {
_dbinstance, err := db.FetchByExternalId(DBInstanceManager, dbinstanceId)
_dbinstance, err := db.FetchByExternalIdAndManagerId(DBInstanceManager, dbinstanceId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err != nil {
log.Warningf("failed to found dbinstance for backup %s by externalId: %s error: %v", backup.Name, dbinstanceId, err)
} else {
Expand Down
21 changes: 17 additions & 4 deletions pkg/compute/models/dbinstancenetworks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/compare"
"yunion.io/x/pkg/util/netutils"
"yunion.io/x/sqlchemy"

api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
Expand Down Expand Up @@ -162,9 +163,15 @@ func (manager *SDBInstanceNetworkManager) SyncDBInstanceNetwork(ctx context.Cont

func (self *SDBInstanceNetwork) syncWithCloudDBNetwork(ctx context.Context, userCred mcclient.TokenCredential, dbinstance *SDBInstance, network *cloudprovider.SDBInstanceNetwork) error {
_, err := db.UpdateWithLock(ctx, self, func() error {
_localnetwork, err := db.FetchByExternalId(NetworkManager, network.NetworkId)
_localnetwork, err := db.FetchByExternalIdAndManagerId(NetworkManager, network.NetworkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
wire := WireManager.Query().SubQuery()
vpc := VpcManager.Query().SubQuery()
return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
Filter(sqlchemy.Equals(vpc.Field("manager_id"), dbinstance.ManagerId))
})
if err != nil {
return errors.Wrapf(err, "FetchByExternalId")
return errors.Wrapf(err, "FetchByExternalIdAndManagerId")
}
localnetwork := _localnetwork.(*SNetwork)
self.NetworkId = localnetwork.Id
Expand Down Expand Up @@ -194,9 +201,15 @@ func (manager *SDBInstanceNetworkManager) newFromCloudDBNetwork(ctx context.Cont
dbNetwork.SetModelManager(manager, &dbNetwork)

dbNetwork.DBInstanceId = dbinstance.Id
_localnetwork, err := db.FetchByExternalId(NetworkManager, network.NetworkId)
_localnetwork, err := db.FetchByExternalIdAndManagerId(NetworkManager, network.NetworkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
wire := WireManager.Query().SubQuery()
vpc := VpcManager.Query().SubQuery()
return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
Filter(sqlchemy.Equals(vpc.Field("manager_id"), dbinstance.ManagerId))
})
if err != nil {
return errors.Wrapf(err, "newFromCloudDBNetwork.FetchByExternalId")
return errors.Wrapf(err, "newFromCloudDBNetwork.FetchByExternalIdAndManagerId")
}

localnetwork := _localnetwork.(*SNetwork)
Expand Down
16 changes: 12 additions & 4 deletions pkg/compute/models/dbinstances.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,12 +1248,16 @@ func (manager *SDBInstanceManager) SyncDBInstanceMasterId(ctx context.Context, u
for _, instance := range cloudDBInstances {
masterId := instance.GetMasterInstanceId()
if len(masterId) > 0 {
master, err := db.FetchByExternalId(manager, masterId)
master, err := db.FetchByExternalIdAndManagerId(manager, masterId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err != nil {
log.Errorf("failed to found master dbinstance by externalId: %s error: %v", masterId, err)
continue
}
slave, err := db.FetchByExternalId(manager, instance.GetGlobalId())
slave, err := db.FetchByExternalIdAndManagerId(manager, instance.GetGlobalId(), func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err != nil {
log.Errorf("failed to found local dbinstance by externalId %s error: %v", instance.GetGlobalId(), err)
continue
Expand Down Expand Up @@ -1507,7 +1511,9 @@ func (self *SDBInstance) SyncWithCloudDBInstance(ctx context.Context, userCred m

if len(self.VpcId) == 0 {
if vpcId := extInstance.GetIVpcId(); len(vpcId) > 0 {
vpc, err := db.FetchByExternalId(VpcManager, vpcId)
vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err != nil {
return errors.Wrapf(err, "SyncWithCloudDBInstance.FetchVpcId")
}
Expand Down Expand Up @@ -1592,7 +1598,9 @@ func (manager *SDBInstanceManager) newFromCloudDBInstance(ctx context.Context, u
}

if vpcId := extInstance.GetIVpcId(); len(vpcId) > 0 {
vpc, err := db.FetchByExternalId(VpcManager, vpcId)
vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err != nil {
return nil, errors.Wrapf(err, "newFromCloudDBInstance.FetchVpcId")
}
Expand Down
37 changes: 26 additions & 11 deletions pkg/compute/models/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,21 +1217,26 @@ func (manager *SDiskManager) getDisksByStorage(storage *SStorage) ([]SDisk, erro
return disks, nil
}

func (manager *SDiskManager) syncCloudDisk(ctx context.Context, userCred mcclient.TokenCredential, provider cloudprovider.ICloudProvider, vdisk cloudprovider.ICloudDisk, index int, syncOwnerId mcclient.IIdentityProvider) (*SDisk, error) {
func (manager *SDiskManager) syncCloudDisk(ctx context.Context, userCred mcclient.TokenCredential, provider cloudprovider.ICloudProvider, vdisk cloudprovider.ICloudDisk, index int, syncOwnerId mcclient.IIdentityProvider, managerId string) (*SDisk, error) {
// ownerProjId := projectId

lockman.LockClass(ctx, manager, db.GetLockClassKey(manager, syncOwnerId))
defer lockman.ReleaseClass(ctx, manager, db.GetLockClassKey(manager, syncOwnerId))

diskObj, err := db.FetchByExternalId(manager, vdisk.GetGlobalId())
diskObj, err := db.FetchByExternalIdAndManagerId(manager, vdisk.GetGlobalId(), func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
sq := StorageManager.Query().SubQuery()
return q.Join(sq, sqlchemy.Equals(sq.Field("id"), q.Field("storage_id"))).Filter(sqlchemy.Equals(sq.Field("manager_id"), managerId))
})
if err != nil {
if err == sql.ErrNoRows {
vstorage, err := vdisk.GetIStorage()
if err != nil {
return nil, errors.Wrapf(err, "unable to GetIStorage of vdisk %q", vdisk.GetName())
}

storageObj, err := db.FetchByExternalId(StorageManager, vstorage.GetGlobalId())
storageObj, err := db.FetchByExternalIdAndManagerId(StorageManager, vstorage.GetGlobalId(), func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", managerId)
})
if err != nil {
log.Errorf("cannot find storage of vdisk %s", err)
return nil, err
Expand All @@ -1243,7 +1248,7 @@ func (manager *SDiskManager) syncCloudDisk(ctx context.Context, userCred mcclien
}
} else {
disk := diskObj.(*SDisk)
err = disk.syncWithCloudDisk(ctx, userCred, provider, vdisk, index, syncOwnerId)
err = disk.syncWithCloudDisk(ctx, userCred, provider, vdisk, index, syncOwnerId, managerId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1288,7 +1293,7 @@ func (manager *SDiskManager) SyncDisks(ctx context.Context, userCred mcclient.To
}

for i := 0; i < len(commondb); i += 1 {
err = commondb[i].syncWithCloudDisk(ctx, userCred, provider, commonext[i], -1, syncOwnerId)
err = commondb[i].syncWithCloudDisk(ctx, userCred, provider, commonext[i], -1, syncOwnerId, storage.ManagerId)
if err != nil {
syncResult.UpdateError(err)
} else {
Expand All @@ -1301,7 +1306,10 @@ func (manager *SDiskManager) SyncDisks(ctx context.Context, userCred mcclient.To

for i := 0; i < len(added); i += 1 {
extId := added[i].GetGlobalId()
_disk, err := db.FetchByExternalId(manager, extId)
_disk, err := db.FetchByExternalIdAndManagerId(manager, extId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
sq := StorageManager.Query().SubQuery()
return q.Join(sq, sqlchemy.Equals(sq.Field("id"), q.Field("storage_id"))).Filter(sqlchemy.Equals(sq.Field("manager_id"), storage.ManagerId))
})
if err != nil && err != sql.ErrNoRows {
//主要是显示duplicate err及 general err,方便排错
msg := fmt.Errorf("failed to found disk by external Id %s error: %v", extId, err)
Expand All @@ -1310,7 +1318,7 @@ func (manager *SDiskManager) SyncDisks(ctx context.Context, userCred mcclient.To
}
if _disk != nil {
disk := _disk.(*SDisk)
err = disk.syncDiskStorage(ctx, userCred, added[i])
err = disk.syncDiskStorage(ctx, userCred, added[i], storage.ManagerId)
if err != nil {
syncResult.UpdateError(err)
} else {
Expand All @@ -1332,15 +1340,17 @@ func (manager *SDiskManager) SyncDisks(ctx context.Context, userCred mcclient.To
return localDisks, remoteDisks, syncResult
}

func (self *SDisk) syncDiskStorage(ctx context.Context, userCred mcclient.TokenCredential, idisk cloudprovider.ICloudDisk) error {
func (self *SDisk) syncDiskStorage(ctx context.Context, userCred mcclient.TokenCredential, idisk cloudprovider.ICloudDisk, managerId string) error {
extId := idisk.GetGlobalId()
istorage, err := idisk.GetIStorage()
if err != nil {
log.Errorf("failed to get istorage for disk %s error: %v", extId, err)
return err
}
storageExtId := istorage.GetGlobalId()
storage, err := db.FetchByExternalId(StorageManager, storageExtId)
storage, err := db.FetchByExternalIdAndManagerId(StorageManager, storageExtId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", managerId)
})
if err != nil {
log.Errorf("failed to found storage by istorage %s error: %v", storageExtId, err)
return err
Expand Down Expand Up @@ -1392,7 +1402,12 @@ func (self *SDisk) syncRemoveCloudDisk(ctx context.Context, userCred mcclient.To
iDisk, err := iregion.GetIDiskById(self.ExternalId)
if err == nil {
if storageId := iDisk.GetIStorageId(); len(storageId) > 0 {
storage, err := db.FetchByExternalId(StorageManager, storageId)
storage, err := db.FetchByExternalIdAndManagerId(StorageManager, storageId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
if s := self.GetStorage(); s != nil {
return q.Equals("manager_id", s.ManagerId)
}
return q
})
if err == nil {
_, err = db.Update(self, func() error {
self.StorageId = storage.GetId()
Expand All @@ -1418,7 +1433,7 @@ func (self *SDisk) syncRemoveCloudDisk(ctx context.Context, userCred mcclient.To
return self.RealDelete(ctx, userCred)
}

func (self *SDisk) syncWithCloudDisk(ctx context.Context, userCred mcclient.TokenCredential, provider cloudprovider.ICloudProvider, extDisk cloudprovider.ICloudDisk, index int, syncOwnerId mcclient.IIdentityProvider) error {
func (self *SDisk) syncWithCloudDisk(ctx context.Context, userCred mcclient.TokenCredential, provider cloudprovider.ICloudProvider, extDisk cloudprovider.ICloudDisk, index int, syncOwnerId mcclient.IIdentityProvider, managerId string) error {
recycle := false
guests := self.GetGuests()
if provider.GetFactory().IsSupportPrepaidResources() && len(guests) == 1 && guests[0].IsPrepaidRecycle() {
Expand Down
12 changes: 10 additions & 2 deletions pkg/compute/models/elasticcache_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,15 +586,23 @@ func (manager *SElasticcacheManager) newFromCloudElasticcache(ctx context.Contex
}

if vpcId := extInstance.GetVpcId(); len(vpcId) > 0 {
vpc, err := db.FetchByExternalId(VpcManager, vpcId)
vpc, err := db.FetchByExternalIdAndManagerId(VpcManager, vpcId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.Equals("manager_id", provider.Id)
})
if err != nil {
return nil, errors.Wrapf(err, "newFromCloudElasticcache.FetchVpcId")
}
instance.VpcId = vpc.GetId()
}

if networkId := extInstance.GetNetworkId(); len(networkId) > 0 {
network, err := db.FetchByExternalId(NetworkManager, networkId)
network, err := db.FetchByExternalIdAndManagerId(NetworkManager, networkId, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
wire := WireManager.Query().SubQuery()
vpc := VpcManager.Query().SubQuery()
return q.Join(wire, sqlchemy.Equals(wire.Field("id"), q.Field("wire_id"))).
Join(vpc, sqlchemy.Equals(vpc.Field("id"), wire.Field("vpc_id"))).
Filter(sqlchemy.Equals(vpc.Field("manager_id"), provider.Id))
})
if err != nil {
return nil, errors.Wrapf(err, "newFromCloudElasticcache.FetchNetworkId")
}
Expand Down

0 comments on commit 66fc71c

Please sign in to comment.