Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdms: optimize upgrade pdms to avoid unnecessary primary transfer #2414

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,19 @@ func Upgrade(
if instance.IgnoreMonitorAgent() {
noAgentHosts.Insert(instance.GetManageHost())
}

// Usage within the switch statement
switch component.Name() {
case spec.ComponentPD:
// defer PD leader to be upgraded after others
isLeader, err := instance.(*spec.PDInstance).IsLeader(ctx, topo, int(options.APITimeout), tlsCfg)
case spec.ComponentPD, spec.ComponentTSO, spec.ComponentScheduling:
// defer PD related leader/primary to be upgraded after others
isLeader, err := checkAndDeferPDLeader(ctx, topo, int(options.APITimeout), tlsCfg, instance)
if err != nil {
logger.Warnf("cannot found pd leader, ignore: %s", err)
logger.Warnf("cannot found pd related leader/primary, ignore: %s, instance: %s", err, instance.ID())
return err
}
if isLeader {
deferInstances = append(deferInstances, instance)
logger.Debugf("Deferred upgrading of PD leader %s", instance.ID())
logger.Debugf("Upgrading deferred instance %s...", instance.ID())
continue
}
case spec.ComponentCDC:
Expand Down Expand Up @@ -218,6 +220,22 @@ func Upgrade(
return RestartMonitored(ctx, uniqueHosts.Slice(), noAgentHosts, topo.GetMonitoredOptions(), options.OptTimeout, systemdMode)
}

// checkAndDeferPDLeader reports whether the given PD-related instance currently
// holds the leader/primary role, so that the caller can decide to upgrade it
// after the other instances.
//
// The check is dispatched on instance.ComponentName():
//   - spec.ComponentPD:         (*spec.PDInstance).IsLeader, using apiTimeout
//   - spec.ComponentScheduling: (*spec.SchedulingInstance).IsPrimary
//   - spec.ComponentTSO:        (*spec.TSOInstance).IsPrimary
//
// Any other component yields (false, nil). When the underlying check fails,
// the error is returned and isLeader is always false. The type assertions are
// unchecked, so an instance whose concrete type does not match its component
// name makes this function panic.
func checkAndDeferPDLeader(ctx context.Context, topo spec.Topology, apiTimeout int, tlsCfg *tls.Config, instance spec.Instance) (isLeader bool, err error) {
	var (
		holdsRole bool
		checkErr  error
	)

	switch name := instance.ComponentName(); name {
	case spec.ComponentPD:
		pd := instance.(*spec.PDInstance)
		holdsRole, checkErr = pd.IsLeader(ctx, topo, apiTimeout, tlsCfg)
	case spec.ComponentScheduling:
		scheduling := instance.(*spec.SchedulingInstance)
		holdsRole, checkErr = scheduling.IsPrimary(ctx, topo, tlsCfg)
	case spec.ComponentTSO:
		tso := instance.(*spec.TSOInstance)
		holdsRole, checkErr = tso.IsPrimary(ctx, topo, tlsCfg)
	}

	// Never report a leader/primary alongside an error.
	if checkErr == nil {
		return holdsRole, nil
	}
	return false, checkErr
}

func upgradeInstance(
ctx context.Context,
topo spec.Topology,
Expand Down
26 changes: 25 additions & 1 deletion pkg/cluster/spec/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/pingcap/tiup/pkg/utils"
)

// schedulingService is the service name handed to the PD client's
// GetServicePrimary when looking up the primary of the scheduling service.
// It is a fixed identifier, so it is declared as a constant rather than as
// mutable package-level state.
const schedulingService = "scheduling"

// SchedulingSpec represents the scheduling topology specification in topology.yaml
type SchedulingSpec struct {
Host string `yaml:"host"`
Expand Down Expand Up @@ -66,7 +69,7 @@ func (s *SchedulingSpec) Status(ctx context.Context, timeout time.Duration, tlsC
return "Down"
}

primary, err := pc.GetServicePrimary("scheduling")
primary, err := pc.GetServicePrimary(schedulingService)
if err != nil {
return "ERR"
}
Expand Down Expand Up @@ -309,6 +312,27 @@ func (i *SchedulingInstance) setTLSConfig(ctx context.Context, enableTLS bool, c
return configs, nil
}

// IsPrimary reports whether this scheduling instance is the current primary
// of the scheduling service.
//
// It builds a PD client from the topology's PD list (manage-host variant),
// asks it for the scheduling service primary, and compares the answer with
// this instance's advertise listen URL. The URL is rendered with TLS enabled
// exactly when tlsCfg is non-nil.
//
// It panics if topo is not a *Specification. A failure to fetch the primary
// is returned as an annotated error together with false.
func (i *SchedulingInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) {
	clusterSpec, isSpec := topo.(*Specification)
	if !isSpec {
		panic("topo should be type of tidb topology")
	}

	client := api.NewPDClient(ctx, clusterSpec.GetPDListWithManageHost(), 5*time.Second, tlsCfg)
	primaryAddr, err := client.GetServicePrimary(schedulingService)
	if err != nil {
		return false, errors.Annotatef(err, "failed to get Scheduling primary %s", i.GetHost())
	}

	// A supplied TLS config is the only signal used to pick the URL scheme.
	selfAddr := i.InstanceSpec.(*SchedulingSpec).GetAdvertiseListenURL(tlsCfg != nil)
	return primaryAddr == selfAddr, nil
}

// ScaleConfig deploy temporary config on scaling
func (i *SchedulingInstance) ScaleConfig(
ctx context.Context,
Expand Down
26 changes: 25 additions & 1 deletion pkg/cluster/spec/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/pingcap/tiup/pkg/utils"
)

// tsoService is the service name handed to the PD client's GetServicePrimary
// when looking up the primary of the TSO service. It is a fixed identifier,
// so it is declared as a constant rather than as mutable package-level state.
const tsoService = "tso"

// TSOSpec represents the TSO topology specification in topology.yaml
type TSOSpec struct {
Host string `yaml:"host"`
Expand Down Expand Up @@ -66,7 +69,7 @@ func (s *TSOSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls
return "Down"
}

primary, err := pc.GetServicePrimary("tso")
primary, err := pc.GetServicePrimary(tsoService)
if err != nil {
return "ERR"
}
Expand Down Expand Up @@ -309,6 +312,27 @@ func (i *TSOInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs
return configs, nil
}

// IsPrimary reports whether this TSO instance is the current primary of the
// TSO service.
//
// It builds a PD client from the topology's PD list (manage-host variant),
// asks it for the TSO service primary, and compares the answer with this
// instance's advertise listen URL. The URL is rendered with TLS enabled
// exactly when tlsCfg is non-nil.
//
// It panics if topo is not a *Specification. A failure to fetch the primary
// is returned as an annotated error together with false.
func (i *TSOInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) {
	clusterSpec, isSpec := topo.(*Specification)
	if !isSpec {
		panic("topo should be type of tidb topology")
	}

	client := api.NewPDClient(ctx, clusterSpec.GetPDListWithManageHost(), 5*time.Second, tlsCfg)
	primaryAddr, err := client.GetServicePrimary(tsoService)
	if err != nil {
		return false, errors.Annotatef(err, "failed to get TSO primary %s", i.GetHost())
	}

	// A supplied TLS config is the only signal used to pick the URL scheme.
	selfAddr := i.InstanceSpec.(*TSOSpec).GetAdvertiseListenURL(tlsCfg != nil)
	return primaryAddr == selfAddr, nil
}

// ScaleConfig deploy temporary config on scaling
func (i *TSOInstance) ScaleConfig(
ctx context.Context,
Expand Down
Loading