Skip to content

Commit

Permalink
*: Introduce the service registry (#5779)
Browse files Browse the repository at this point in the history
ref #5766

Introduce the service registry:
- Provide a new way to initial external service
- Supports gRPC and HTTP API both

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Dec 14, 2022
1 parent 84c0f82 commit 6ada022
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 92 deletions.
4 changes: 2 additions & 2 deletions pkg/autoscaling/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const autoScalingPrefix = "/autoscaling"

var (
autoscalingServiceGroup = server.ServiceGroup{
autoscalingServiceGroup = server.APIServiceGroup{
Name: "autoscaling",
Version: "v1alpha",
IsCore: false,
Expand All @@ -36,7 +36,7 @@ var (
)

// NewHandler creates a HTTP handler for auto scaling.
func NewHandler(_ context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
func NewHandler(_ context.Context, svr *server.Server) (http.Handler, server.APIServiceGroup, error) {
autoScalingHandler := http.NewServeMux()
rd := render.New(render.Options{
IndentJSON: true,
Expand Down
8 changes: 4 additions & 4 deletions pkg/dashboard/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ import (
)

var (
apiServiceGroup = server.ServiceGroup{
apiServiceGroup = server.APIServiceGroup{
Name: "dashboard-api",
Version: "v1",
IsCore: false,
PathPrefix: config.APIPathPrefix,
}

uiServiceGroup = server.ServiceGroup{
uiServiceGroup = server.APIServiceGroup{
Name: "dashboard-ui",
Version: "v1",
IsCore: false,
Expand All @@ -68,7 +68,7 @@ func GetServiceBuilders() []server.HandlerBuilder {
// The order of execution must be sequential.
return []server.HandlerBuilder{
// Dashboard API Service
func(ctx context.Context, srv *server.Server) (http.Handler, server.ServiceGroup, error) {
func(ctx context.Context, srv *server.Server) (http.Handler, server.APIServiceGroup, error) {
distroutil.MustLoadAndReplaceStrings()

if cfg, err = adapter.GenDashboardConfig(srv); err != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func GetServiceBuilders() []server.HandlerBuilder {
return apiserver.Handler(s), apiServiceGroup, nil
},
// Dashboard UI
func(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
func(context.Context, *server.Server) (http.Handler, server.APIServiceGroup, error) {
if err != nil {
return nil, uiServiceGroup, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/dashboard/without_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
serviceGroup = server.ServiceGroup{
serviceGroup = server.APIServiceGroup{
Name: "dashboard",
Version: "v1",
IsCore: false,
Expand All @@ -43,7 +43,7 @@ func SetCheckInterval(time.Duration) {}
// GetServiceBuilders returns a empty Dashboard Builder
func GetServiceBuilders() []server.HandlerBuilder {
return []server.HandlerBuilder{
func(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
func(context.Context, *server.Server) (http.Handler, server.APIServiceGroup, error) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.WriteString(w, "Dashboard is not built.\n")
})
Expand Down
16 changes: 16 additions & 0 deletions pkg/msc/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package mcs used to implement the core logic of the external services which rely on the PD banckend provider.
package mcs
94 changes: 94 additions & 0 deletions pkg/msc/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package registry is used to register the services.
// TODO: Remove the `pd/server` dependencies
// TODO: Use the `uber/fx` to manage the lifecycle of services.
package registry

import (
"net/http"

"github.com/pingcap/log"
"github.com/tikv/pd/server"
"go.uber.org/zap"
"google.golang.org/grpc"
)

var (
// ServerServiceRegistry is the global grpc service registry.
ServerServiceRegistry = newServiceRegistry()
)

// ServiceBuilder is a function that creates a grpc service.
type ServiceBuilder func(*server.Server) RegistrableService

// RegistrableService is the interface that should wraps the RegisterService method.
type RegistrableService interface {
RegisterGRPCService(g *grpc.Server)
RegisterRESTHandler(userDefineHandlers map[string]http.Handler)
}

// ServiceRegistry is a map that stores all registered grpc services.
// It implements the `Serviceregistry` interface.
type ServiceRegistry struct {
builders map[string]ServiceBuilder
services map[string]RegistrableService
}

func newServiceRegistry() *ServiceRegistry {
return &ServiceRegistry{
builders: make(map[string]ServiceBuilder),
services: make(map[string]RegistrableService),
}
}

// InstallAllGRPCServices installs all registered grpc services.
func (r *ServiceRegistry) InstallAllGRPCServices(srv *server.Server, g *grpc.Server) {
for name, builder := range r.builders {
if l, ok := r.services[name]; ok {
l.RegisterGRPCService(g)
log.Info("gRPC service already registered", zap.String("service-name", name))
continue
}
l := builder(srv)
l.RegisterGRPCService(g)
log.Info("gRPC service register success", zap.String("service-name", name))
}
}

// InstallAllRESTHandler installs all registered REST services.
func (r *ServiceRegistry) InstallAllRESTHandler(srv *server.Server, h map[string]http.Handler) {
for name, builder := range r.builders {
if l, ok := r.services[name]; ok {
l.RegisterRESTHandler(h)
log.Info("restful API service already registered", zap.String("service-name", name))
continue
}
l := builder(srv)
l.RegisterRESTHandler(h)
log.Info("restful API service register success", zap.String("service-name", name))
}
}

// RegisterService registers a grpc service.
func (r ServiceRegistry) RegisterService(name string, service ServiceBuilder) {
r.builders[name] = service
}

func init() {
server.NewServiceregistry = func() server.Serviceregistry {
return ServerServiceRegistry
}
}
4 changes: 2 additions & 2 deletions pkg/swaggerserver/swaggerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const swaggerPrefix = "/swagger/"

var (
swaggerServiceGroup = server.ServiceGroup{
swaggerServiceGroup = server.APIServiceGroup{
Name: "swagger",
Version: "v1",
IsCore: false,
Expand All @@ -33,7 +33,7 @@ var (
)

// NewHandler creates a HTTP handler for Swagger.
func NewHandler(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
func NewHandler(context.Context, *server.Server) (http.Handler, server.APIServiceGroup, error) {
swaggerHandler := http.NewServeMux()
swaggerHandler.Handle(swaggerPrefix, handler())
return swaggerHandler, swaggerServiceGroup, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ const (

type runtimeServiceValidator struct {
s *server.Server
group server.ServiceGroup
group server.APIServiceGroup
}

// NewRuntimeServiceValidator checks if the path is invalid.
func NewRuntimeServiceValidator(s *server.Server, group server.ServiceGroup) negroni.Handler {
func NewRuntimeServiceValidator(s *server.Server, group server.APIServiceGroup) negroni.Handler {
return &runtimeServiceValidator{s: s, group: group}
}

Expand All @@ -58,7 +58,7 @@ func (h *runtimeServiceValidator) ServeHTTP(w http.ResponseWriter, r *http.Reque
}

// IsServiceAllowed checks the service through the path.
func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool {
func IsServiceAllowed(s *server.Server, group server.APIServiceGroup) bool {
// for core path
if group.IsCore {
return true
Expand Down
4 changes: 2 additions & 2 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
const apiPrefix = "/pd"

// NewHandler creates a HTTP handler for API.
func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
group := server.ServiceGroup{
func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.APIServiceGroup, error) {
group := server.APIServiceGroup{
Name: "core",
IsCore: true,
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

var once sync.Once

var group = server.ServiceGroup{
var group = server.APIServiceGroup{
Name: "core",
IsCore: true,
Version: "v2",
Expand All @@ -37,7 +37,7 @@ var group = server.ServiceGroup{
const apiV2Prefix = "/pd/api/v2/"

// NewV2Handler creates a HTTP handler for API.
func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, server.APIServiceGroup, error) {
once.Do(func() {
// See https://github.com/pingcap/tidb-dashboard/blob/f8ecb64e3d63f4ed91c3dca7a04362418ade01d8/pkg/apiserver/apiserver.go#L84
// These global modification will be effective only for the first invoke.
Expand Down
77 changes: 12 additions & 65 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ import (
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/server/tso"
"github.com/urfave/negroni"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/types"
Expand Down Expand Up @@ -173,15 +172,7 @@ type Server struct {
}

// HandlerBuilder builds a server HTTP handler.
type HandlerBuilder func(context.Context, *Server) (http.Handler, ServiceGroup, error)

// ServiceGroup used to register the service.
type ServiceGroup struct {
Name string
Version string
IsCore bool
PathPrefix string
}
type HandlerBuilder func(context.Context, *Server) (http.Handler, APIServiceGroup, error)

const (
// CorePath the core group, is at REST path `/pd/api/v1`.
Expand All @@ -190,60 +181,8 @@ const (
ExtensionsPath = "/pd/apis"
)

func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBuilders ...HandlerBuilder) (map[string]http.Handler, error) {
userHandlers := make(map[string]http.Handler)
registerMap := make(map[string]struct{})

apiService := negroni.New()
recovery := negroni.NewRecovery()
apiService.Use(recovery)
router := mux.NewRouter()

for _, build := range serviceBuilders {
handler, info, err := build(ctx, svr)
if err != nil {
return nil, err
}
if !info.IsCore && len(info.PathPrefix) == 0 && (len(info.Name) == 0 || len(info.Version) == 0) {
return nil, errs.ErrAPIInformationInvalid.FastGenByArgs(info.Name, info.Version)
}
var pathPrefix string
if len(info.PathPrefix) != 0 {
pathPrefix = info.PathPrefix
} else if info.IsCore {
pathPrefix = CorePath
} else {
pathPrefix = path.Join(ExtensionsPath, info.Name, info.Version)
}
if _, ok := registerMap[pathPrefix]; ok {
return nil, errs.ErrServiceRegistered.FastGenByArgs(pathPrefix)
}

log.Info("register REST path", zap.String("path", pathPrefix))
registerMap[pathPrefix] = struct{}{}
if len(info.PathPrefix) != 0 {
// If PathPrefix is specified, register directly into userHandlers
userHandlers[pathPrefix] = handler
} else {
// If PathPrefix is not specified, register into apiService,
// and finally apiService is registered in userHandlers.
router.PathPrefix(pathPrefix).Handler(handler)
if info.IsCore {
// Deprecated
router.Path("/pd/health").Handler(handler)
// Deprecated
router.Path("/pd/ping").Handler(handler)
}
}
}
apiService.UseHandler(router)
userHandlers[pdAPIPrefix] = apiService

return userHandlers, nil
}

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...HandlerBuilder) (*Server, error) {
func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
log.Info("PD Config", zap.Reflect("config", cfg))
rand.Seed(time.Now().UnixNano())
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()
Expand Down Expand Up @@ -276,18 +215,26 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha
if err != nil {
return nil, err
}
if len(serviceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx, s, serviceBuilders...)
if len(legacyServiceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx, s, legacyServiceBuilders...)
if err != nil {
return nil, err
}
etcdCfg.UserHandlers = userHandlers
}
// New way to register services.
registry := NewServiceregistry()

// Register the micro services REST path.
registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)

etcdCfg.ServiceRegister = func(gs *grpc.Server) {
grpcServer := &GrpcServer{Server: s}
pdpb.RegisterPDServer(gs, grpcServer)
keyspacepb.RegisterKeyspaceServer(gs, &KeyspaceServer{GrpcServer: grpcServer})
diagnosticspb.RegisterDiagnosticsServer(gs, s)
// Register the micro services GRPC service.
NewServiceregistry().InstallAllGRPCServices(s, gs)
}
s.etcdCfg = etcdCfg
s.lg = cfg.GetZapLogger()
Expand Down

0 comments on commit 6ada022

Please sign in to comment.