Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdms: support primary/transfer api for scheduling and tso #8148

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (ls *Leadership) getLease() *lease {
return l.(*lease)
}

func (ls *Leadership) setLease(lease *lease) {
func (ls *Leadership) SetLease(lease *lease) {
ls.lease.Store(lease)
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
client: ls.client,
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)
ls.SetLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
var member pdpb.Member
Expand Down
8 changes: 8 additions & 0 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@
expireTime atomic.Value
}

func NewLease(client *clientv3.Client, purpose string) *lease {
return &lease{
Purpose: purpose,
client: client,
lease: clientv3.NewLease(client),

Check warning on line 55 in pkg/election/lease.go

View check run for this annotation

Codecov / codecov/patch

pkg/election/lease.go#L51-L55

Added lines #L51 - L55 were not covered by tests
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *lease) Grant(leaseTimeout int64) error {
if l == nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
}

// GetMSMembers returns all the members of the specified service name.
func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch name {
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
Expand All @@ -75,5 +75,5 @@
return entries, nil
}

return nil, errors.Errorf("unknown service name %s", name)
return nil, errors.Errorf("unknown service name %s", serviceName)

Check warning on line 78 in pkg/mcs/discovery/discover.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L78

Added line #L78 was not covered by tests
}
54 changes: 53 additions & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
Expand All @@ -39,11 +40,14 @@
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/unrolled/render"
"go.etcd.io/etcd/clientv3"
)

// APIPathPrefix is the prefix of the API path.
Expand Down Expand Up @@ -112,6 +116,7 @@
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterMemberRouter()
s.RegisterConfigRouter()
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
Expand All @@ -130,7 +135,13 @@
router.DELETE("cache/regions/:id", deleteRegionCacheByID)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
// RegisterMemberRouter registers the router of the member handler.
func (s *Service) RegisterMemberRouter() {
router := s.root.Group("member")
router.POST("/primary/transfer", transferPrimary)
}

// RegisterSchedulersRouter registers the router of the schedulers' handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
Expand Down Expand Up @@ -259,6 +270,47 @@
c.IndentedJSON(http.StatusOK, cfg)
}

func transferPrimary(c *gin.Context) {
if len(c.Request.Header.Get(multiservicesapi.ServiceAllowDirectHandle)) == 0 {
c.AbortWithStatusJSON(http.StatusInternalServerError, "please add `service-allow-direct-handle` in header")
return

Check warning on line 276 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L273-L276

Added lines #L273 - L276 were not covered by tests
}

svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
if svr.IsServing() {
c.AbortWithStatusJSON(http.StatusInternalServerError, "now is primary")
return

Check warning on line 282 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L279-L282

Added lines #L279 - L282 were not covered by tests
}

newLease := election.NewLease(svr.GetClient(), "primary election")
if err := newLease.Grant(mcsutils.DefaultLeaderLease); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "newLease grant error")

Check warning on line 287 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L285-L287

Added lines #L285 - L287 were not covered by tests
}

// delete previous primary firstly
primaryKey := endpoint.SchedulingPrimaryPath(svr.GetClusterID())
deleteResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpDelete(primaryKey),
).Commit()
if err != nil || !deleteResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "delete resp error")

Check warning on line 297 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L291-L297

Added lines #L291 - L297 were not covered by tests
}

memberValue := svr.GetParticipant()
memberValue.GetLeadership().SetLease(newLease)
putResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpPut(primaryKey, memberValue.MemberValue(), clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID))),
).
Commit()
if err != nil || !putResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "put resp error")

Check warning on line 308 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L300-L308

Added lines #L300 - L308 were not covered by tests
}

c.IndentedJSON(http.StatusOK, "transfer submitted!")

Check warning on line 311 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L311

Added line #L311 was not covered by tests
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
54 changes: 46 additions & 8 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@
return s.cfg.BackendEndpoints
}

func (s *Server) GetClusterID() uint64 {
return s.clusterID

Check warning on line 132 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L131-L132

Added lines #L131 - L132 were not covered by tests
}

func (s *Server) GetParticipant() *member.Participant {
return s.participant

Check warning on line 136 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L135-L136

Added lines #L135 - L136 were not covered by tests
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
Expand Down Expand Up @@ -249,18 +257,35 @@

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
} else {
log.Error("campaign scheduling primary meets error due to etcd error",
zap.String("campaign-scheduling-primary-name", s.participant.Name()),
errs.ZapError(err))
leader, _, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("getting the leader meets error", errs.ZapError(err))
return

Check warning on line 263 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L262-L263

Added lines #L262 - L263 were not covered by tests
}
if leader != nil && !s.participant.IsSameLeader(leader) {
leader, ok := leader.(*schedulingpb.Participant)
if !ok {
log.Error("failed to get the leader", zap.Any("leader", leader))
return

Check warning on line 269 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L266-L269

Added lines #L266 - L269 were not covered by tests
}
log.Info("the scheduling primary/leader is already elected", zap.Stringer("scheduling-primary", leader))

Check warning on line 271 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L271

Added line #L271 was not covered by tests
return
}

if leader == nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
} else {
log.Error("campaign scheduling primary meets error due to etcd error",
zap.String("campaign-scheduling-primary-name", s.participant.Name()),
errs.ZapError(err))

Check warning on line 283 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L281-L283

Added lines #L281 - L283 were not covered by tests
}
return
}
}

// Start keepalive the leadership and enable Scheduling service.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
var resetLeaderOnce sync.Once
Expand Down Expand Up @@ -290,6 +315,19 @@
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

go func() {
log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader()))
_, revision, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return

Check warning on line 323 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L322-L323

Added lines #L322 - L323 were not covered by tests
}
// Watch will keep looping and never return unless the primary/leader has changed.
s.participant.GetLeadership().Watch(s.serverLoopCtx, revision)
s.participant.UnsetLeader()
log.Info("[primary] the scheduling primary has changed, try to re-campaign a primary")
}()

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
defer leaderTicker.Stop()

Expand Down
55 changes: 55 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/unrolled/render"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -94,6 +97,7 @@
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterMemberRouter()
s.RegisterKeyspaceGroupRouter()
s.RegisterHealthRouter()
s.RegisterConfigRouter()
Expand All @@ -107,6 +111,12 @@
router.PUT("/log", changeLogLevel)
}

// RegisterMemberRouter registers the router of the member handler.
func (s *Service) RegisterMemberRouter() {
router := s.root.Group("member")
router.POST("/primary/transfer", transferPrimary)
}

// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
func (s *Service) RegisterKeyspaceGroupRouter() {
router := s.root.Group("keyspace-groups")
Expand Down Expand Up @@ -141,6 +151,51 @@
c.String(http.StatusOK, "The log level is updated.")
}

func transferPrimary(c *gin.Context) {
if len(c.Request.Header.Get(multiservicesapi.ServiceAllowDirectHandle)) == 0 {
c.AbortWithStatusJSON(http.StatusInternalServerError, "please add `service-allow-direct-handle` in header")
return

Check warning on line 157 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L154-L157

Added lines #L154 - L157 were not covered by tests
}

svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
if svr.IsServing() {
c.AbortWithStatusJSON(http.StatusInternalServerError, "now is primary")
return

Check warning on line 163 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L160-L163

Added lines #L160 - L163 were not covered by tests
}

newLease := election.NewLease(svr.GetClient(), "transfer-primary")
if err := newLease.Grant(utils.DefaultLeaderLease); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "newLease grant error")

Check warning on line 168 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L166-L168

Added lines #L166 - L168 were not covered by tests
}

// delete previous primary firstly
tsoRootPath := endpoint.TSOSvcRootPath(svr.GetClusterID())
primaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID)
deleteResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpDelete(primaryKey),
).Commit()
if err != nil || !deleteResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "delete resp error")

Check warning on line 179 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L172-L179

Added lines #L172 - L179 were not covered by tests
}

memberValue, err := svr.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
memberValue.GetLeadership().SetLease(newLease)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "get tso member")

Check warning on line 185 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L182-L185

Added lines #L182 - L185 were not covered by tests
}
putResp, err := kv.NewSlowLogTxn(svr.GetClient()).
Then(
clientv3.OpPut(primaryKey, memberValue.MemberValue(), clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID))),
).
Commit()
if err != nil || !putResp.Succeeded {
c.AbortWithStatusJSON(http.StatusInternalServerError, "put resp error")

Check warning on line 193 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L187-L193

Added lines #L187 - L193 were not covered by tests
}

c.IndentedJSON(http.StatusOK, "transfer submitted!")

Check warning on line 196 in pkg/mcs/tso/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/apis/v1/api.go#L196

Added line #L196 was not covered by tests
}

// ResetTSParams is the input json body params of ResetTS
type ResetTSParams struct {
TSO string `json:"tso"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@
s.serverLoopWg.Add(n)
}

func (s *Server) GetClusterID() uint64 {
return s.clusterID

Check warning on line 125 in pkg/mcs/tso/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/server.go#L124-L125

Added lines #L124 - L125 were not covered by tests
}

// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
Expand Down
Loading