Skip to content

Commit

Permalink
mcs, tso: add API interface to obtain the TSO keyspace group member i…
Browse files Browse the repository at this point in the history
…nfo (tikv#6373)

ref tikv#6232

Add API interface to obtain the TSO keyspace group member info.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent e949c32 commit 2f00984
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 18 deletions.
70 changes: 55 additions & 15 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ import (
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
"go.uber.org/zap"
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/tso/api/v1"
const (
// APIPathPrefix is the prefix of the API path.
APIPathPrefix = "/tso/api/v1"
)

var (
once sync.Once
Expand All @@ -46,14 +52,14 @@ var (
func init() {
tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) {
s := NewService(srv)
return s.handler(), apiServiceGroup
return s.apiHandlerEngine, apiServiceGroup
}
}

// Service is the tso service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup
root *gin.RouterGroup

srv *tsoserver.Service
rd *render.Render
Expand All @@ -77,30 +83,64 @@ func NewService(srv *tsoserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set("service", srv)
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
endpoint := apiHandlerEngine.Group(APIPathPrefix)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
srv: srv,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
root: root,
rd: createIndentRender(),
}
s.RegisterRouter()
s.RegisterAdminRouter()
s.RegisterKeyspaceGroupRouter()
return s
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd)
s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
router.POST("/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
}

func (s *Service) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.apiHandlerEngine.ServeHTTP(w, r)
})
// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
func (s *Service) RegisterKeyspaceGroupRouter() {
router := s.root.Group("keyspace-groups")
router.GET("/members", GetKeyspaceGroupMembers)
}

// KeyspaceGroupMember contains the keyspace group and its member information.
type KeyspaceGroupMember struct {
Group *endpoint.KeyspaceGroup
Member *tsopb.Participant
IsPrimary bool `json:"is_primary"`
PrimaryID uint64 `json:"primary_id"`
}

// GetKeyspaceGroupMembers gets the keyspace group members that the TSO service is serving.
func GetKeyspaceGroupMembers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
kgm := svr.GetKeyspaceGroupManager()
keyspaceGroups := kgm.GetKeyspaceGroups()
members := make(map[uint32]*KeyspaceGroupMember, len(keyspaceGroups))
for id, group := range keyspaceGroups {
am, err := kgm.GetAllocatorManager(id)
if err != nil {
log.Error("failed to get allocator manager",
zap.Uint32("keyspace-group-id", id), zap.Error(err))
continue
}
member := am.GetMember()
members[id] = &KeyspaceGroupMember{
Group: group,
Member: member.GetMember().(*tsopb.Participant),
IsPrimary: member.IsLeader(),
PrimaryID: member.GetLeaderID(),
}
}
c.IndentedJSON(http.StatusOK, members)
}
5 changes: 5 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (s *Server) IsClosed() bool {
return atomic.LoadInt64(&s.isRunning) == 0
}

// GetKeyspaceGroupManager returns the manager of keyspace group.
func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager {
return s.keyspaceGroupManager
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
Expand Down
14 changes: 14 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
return am.GetMember(), nil
}

// GetKeyspaceGroups returns all keyspace groups managed by the current keyspace group manager.
func (kgm *KeyspaceGroupManager) GetKeyspaceGroups() map[uint32]*endpoint.KeyspaceGroup {
kgm.RLock()
defer kgm.RUnlock()
keyspaceGroups := make(map[uint32]*endpoint.KeyspaceGroup)
for _, keyspaceGroupID := range kgm.keyspaceLookupTable {
if _, ok := keyspaceGroups[keyspaceGroupID]; ok {
continue
}
keyspaceGroups[keyspaceGroupID] = kgm.kgs[keyspaceGroupID]
}
return keyspaceGroups
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group.
func (kgm *KeyspaceGroupManager) HandleTSORequest(
keyspaceID, keyspaceGroupID uint32,
Expand Down
9 changes: 6 additions & 3 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"go.uber.org/zap"
)

// HTTP headers.
const (
// ServiceAllowDirectHandle is the header key to allow direct handle.
ServiceAllowDirectHandle = "service-allow-direct-handle"
ServiceRedirectorHeader = "service-redirector"
// ServiceRedirectorHeader is the header key to indicate the request is redirected.
ServiceRedirectorHeader = "service-redirector"
// ServiceContextKey is the key to get service server from gin.Context.
ServiceContextKey = "service"
)

// ServiceRedirector is a middleware to redirect the request to the right place.
func ServiceRedirector() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet("service").(bs.Server)
svr := c.MustGet(ServiceContextKey).(bs.Server)
allowDirectHandle := len(c.Request.Header.Get(ServiceAllowDirectHandle)) > 0
isServing := svr.IsServing()
if allowDirectHandle || isServing {
Expand Down
106 changes: 106 additions & 0 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"encoding/json"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

const (
tsoKeyspaceGroupsPrefix = "/tso/api/v1/keyspace-groups"
)

// dialClient used to dial http request.
var dialClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}

type tsoAPITestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
}

func TestTSOAPI(t *testing.T) {
suite.Run(t, new(tsoAPITestSuite))
}

func (suite *tsoAPITestSuite) SetupTest() {
re := suite.Require()

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.pdCluster, err = tests.NewTestAPICluster(suite.ctx, 1)
re.NoError(err)
err = suite.pdCluster.RunInitialServers()
re.NoError(err)
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
re.NoError(err)
}

func (suite *tsoAPITestSuite) TearDownTest() {
suite.cancel()
suite.tsoCluster.Destroy()
suite.pdCluster.Destroy()
}

func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() {
re := suite.Require()

primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
members := mustGetKeyspaceGroupMembers(re, primary)
re.Len(members, 1)
defaultGroupMember := members[mcsutils.DefaultKeyspaceGroupID]
re.NotNil(defaultGroupMember)
re.Equal(mcsutils.DefaultKeyspaceGroupID, defaultGroupMember.Group.ID)
re.True(defaultGroupMember.IsPrimary)
primaryMember, err := primary.GetMember(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID)
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil)
re.NoError(err)
httpResp, err := dialClient.Do(httpReq)
re.NoError(err)
defer httpResp.Body.Close()
data, err := io.ReadAll(httpResp.Body)
re.NoError(err)
re.Equal(http.StatusOK, httpResp.StatusCode, string(data))
var resp map[uint32]*apis.KeyspaceGroupMember
re.NoError(json.Unmarshal(data, &resp))
return resp
}
1 change: 1 addition & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
// Ignore the errors caused by the split and context cancellation.
if strings.Contains(errMsg, "context canceled") ||
strings.Contains(errMsg, "not leader") ||
strings.Contains(errMsg, "not served") ||
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") {
continue
}
Expand Down

0 comments on commit 2f00984

Please sign in to comment.