Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Introduce the service registry #5779

Merged
merged 3 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/autoscaling/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const autoScalingPrefix = "/autoscaling"

var (
autoscalingServiceGroup = server.ServiceGroup{
autoscalingServiceGroup = server.APIServiceGroup{
Name: "autoscaling",
Version: "v1alpha",
IsCore: false,
Expand All @@ -36,7 +36,7 @@ var (
)

// NewHandler creates a HTTP handler for auto scaling.
func NewHandler(_ context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
func NewHandler(_ context.Context, svr *server.Server) (http.Handler, server.APIServiceGroup, error) {
autoScalingHandler := http.NewServeMux()
rd := render.New(render.Options{
IndentJSON: true,
Expand Down
8 changes: 4 additions & 4 deletions pkg/dashboard/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ import (
)

var (
apiServiceGroup = server.ServiceGroup{
apiServiceGroup = server.APIServiceGroup{
Name: "dashboard-api",
Version: "v1",
IsCore: false,
PathPrefix: config.APIPathPrefix,
}

uiServiceGroup = server.ServiceGroup{
uiServiceGroup = server.APIServiceGroup{
Name: "dashboard-ui",
Version: "v1",
IsCore: false,
Expand All @@ -68,7 +68,7 @@ func GetServiceBuilders() []server.HandlerBuilder {
// The order of execution must be sequential.
return []server.HandlerBuilder{
// Dashboard API Service
func(ctx context.Context, srv *server.Server) (http.Handler, server.ServiceGroup, error) {
func(ctx context.Context, srv *server.Server) (http.Handler, server.APIServiceGroup, error) {
distroutil.MustLoadAndReplaceStrings()

if cfg, err = adapter.GenDashboardConfig(srv); err != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func GetServiceBuilders() []server.HandlerBuilder {
return apiserver.Handler(s), apiServiceGroup, nil
},
// Dashboard UI
func(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
func(context.Context, *server.Server) (http.Handler, server.APIServiceGroup, error) {
if err != nil {
return nil, uiServiceGroup, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/dashboard/without_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
serviceGroup = server.ServiceGroup{
serviceGroup = server.APIServiceGroup{
Name: "dashboard",
Version: "v1",
IsCore: false,
Expand All @@ -43,7 +43,7 @@ func SetCheckInterval(time.Duration) {}
// GetServiceBuilders returns a empty Dashboard Builder
func GetServiceBuilders() []server.HandlerBuilder {
return []server.HandlerBuilder{
func(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
func(context.Context, *server.Server) (http.Handler, server.APIServiceGroup, error) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.WriteString(w, "Dashboard is not built.\n")
})
Expand Down
16 changes: 16 additions & 0 deletions pkg/msc/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package mcs used to implement the core logic of the external services which rely on the PD banckend provider.
package mcs
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
92 changes: 92 additions & 0 deletions pkg/msc/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package registry is used to register the services.
// TODO: Remove the `pd/server“ dependencies
// TODO: Use `uber/fx` to manage the lifecycle of services.
package registry

import (
"net/http"

"github.com/pingcap/log"
"github.com/tikv/pd/server"
"go.uber.org/zap"
"google.golang.org/grpc"
)

var (
// ServerServiceRegistry is the global grpc service registry.
ServerServiceRegistry = newServiceRegistry()
)

// ServiceBuilder is a function that creates a grpc service.
type ServiceBuilder func(*server.Server) RegistrableService

// RegistrableService is the interface that should wraps the RegisterService method.
type RegistrableService interface {
RegisterGRPCService(g *grpc.Server)
RegisterRESTHandler(userDefineHandlers map[string]http.Handler)
}

// ServiceRegistry is a map that stores all registered grpc services.
// It implements the `server.Serviceregistry` interface.
type ServiceRegistry struct {
builders map[string]ServiceBuilder
services map[string]RegistrableService
}

func newServiceRegistry() *ServiceRegistry {
return &ServiceRegistry{
builders: make(map[string]ServiceBuilder),
services: make(map[string]RegistrableService),
}
}

// InstallAllGRPCServices installs all registered grpc services.
func (r *ServiceRegistry) InstallAllGRPCServices(srv *server.Server, g *grpc.Server) {
for name, builder := range r.builders {
if l, ok := r.services[name]; ok {
l.RegisterGRPCService(g)
nolouch marked this conversation as resolved.
Show resolved Hide resolved
continue
}
l := builder(srv)
l.RegisterGRPCService(g)
log.Info("grpc service registered", zap.String("service-name", name))
}
}

// InstallAllRESTHandler installs all registered REST services.
func (r *ServiceRegistry) InstallAllRESTHandler(srv *server.Server, h map[string]http.Handler) {
for name, builder := range r.builders {
if l, ok := r.services[name]; ok {
l.RegisterRESTHandler(h)
continue
}
l := builder(srv)
l.RegisterRESTHandler(h)
log.Info("restful API service registered", zap.String("service-name", name))
}
}

// RegisterService registers a grpc service.
func (r ServiceRegistry) RegisterService(name string, service ServiceBuilder) {
r.builders[name] = service
}

func init() {
server.NewServiceregistry = func() server.Serviceregistry {
return ServerServiceRegistry
}
}
4 changes: 2 additions & 2 deletions pkg/swaggerserver/swaggerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const swaggerPrefix = "/swagger/"

var (
swaggerServiceGroup = server.ServiceGroup{
swaggerServiceGroup = server.APIServiceGroup{
Name: "swagger",
Version: "v1",
IsCore: false,
Expand All @@ -33,7 +33,7 @@ var (
)

// NewHandler creates a HTTP handler for Swagger.
func NewHandler(context.Context, *server.Server) (http.Handler, server.ServiceGroup, error) {
func NewHandler(context.Context, *server.Server) (http.Handler, server.APIServiceGroup, error) {
swaggerHandler := http.NewServeMux()
swaggerHandler.Handle(swaggerPrefix, handler())
return swaggerHandler, swaggerServiceGroup, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ const (

type runtimeServiceValidator struct {
s *server.Server
group server.ServiceGroup
group server.APIServiceGroup
}

// NewRuntimeServiceValidator checks if the path is invalid.
func NewRuntimeServiceValidator(s *server.Server, group server.ServiceGroup) negroni.Handler {
func NewRuntimeServiceValidator(s *server.Server, group server.APIServiceGroup) negroni.Handler {
return &runtimeServiceValidator{s: s, group: group}
}

Expand All @@ -58,7 +58,7 @@ func (h *runtimeServiceValidator) ServeHTTP(w http.ResponseWriter, r *http.Reque
}

// IsServiceAllowed checks the service through the path.
func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool {
func IsServiceAllowed(s *server.Server, group server.APIServiceGroup) bool {
// for core path
if group.IsCore {
return true
Expand Down
4 changes: 2 additions & 2 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
const apiPrefix = "/pd"

// NewHandler creates a HTTP handler for API.
func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
group := server.ServiceGroup{
func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.APIServiceGroup, error) {
group := server.APIServiceGroup{
Name: "core",
IsCore: true,
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

var once sync.Once

var group = server.ServiceGroup{
var group = server.APIServiceGroup{
Name: "core",
IsCore: true,
Version: "v2",
Expand All @@ -37,7 +37,7 @@ var group = server.ServiceGroup{
const apiV2Prefix = "/pd/api/v2/"

// NewV2Handler creates a HTTP handler for API.
func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, server.APIServiceGroup, error) {
once.Do(func() {
// See https://github.com/pingcap/tidb-dashboard/blob/f8ecb64e3d63f4ed91c3dca7a04362418ade01d8/pkg/apiserver/apiserver.go#L84
// These global modification will be effective only for the first invoke.
Expand Down
77 changes: 12 additions & 65 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ import (
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/server/tso"
"github.com/urfave/negroni"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/types"
Expand Down Expand Up @@ -173,15 +172,7 @@ type Server struct {
}

// HandlerBuilder builds a server HTTP handler.
type HandlerBuilder func(context.Context, *Server) (http.Handler, ServiceGroup, error)

// ServiceGroup used to register the service.
type ServiceGroup struct {
Name string
Version string
IsCore bool
PathPrefix string
}
type HandlerBuilder func(context.Context, *Server) (http.Handler, APIServiceGroup, error)

const (
// CorePath the core group, is at REST path `/pd/api/v1`.
Expand All @@ -190,60 +181,8 @@ const (
ExtensionsPath = "/pd/apis"
)

func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBuilders ...HandlerBuilder) (map[string]http.Handler, error) {
userHandlers := make(map[string]http.Handler)
registerMap := make(map[string]struct{})

apiService := negroni.New()
recovery := negroni.NewRecovery()
apiService.Use(recovery)
router := mux.NewRouter()

for _, build := range serviceBuilders {
handler, info, err := build(ctx, svr)
if err != nil {
return nil, err
}
if !info.IsCore && len(info.PathPrefix) == 0 && (len(info.Name) == 0 || len(info.Version) == 0) {
return nil, errs.ErrAPIInformationInvalid.FastGenByArgs(info.Name, info.Version)
}
var pathPrefix string
if len(info.PathPrefix) != 0 {
pathPrefix = info.PathPrefix
} else if info.IsCore {
pathPrefix = CorePath
} else {
pathPrefix = path.Join(ExtensionsPath, info.Name, info.Version)
}
if _, ok := registerMap[pathPrefix]; ok {
return nil, errs.ErrServiceRegistered.FastGenByArgs(pathPrefix)
}

log.Info("register REST path", zap.String("path", pathPrefix))
registerMap[pathPrefix] = struct{}{}
if len(info.PathPrefix) != 0 {
// If PathPrefix is specified, register directly into userHandlers
userHandlers[pathPrefix] = handler
} else {
// If PathPrefix is not specified, register into apiService,
// and finally apiService is registered in userHandlers.
router.PathPrefix(pathPrefix).Handler(handler)
if info.IsCore {
// Deprecated
router.Path("/pd/health").Handler(handler)
// Deprecated
router.Path("/pd/ping").Handler(handler)
}
}
}
apiService.UseHandler(router)
userHandlers[pdAPIPrefix] = apiService

return userHandlers, nil
}

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...HandlerBuilder) (*Server, error) {
func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
log.Info("PD Config", zap.Reflect("config", cfg))
rand.Seed(time.Now().UnixNano())
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()
Expand Down Expand Up @@ -276,18 +215,26 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha
if err != nil {
return nil, err
}
if len(serviceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx, s, serviceBuilders...)
if len(legacyServiceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx, s, legacyServiceBuilders...)
if err != nil {
return nil, err
}
etcdCfg.UserHandlers = userHandlers
}
// New way to register services.
registry := NewServiceregistry()

// Register the micro services REST path.
registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)

etcdCfg.ServiceRegister = func(gs *grpc.Server) {
grpcServer := &GrpcServer{Server: s}
pdpb.RegisterPDServer(gs, grpcServer)
keyspacepb.RegisterKeyspaceServer(gs, &KeyspaceServer{GrpcServer: grpcServer})
diagnosticspb.RegisterDiagnosticsServer(gs, s)
// Register the micro services GRPC service.
NewServiceregistry().InstallAllGRPCServices(s, gs)
}
s.etcdCfg = etcdCfg
s.lg = cfg.GetZapLogger()
Expand Down