Skip to content

Commit

Permalink
fix(core): rollup readiness checks to central health service (#755)
Browse files Browse the repository at this point in the history
This PR introduces the ability for services to register their readiness
checks with a centralized platform health check.

```mermaid
block-beta
columns 1
HC["Platform Health Check"]
space
block:ID
 K["KAS"]
 A["Authorization Service"]
 P["Policy Service"]
end
K --> HC
A --> HC
P --> HC
```

Resolves 
- #700 
- #727 
- #726
  • Loading branch information
strantalis committed May 7, 2024
1 parent 4db49f1 commit 8a65161
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 30 deletions.
18 changes: 17 additions & 1 deletion service/authorization/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func NewRegistration() serviceregistry.Registration {
var clientID = "tdf-authorization-svc"
var clientSecert = "secret"
var tokenEndpoint = "http://localhost:8888/auth/realms/opentdf/protocol/openid-connect/token" //nolint:gosec // default token endpoint

as := &AuthorizationService{eng: srp.Engine, sdk: srp.SDK}
if err := srp.RegisterReadinessCheck("authorization", as.IsReady); err != nil {
slog.Error("failed to register authorization readiness check", slog.String("error", err.Error()))
}

// if its passed in the config use that
val, ok := srp.Config.ExtraProps["ersUrl"]
if ok {
Expand Down Expand Up @@ -79,7 +85,11 @@ func NewRegistration() serviceregistry.Registration {
}
config := clientcredentials.Config{ClientID: clientID, ClientSecret: clientSecert, TokenURL: tokenEndpoint}
newTokenSource := oauth2.ReuseTokenSourceWithExpiry(nil, config.TokenSource(context.Background()), tokenExpiryDelay)
return &AuthorizationService{eng: srp.Engine, sdk: srp.SDK, ersURL: ersURL, tokenSource: &newTokenSource}, func(ctx context.Context, mux *runtime.ServeMux, server any) error {

as.ersURL = ersURL
as.tokenSource = &newTokenSource

return &AuthorizationService{eng: srp.Engine, sdk: srp.SDK}, func(ctx context.Context, mux *runtime.ServeMux, server any) error {
authServer, okAuth := server.(authorization.AuthorizationServiceServer)
if !okAuth {
return fmt.Errorf("failed to assert server type to authorization.AuthorizationServiceServer")
Expand All @@ -90,6 +100,12 @@ func NewRegistration() serviceregistry.Registration {
}
}

// IsReady is the readiness check the authorization service registers under the
// "authorization" namespace. It only writes a debug log entry and always
// returns nil.
// TODO: decide which dependencies, if any, should be probed here.
func (as AuthorizationService) IsReady(ctx context.Context) error {
	const probeMsg = "checking readiness of authorization service"
	slog.DebugContext(ctx, probeMsg)

	return nil
}

// abstracted into variable for mocking in tests
var retrieveAttributeDefinitions = func(ctx context.Context, ra *authorization.ResourceAttribute, sdk *otdf.SDK) (map[string]*attr.GetAttributeValuesByFqnsResponse_AttributeAndValue, error) {
attrFqns := ra.GetAttributeValueFqns()
Expand Down
50 changes: 36 additions & 14 deletions service/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/opentdf/platform/service/internal/logger"
"github.com/opentdf/platform/service/pkg/db"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

// serviceHealthChecks holds the readiness check registered for each service,
// keyed by the namespace passed to RegisterReadinessCheck.
var serviceHealthChecks = map[string]func(context.Context) error{}

// HealthService serves the gRPC health checking API (grpc_health_v1). It embeds
// UnimplementedHealthServer and carries the logger used to report readiness
// checks that fail.
type HealthService struct { //nolint:revive // HealthService is a valid name for this struct
healthpb.UnimplementedHealthServer
db *db.Client
logger *logger.Logger
}

Expand All @@ -25,15 +27,12 @@ func NewRegistration() serviceregistry.Registration {
ServiceDesc: &healthpb.Health_ServiceDesc,
RegisterFunc: func(srp serviceregistry.RegistrationParams) (any, serviceregistry.HandlerServer) {
err := srp.WellKnownConfig("health", map[string]any{
"endpoints": map[string]any{
"liveness": "/healthz?service=liveness",
"readiness": "/healthz?service=readiness",
},
"endpoint": "/healthz",
})
if err != nil {
panic(err)
srp.Logger.Error("failed to set well-known config", slog.String("error", err.Error()))
}
return &HealthService{db: srp.DBClient, logger: srp.Logger}, func(_ context.Context, _ *runtime.ServeMux, _ any) error {
return &HealthService{logger: srp.Logger}, func(_ context.Context, _ *runtime.ServeMux, _ any) error {
return nil
}
},
Expand All @@ -47,13 +46,27 @@ func (s HealthService) Check(ctx context.Context, req *healthpb.HealthCheckReque
}, nil
}

// Check to see if we are doing a readiness probe
if req.GetService() == "readiness" {
// Check the database connection
if err := s.db.Pgx.Ping(ctx); err != nil {
s.logger.Error("database connection is not ready", slog.String("error", err.Error()))
switch req.GetService() {
case "all":
for service, check := range serviceHealthChecks {
if err := check(ctx); err != nil {
s.logger.ErrorContext(ctx, "service is not ready", slog.String("service", service), slog.String("error", err.Error()))
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_NOT_SERVING,
}, nil
}
}
default:
if check, ok := serviceHealthChecks[req.GetService()]; ok {
if err := check(ctx); err != nil {
s.logger.ErrorContext(ctx, "service is not ready", slog.String("service", req.GetService()), slog.String("error", err.Error()))
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_NOT_SERVING,
}, nil
}
} else {
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_NOT_SERVING,
Status: healthpb.HealthCheckResponse_SERVICE_UNKNOWN,
}, nil
}
}
Expand All @@ -66,3 +79,12 @@ func (s HealthService) Check(ctx context.Context, req *healthpb.HealthCheckReque
// Watch is the streaming method of the gRPC health API. This service does not
// support it: every call fails with a codes.Unimplemented status error.
func (s HealthService) Watch(_ *healthpb.HealthCheckRequest, _ healthpb.Health_WatchServer) error {
	errUnimplemented := status.Error(codes.Unimplemented, "unimplemented")

	return errUnimplemented
}

func RegisterReadinessCheck(namespace string, service func(context.Context) error) error {
if _, ok := serviceHealthChecks[namespace]; ok {
return status.Error(codes.AlreadyExists, "readiness check already registered")
}
serviceHealthChecks[namespace] = service

return nil
}
106 changes: 106 additions & 0 deletions service/health/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package health

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/suite"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// HealthCheckSuite groups the tests for the readiness-check registry and
// HealthService.Check.
type HealthCheckSuite struct {
	suite.Suite
}

// TestHealthCheckSuite is the `go test` entry point for HealthCheckSuite.
// testify only runs a suite's methods when the suite is handed to suite.Run
// from a regular Test function; without this runner none of the suite's tests
// are executed.
//
// NOTE(review): TestCheckNotServing builds a HealthService with a nil logger,
// and Check logs through that logger when a readiness check fails — confirm
// that path does not panic now that the suite actually runs.
func TestHealthCheckSuite(t *testing.T) {
	suite.Run(t, new(HealthCheckSuite))
}

// SetupSuite performs no suite-level setup; its body is empty.
func (s *HealthCheckSuite) SetupSuite() {
}

// TearDownTest swaps the package-level registry for a fresh empty map after
// each test, so checks registered by one test do not leak into the next.
func (s *HealthCheckSuite) TearDownTest() {
	serviceHealthChecks = map[string]func(context.Context) error{}
}

// TestRegisterReadinessCheck verifies that a check registered under a new
// namespace is stored in the registry and can be invoked from there.
func (s *HealthCheckSuite) TestRegisterReadinessCheck() {
	alwaysReady := func(context.Context) error { return nil }
	s.Require().NoError(RegisterReadinessCheck("service_1", alwaysReady))

	// Invoke the stored check straight from the registry.
	registered := serviceHealthChecks["service_1"]
	s.NoError(registered(context.Background()))
}

// TestRegisterHealthCheckAlreadyExists verifies that registering a second
// check under a namespace that already has one is rejected with an error.
func (s *HealthCheckSuite) TestRegisterHealthCheckAlreadyExists() {
	alwaysReady := func(context.Context) error { return nil }

	// The first registration for the namespace must succeed.
	first := RegisterReadinessCheck("service_2", alwaysReady)
	s.Require().NoError(first)

	// The second registration for the same namespace must fail.
	second := RegisterReadinessCheck("service_2", func(context.Context) error {
		return nil
	})
	s.Error(second)
}

// TestCheck registers two passing readiness checks and verifies that a request
// for the "all" service reports SERVING.
func (s *HealthCheckSuite) TestCheck() {
	for _, name := range []string{"success_3", "success_4"} {
		err := RegisterReadinessCheck(name, func(context.Context) error {
			return nil
		})
		s.Require().NoError(err)
	}

	svc := &HealthService{}
	req := &grpc_health_v1.HealthCheckRequest{Service: "all"}

	resp, err := svc.Check(context.Background(), req)
	s.Require().NoError(err)
	s.Equal("SERVING", resp.GetStatus().String())
}

// TestCheckServiceUnknown verifies that asking about a service name with no
// registered readiness check reports SERVICE_UNKNOWN.
func (s *HealthCheckSuite) TestCheckServiceUnknown() {
	svc := &HealthService{}
	req := &grpc_health_v1.HealthCheckRequest{Service: "unknown"}

	resp, err := svc.Check(context.Background(), req)
	s.Require().NoError(err)
	s.Equal("SERVICE_UNKNOWN", resp.GetStatus().String())
}

// TestCheckNotServing verifies that a service whose readiness check returns an
// error is reported as NOT_SERVING.
func (s *HealthCheckSuite) TestCheckNotServing() {
	alwaysFailing := func(context.Context) error { return assert.AnError }
	s.Require().NoError(RegisterReadinessCheck("failing", alwaysFailing))

	// NOTE(review): svc has a nil logger, and Check logs through its logger
	// when a readiness check fails — confirm that path is safe.
	svc := &HealthService{}
	req := &grpc_health_v1.HealthCheckRequest{Service: "failing"}

	resp, err := svc.Check(context.Background(), req)
	s.Require().NoError(err)
	s.Equal("NOT_SERVING", resp.GetStatus().String())
}
8 changes: 8 additions & 0 deletions service/kas/access/provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package access

import (
"context"
"log/slog"
"net/url"

kaspb "github.com/opentdf/platform/protocol/go/kas"
Expand All @@ -20,3 +22,9 @@ type Provider struct {
AttributeSvc *url.URL
CryptoProvider security.CryptoProvider
}

// IsReady is the readiness check for the KAS provider. It only writes a debug
// log entry and always returns nil.
// TODO: decide which dependencies, if any, should be probed here.
func (p Provider) IsReady(ctx context.Context) error {
	const probeMsg = "checking readiness of kas service"
	slog.DebugContext(ctx, probeMsg)

	return nil
}
6 changes: 6 additions & 0 deletions service/kas/kas.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kas
import (
"context"
"fmt"
"log/slog"
"net/url"
"strings"

Expand Down Expand Up @@ -34,6 +35,11 @@ func NewRegistration() serviceregistry.Registration {
CryptoProvider: srp.OTDF.CryptoProvider,
SDK: srp.SDK,
}

if err := srp.RegisterReadinessCheck("kas", p.IsReady); err != nil {
slog.Error("failed to register kas readiness check", slog.String("error", err.Error()))
}

return &p, func(ctx context.Context, mux *runtime.ServeMux, server any) error {
kas, ok := server.(*access.Provider)
if !ok {
Expand Down
15 changes: 8 additions & 7 deletions service/pkg/server/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ func startService(ctx context.Context, cfg config.Config, s serviceregistry.Serv

// Create the service
impl, handler := s.RegisterFunc(serviceregistry.RegistrationParams{
Config: cfg.Services[s.Namespace],
OTDF: otdf,
DBClient: d,
Engine: eng,
SDK: client,
WellKnownConfig: wellknown.RegisterConfiguration,
Logger: logger.With("namespace", s.Namespace),
Config: cfg.Services[s.Namespace],
OTDF: otdf,
DBClient: d,
Engine: eng,
SDK: client,
WellKnownConfig: wellknown.RegisterConfiguration,
RegisterReadinessCheck: health.RegisterReadinessCheck,
Logger: logger.With("namespace", s.Namespace),
})

// Register the service with the gRPC server
Expand Down
15 changes: 8 additions & 7 deletions service/pkg/serviceregistry/serviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ type RemoteServiceConfig struct {
}

// RegistrationParams bundles what is handed to a service's RegisterFunc when
// the service is created: its configuration, shared clients, a logger and the
// registration hooks.
type RegistrationParams struct {
Config ServiceConfig
OTDF *server.OpenTDFServer
DBClient *db.Client
Engine *opa.Engine
SDK *sdk.SDK
WellKnownConfig func(namespace string, config any) error
Logger *logger.Logger
Config ServiceConfig
OTDF *server.OpenTDFServer
DBClient *db.Client
Engine *opa.Engine
SDK *sdk.SDK
WellKnownConfig func(namespace string, config any) error
// RegisterReadinessCheck registers a readiness check function under the
// given namespace and reports an error if registration fails.
RegisterReadinessCheck func(namespace string, check func(context.Context) error) error
Logger *logger.Logger
}
// HandlerServer is the callback a service returns from its RegisterFunc. It
// receives a context, the grpc-gateway ServeMux and the service implementation
// as an untyped value, and returns an error on failure.
type HandlerServer func(ctx context.Context, mux *runtime.ServeMux, server any) error
// RegisterFunc builds a service from RegistrationParams and returns the
// service implementation together with its HandlerServer callback.
type RegisterFunc func(RegistrationParams) (Impl any, HandlerServer HandlerServer)
Expand Down
19 changes: 18 additions & 1 deletion service/policy/namespaces/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ func NewRegistration() serviceregistry.Registration {
return serviceregistry.Registration{
ServiceDesc: &namespaces.NamespaceService_ServiceDesc,
RegisterFunc: func(srp serviceregistry.RegistrationParams) (any, serviceregistry.HandlerServer) {
return &NamespacesService{dbClient: policydb.NewClient(srp.DBClient), logger: srp.Logger}, func(ctx context.Context, mux *runtime.ServeMux, server any) error {
ns := &NamespacesService{dbClient: policydb.NewClient(srp.DBClient), logger: srp.Logger}

if err := srp.RegisterReadinessCheck("policy", ns.IsReady); err != nil {
slog.Error("failed to register policy readiness check", slog.String("error", err.Error()))
}

return ns, func(ctx context.Context, mux *runtime.ServeMux, server any) error {
nsServer, ok := server.(namespaces.NamespaceServiceServer)
if !ok {
return fmt.Errorf("failed to assert server as namespaces.NamespaceServiceServer")
Expand All @@ -34,6 +40,17 @@ func NewRegistration() serviceregistry.Registration {
}
}

// IsReady reports whether the namespaces service can serve requests. It pings
// the database through the service's client and returns the ping error
// unchanged, so a nil result means the database answered.
func (ns NamespacesService) IsReady(ctx context.Context) error {
	slog.DebugContext(ctx, "checking readiness of namespaces service")

	return ns.dbClient.SQLDB.PingContext(ctx)
}

func (ns NamespacesService) ListNamespaces(ctx context.Context, req *namespaces.ListNamespacesRequest) (*namespaces.ListNamespacesResponse, error) {
state := policydb.GetDBStateTypeTransformedEnum(req.GetState())
ns.logger.Debug("listing namespaces", slog.String("state", state))
Expand Down

0 comments on commit 8a65161

Please sign in to comment.