Skip to content

Commit

Permalink
fix/autoscaling: fix some problem
Browse files Browse the repository at this point in the history
1. ScalingGroupGuestManger.Query 默认添加 NotEqual(...)
2. ScalingGroup.Scale 扩大锁的粒度,更新冷却时间也挪到这里面
3. climc 中增加 scaling-group-show
  • Loading branch information
rainzm committed Apr 26, 2020
1 parent 74eef22 commit 4d8e0ba
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 21 deletions.
14 changes: 14 additions & 0 deletions cmd/climc/shell/scaling_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ func init() {
printList(result, modules.ScalingGroup.GetColumns(s))
return nil
})

type ScalingGroupShowOptions struct {
ID string
}
R(&ScalingGroupShowOptions{}, "scaling-group-show", "Show scaling group", func(s *mcclient.ClientSession,
args *ScalingGroupShowOptions) error {
result, err := modules.ScalingGroup.Get(s, args.ID, nil)
if err != nil {
return err
}
printObject(result)
return nil
})

type ScalingGroupCreateOptions struct {
NAME string

Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/models/guests.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (manager *SGuestManager) ListItemFilter(
q = q.In("id", diskQ.SubQuery())
}

scalingGroupQ := ScalingGroupGuestManager.Query("guest_id").NotEquals("guest_status", api.SG_GUEST_STATUS_PENDING_REMOVE).Snapshot()
scalingGroupQ := ScalingGroupGuestManager.Query("guest_id").Snapshot()
scalingGroupQ, err = manager.SScalingGroupResourceBaseManager.ListItemFilter(ctx, scalingGroupQ, userCred, query.ScalingGroupFilterListInput)
if err != nil {
return nil, errors.Wrap(err, "SScaligGroupResourceBaseManager.ListItemFilter")
Expand Down
25 changes: 15 additions & 10 deletions pkg/compute/models/scaling_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (sgm *SScalingGroupManager) FetchCustomizeColumns(

func (sg *SScalingGroup) GuestNumber() (int, error) {
q := GuestManager.Query().In("id", ScalingGroupGuestManager.Query("guest_id").Equals("scaling_group_id",
sg.Id).NotEquals("guest_status", api.SG_GUEST_STATUS_PENDING_REMOVE).SubQuery()).IsFalse("pending_deleted")
sg.Id).SubQuery()).IsFalse("pending_deleted")
return q.CountWithError()
}

Expand Down Expand Up @@ -440,8 +440,6 @@ type sExecResult struct {
func (sg *SScalingGroup) exec(ctx context.Context, action IScalingAction) (ret sExecResult) {
ret.code = 3
ret.intanceNum = -1
lockman.LockObject(ctx, sg)
defer lockman.ReleaseObject(ctx, sg)
// query again to fetch the latest desire instance number of sg
model, err := ScalingGroupManager.FetchById(sg.Id)
if err != nil {
Expand Down Expand Up @@ -497,20 +495,27 @@ func (sg *SScalingGroup) exec(ctx context.Context, action IScalingAction) (ret s

// Scale will modify SScalingGroup.DesireInstanceNumber and generate SScalingActivity based on the trigger and its
// corresponding SScalingPolicy.
func (sg *SScalingGroup) Scale(ctx context.Context, triggerDesc IScalingTriggerDesc, action IScalingAction) (bool, error) {
func (sg *SScalingGroup) Scale(ctx context.Context, triggerDesc IScalingTriggerDesc, action IScalingAction,
coolingTime int) error {
lockman.LockObject(ctx, sg)
defer lockman.ReleaseObject(ctx, sg)
isExec := false
defer func() {
if isExec && coolingTime > 0 {
sg.SetAllowScaleTime(time.Now().Add(time.Duration(coolingTime) * time.Second))
}
}()
if sg.Enabled.IsFalse() {
return isExec, nil
return nil
}
scalingActivity, err := ScalingActivityManager.CreateScalingActivity(sg.Id, triggerDesc.TriggerDescription(), api.SA_STATUS_EXEC)
if err != nil {
return isExec, errors.Wrapf(err, "create ScalingActivity whose ScalingGroup is %s error", sg.Id)
return errors.Wrapf(err, "create ScalingActivity whose ScalingGroup is %s error", sg.Id)
}
if action.CheckCoolTime() && !sg.AllowScale() {
err = scalingActivity.SetReject("",
fmt.Sprintf("The Cooling Time limit the execution time of the policy to at least: %s",
sg.AllowScaleTime))
return isExec, nil
fmt.Sprintf("The Cooling Time limit the execution time of the policy to at least: %s", sg.AllowScaleTime.Format("2006-01-02 15:04:05")))
return nil
}

ret := sg.exec(ctx, action)
Expand All @@ -530,7 +535,7 @@ func (sg *SScalingGroup) Scale(ctx context.Context, triggerDesc IScalingTriggerD
if err != nil {
log.Errorf("ScalingActivity set result failed: %s", err.Error())
}
return isExec, nil
return nil
}

func (sgm *SScalingGroupManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
Expand Down
5 changes: 1 addition & 4 deletions pkg/compute/models/scaling_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,10 @@ func (sp *SScalingPolicy) PerformTrigger(ctx context.Context, userCred mcclient.
}
triggerDesc = trigger
}
isExec, err := sg.Scale(ctx, triggerDesc, sp)
err = sg.Scale(ctx, triggerDesc, sp, sp.CoolingTime)
if err != nil {
return nil, errors.Wrap(err, "ScalingPolicy.Scale")
}
if isExec && sp.CoolingTime > 0 {
sg.SetAllowScaleTime(time.Now().Add(time.Duration(sp.CoolingTime) * time.Second))
}
return nil, err
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/compute/models/scalinggroup_guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"yunion.io/x/pkg/tristate"
"yunion.io/x/sqlchemy"

"yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (sgg *SScalingGroupGuest) Detach(ctx context.Context, userCred mcclient.Tok
func (sggm *SScalingGroupGuestManager) Fetch(scalingGroupId, guestId string) ([]SScalingGroupGuest, error) {

sggs := make([]SScalingGroupGuest, 0)
q := sggm.Query().NotEquals("guest_status", compute.SG_GUEST_STATUS_PENDING_REMOVE)
q := sggm.Query()
if len(scalingGroupId) != 0 {
q = q.Equals("scaling_group_id", scalingGroupId)
}
Expand All @@ -100,3 +101,8 @@ func (sgg *SScalingGroupGuest) SetGuestStatus(status string) error {
})
return err
}

func (sggm *SScalingGroupGuestManager) Query(fields ...string) *sqlchemy.SQuery {
return sggm.SVirtualJointResourceBaseManager.Query(fields...).NotEquals("guest_status",
compute.SG_GUEST_STATUS_PENDING_REMOVE)
}
2 changes: 1 addition & 1 deletion pkg/compute/tasks/guest_detach_scalinggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (self *GuestDetachScalingGroupTask) OnDeleteGuestComplete(ctx context.Conte
logclient.AddActionLogWithStartable(self, sg, logclient.ACT_REMOVE_GUEST, fmt.Sprintf("Instance '%s' was removed", guestId), self.UserCred, true)
if auto, _ := self.Params.Bool("auto"); !auto {
// scale; change the desire number
_, err := sg.Scale(ctx, SScalingTriggerDesc{guestName}, SScalingActionDesc{})
err := sg.Scale(ctx, SScalingTriggerDesc{guestName}, SScalingActionDesc{}, 0)
if err != nil {
log.Errorf("ScalingGroup '%s' scale after removing instance '%s' failed: %s", sg.GetId(), guestId, err.Error())
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/controller/autoscaling/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,15 @@ func (asc *SASController) CreateInstances(
instanceMap[instane.ID] = instane
}
// check all server's status
go asc.checkAllServer(session, guestIds, retChan)
var waitLimit, waitinterval time.Duration
if sg.Hypervisor == compute.HYPERVISOR_KVM {
waitLimit = 5 * time.Minute
waitinterval = 3 * time.Second
} else {
waitLimit = 10 * time.Minute
waitinterval = 10 * time.Second
}
go asc.checkAllServer(session, guestIds, retChan, waitLimit, waitinterval)

// fourth stage: bind lb and db
failRecord := &SFailRecord{
Expand Down Expand Up @@ -558,10 +566,11 @@ type SCreateRet struct {
Status string
}

func (asc *SASController) checkAllServer(session *mcclient.ClientSession, guestIds []string, retChan chan SCreateRet) {
func (asc *SASController) checkAllServer(session *mcclient.ClientSession, guestIds []string, retChan chan SCreateRet,
waitLimit, waitInterval time.Duration) {
guestIDSet := sets.NewString(guestIds...)
ticker := time.NewTicker(3 * time.Second)
timer := time.NewTimer(5 * time.Minute)
timer := time.NewTimer(waitLimit)
ticker := time.NewTicker(waitInterval)
defer func() {
close(retChan)
ticker.Stop()
Expand Down

0 comments on commit 4d8e0ba

Please sign in to comment.