From 37231ebdab99d60581c3e0e612e6ffa9585eaf50 Mon Sep 17 00:00:00 2001 From: Benjamin Bengfort Date: Fri, 10 Feb 2023 14:10:35 -0600 Subject: [PATCH] gRPC Liveness Probe (#200) --- pkg/ensign/server.go | 12 + pkg/utils/probez/generate.go | 3 + pkg/utils/probez/grpc/v1/health.pb.go | 293 +++++++++++++++++++++ pkg/utils/probez/grpc/v1/health_grpc.pb.go | 169 ++++++++++++ pkg/utils/probez/grpc/v1/probez.go | 171 ++++++++++++ pkg/utils/probez/grpc/v1/probez_test.go | 234 ++++++++++++++++ proto/grpc/health/v1/health.proto | 27 ++ 7 files changed, 909 insertions(+) create mode 100644 pkg/utils/probez/generate.go create mode 100644 pkg/utils/probez/grpc/v1/health.pb.go create mode 100644 pkg/utils/probez/grpc/v1/health_grpc.pb.go create mode 100644 pkg/utils/probez/grpc/v1/probez.go create mode 100644 pkg/utils/probez/grpc/v1/probez_test.go create mode 100644 proto/grpc/health/v1/health.proto diff --git a/pkg/ensign/server.go b/pkg/ensign/server.go index 060b388df..4056013d7 100644 --- a/pkg/ensign/server.go +++ b/pkg/ensign/server.go @@ -16,6 +16,7 @@ import ( "github.com/rotationalio/ensign/pkg/ensign/interceptors" "github.com/rotationalio/ensign/pkg/ensign/o11y" "github.com/rotationalio/ensign/pkg/utils/logger" + health "github.com/rotationalio/ensign/pkg/utils/probez/grpc/v1" "github.com/rotationalio/ensign/pkg/utils/sentry" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -41,7 +42,9 @@ func init() { // An Ensign server implements the Ensign service as defined by the wire protocol. type Server struct { + health.ProbeServer api.UnimplementedEnsignServer + srv *grpc.Server // The gRPC server that handles incoming requests in individual go routines conf config.Config // Primary source of truth for server configuration started time.Time // The timestamp that the server was started (for uptime) @@ -104,6 +107,9 @@ func (s *Server) Serve() (err error) { s.echan <- s.Shutdown() }() + // Set the server to a not serving state + s.NotHealthy() + // Run monitoring and metrics server if err = o11y.Serve(s.conf.Monitoring); err != nil { log.Error().Err(err).Msg("could not start monitoring server") @@ -129,6 +135,9 @@ func (s *Server) Serve() (err error) { // Now that the server is running set the start time to track uptime s.started = time.Now() + // Set the server to ready and serving requests + s.Healthy() + // Listen for any fatal errors on the error channel, blocking while the server go // routine does its work. If the error is nil we expect a graceful shutdown. if err = <-s.echan; err != nil { @@ -151,6 +160,9 @@ func (s *Server) Run(sock net.Listener) { // return a multierror if there were multiple problems during shutdown but it will // attempt to close all open services and processes. func (s *Server) Shutdown() (err error) { + // Set the server to a not serving state + s.NotHealthy() + errs := make([]error, 0) log.Info().Msg("gracefully shutting down ensign server") s.srv.GracefulStop() diff --git a/pkg/utils/probez/generate.go b/pkg/utils/probez/generate.go new file mode 100644 index 000000000..9d4aaa900 --- /dev/null +++ b/pkg/utils/probez/generate.go @@ -0,0 +1,3 @@ +package probez + +//go:generate protoc -I=$GOPATH/src/github.com/rotationalio/ensign/proto --go_out=. --go_opt=module=github.com/rotationalio/ensign/pkg/utils/probez --go-grpc_out=. --go-grpc_opt=module=github.com/rotationalio/ensign/pkg/utils/probez grpc/health/v1/health.proto diff --git a/pkg/utils/probez/grpc/v1/health.pb.go b/pkg/utils/probez/grpc/v1/health.pb.go new file mode 100644 index 000000000..6611f6074 --- /dev/null +++ b/pkg/utils/probez/grpc/v1/health.pb.go @@ -0,0 +1,293 @@ +// Enables grpc liveness probes for k8s v1.24 or higher. +// From: https://github.com/grpc/grpc/blob/master/doc/health-checking.md +// NOTE: at the time of this implementation there is no readiness probe. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.9 +// source: grpc/health/v1/health.proto + +package health + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 + HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 // Used only by the Watch method. +) + +// Enum value maps for HealthCheckResponse_ServingStatus. +var ( + HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", + 3: "SERVICE_UNKNOWN", + } + HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, + "SERVICE_UNKNOWN": 3, + } +) + +func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus { + p := new(HealthCheckResponse_ServingStatus) + *p = x + return p +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor { + return file_grpc_health_v1_health_proto_enumTypes[0].Descriptor() +} + +func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType { + return &file_grpc_health_v1_health_proto_enumTypes[0] +} + +func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead. +func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) { + return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{1, 0} +} + +type HealthCheckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` +} + +func (x *HealthCheckRequest) Reset() { + *x = HealthCheckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_health_v1_health_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HealthCheckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckRequest) ProtoMessage() {} + +func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { + mi := &file_grpc_health_v1_health_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. +func (*HealthCheckRequest) Descriptor() ([]byte, []int) { + return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{0} +} + +func (x *HealthCheckRequest) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +type HealthCheckResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (x *HealthCheckResponse) Reset() { + *x = HealthCheckResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_health_v1_health_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HealthCheckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckResponse) ProtoMessage() {} + +func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { + mi := &file_grpc_health_v1_health_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead. +func (*HealthCheckResponse) Descriptor() ([]byte, []int) { + return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{1} +} + +func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus { + if x != nil { + return x.Status + } + return HealthCheckResponse_UNKNOWN +} + +var File_grpc_health_v1_health_proto protoreflect.FileDescriptor + +var file_grpc_health_v1_health_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x76, 0x31, + 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x67, + 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x22, 0x2e, 0x0a, + 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xb1, 0x01, + 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x22, 0x4f, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, + 0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, + 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, + 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, + 0x03, 0x32, 0xae, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, + 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, + 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x30, 0x01, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x69, 0x6f, 0x2f, 0x65, 0x6e, + 0x73, 0x69, 0x67, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x75, 0x74, 0x69, 0x6c, 0x73, 0x2f, 0x70, + 0x72, 0x6f, 0x62, 0x65, 0x7a, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x76, 0x31, 0x3b, 0x68, 0x65, + 0x61, 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_grpc_health_v1_health_proto_rawDescOnce sync.Once + file_grpc_health_v1_health_proto_rawDescData = file_grpc_health_v1_health_proto_rawDesc +) + +func file_grpc_health_v1_health_proto_rawDescGZIP() []byte { + file_grpc_health_v1_health_proto_rawDescOnce.Do(func() { + file_grpc_health_v1_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_health_v1_health_proto_rawDescData) + }) + return file_grpc_health_v1_health_proto_rawDescData +} + +var file_grpc_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_grpc_health_v1_health_proto_goTypes = []interface{}{ + (HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus + (*HealthCheckRequest)(nil), // 1: grpc.health.v1.HealthCheckRequest + (*HealthCheckResponse)(nil), // 2: grpc.health.v1.HealthCheckResponse +} +var file_grpc_health_v1_health_proto_depIdxs = []int32{ + 0, // 0: grpc.health.v1.HealthCheckResponse.status:type_name -> grpc.health.v1.HealthCheckResponse.ServingStatus + 1, // 1: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest + 1, // 2: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest + 2, // 3: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse + 2, // 4: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_grpc_health_v1_health_proto_init() } +func file_grpc_health_v1_health_proto_init() { + if File_grpc_health_v1_health_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_grpc_health_v1_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HealthCheckRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_health_v1_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HealthCheckResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_grpc_health_v1_health_proto_rawDesc, + NumEnums: 1, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_grpc_health_v1_health_proto_goTypes, + DependencyIndexes: file_grpc_health_v1_health_proto_depIdxs, + EnumInfos: file_grpc_health_v1_health_proto_enumTypes, + MessageInfos: file_grpc_health_v1_health_proto_msgTypes, + }.Build() + File_grpc_health_v1_health_proto = out.File + file_grpc_health_v1_health_proto_rawDesc = nil + file_grpc_health_v1_health_proto_goTypes = nil + file_grpc_health_v1_health_proto_depIdxs = nil +} diff --git a/pkg/utils/probez/grpc/v1/health_grpc.pb.go b/pkg/utils/probez/grpc/v1/health_grpc.pb.go new file mode 100644 index 000000000..7df6f7f7e --- /dev/null +++ b/pkg/utils/probez/grpc/v1/health_grpc.pb.go @@ -0,0 +1,169 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.9 +// source: grpc/health/v1/health.proto + +package health + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// HealthClient is the client API for Health service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HealthClient interface { + Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) + Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) +} + +type healthClient struct { + cc grpc.ClientConnInterface +} + +func NewHealthClient(cc grpc.ClientConnInterface) HealthClient { + return &healthClient{cc} +} + +func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { + out := new(HealthCheckResponse) + err := c.cc.Invoke(ctx, "/grpc.health.v1.Health/Check", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) { + stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], "/grpc.health.v1.Health/Watch", opts...) + if err != nil { + return nil, err + } + x := &healthWatchClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Health_WatchClient interface { + Recv() (*HealthCheckResponse, error) + grpc.ClientStream +} + +type healthWatchClient struct { + grpc.ClientStream +} + +func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) { + m := new(HealthCheckResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// HealthServer is the server API for Health service. +// All implementations must embed UnimplementedHealthServer +// for forward compatibility +type HealthServer interface { + Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) + Watch(*HealthCheckRequest, Health_WatchServer) error + mustEmbedUnimplementedHealthServer() +} + +// UnimplementedHealthServer must be embedded to have forward compatible implementations. +type UnimplementedHealthServer struct { +} + +func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") +} +func (UnimplementedHealthServer) Watch(*HealthCheckRequest, Health_WatchServer) error { + return status.Errorf(codes.Unimplemented, "method Watch not implemented") +} +func (UnimplementedHealthServer) mustEmbedUnimplementedHealthServer() {} + +// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HealthServer will +// result in compilation errors. +type UnsafeHealthServer interface { + mustEmbedUnimplementedHealthServer() +} + +func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) { + s.RegisterService(&Health_ServiceDesc, srv) +} + +func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HealthCheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthServer).Check(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.health.v1.Health/Check", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(HealthCheckRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(HealthServer).Watch(m, &healthWatchServer{stream}) +} + +type Health_WatchServer interface { + Send(*HealthCheckResponse) error + grpc.ServerStream +} + +type healthWatchServer struct { + grpc.ServerStream +} + +func (x *healthWatchServer) Send(m *HealthCheckResponse) error { + return x.ServerStream.SendMsg(m) +} + +// Health_ServiceDesc is the grpc.ServiceDesc for Health service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Health_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.health.v1.Health", + HandlerType: (*HealthServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Check", + Handler: _Health_Check_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Watch", + Handler: _Health_Watch_Handler, + ServerStreams: true, + }, + }, + Metadata: "grpc/health/v1/health.proto", +} diff --git a/pkg/utils/probez/grpc/v1/probez.go b/pkg/utils/probez/grpc/v1/probez.go new file mode 100644 index 000000000..a7e6a0cfa --- /dev/null +++ b/pkg/utils/probez/grpc/v1/probez.go @@ -0,0 +1,171 @@ +package health + +import ( + context "context" + "errors" + "io" + "sync" + + "github.com/oklog/ulid/v2" + "github.com/rs/zerolog/log" + "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +const ( + DefaultService = "default" + StatusServing = HealthCheckResponse_SERVING + StatusNotServing = HealthCheckResponse_NOT_SERVING +) + +// ProbeServer implements the grpc.health.v1.Health service to provide status +// information on different services that the gRPC server may be running. The intended +// use case is to embed the ProbeServer into a gRPC server implementation and to set the +// status of the probes services using the state method. +type ProbeServer struct { + sync.RWMutex + UnimplementedHealthServer + services map[string]HealthCheckResponse_ServingStatus + watchers map[string]map[string]chan<- HealthCheckResponse_ServingStatus +} + +// Healthy sets the probe server default service as serving. +func (h *ProbeServer) Healthy() { + h.SetStatus(DefaultService, StatusServing) + log.Debug().Bool("healthy", true).Msg("server is healthy") +} + +// NotHealthy sets the probe server default service to not serving. +func (h *ProbeServer) NotHealthy() { + h.SetStatus(DefaultService, StatusNotServing) + log.Debug().Bool("healthy", false).Msg("server is not healthy") +} + +// Sets the status of the specified service, notifying all watchers about the change in +// status. Callers should not set the status to Unknown or Service Unknown. If an empty +// string is sepcified as the service, the default service is used instead. +func (h *ProbeServer) SetStatus(service string, status HealthCheckResponse_ServingStatus) { + if service == "" { + service = DefaultService + } + + h.Lock() + defer h.Unlock() + // Ensure the service and watchers map has been created + h.checkstate(service) + + // Set the service state for future lookups + h.services[service] = status + + // Notify the watchers that the status has changed + for _, watcher := range h.watchers[service] { + watcher <- status + } +} + +// Status returns the status of the specified service. If empty string is provided then +// the "default service" is used to lookup the status. +func (h *ProbeServer) Status(service string, stream bool) (status HealthCheckResponse_ServingStatus) { + var ok bool + if service == "" { + service = DefaultService + } + + h.RLock() + defer h.RUnlock() + if status, ok = h.services[service]; !ok { + if stream { + return HealthCheckResponse_SERVICE_UNKNOWN + } + return HealthCheckResponse_UNKNOWN + } + return status +} + +// Add a watcher with a unique ID to listen for status changes for the specified service. +// A channel is returned that will broadcast status changes for that service. +func (h *ProbeServer) AddWatcher(id, service string) <-chan HealthCheckResponse_ServingStatus { + watcher := make(chan HealthCheckResponse_ServingStatus, 1) + if service == "" { + service = DefaultService + } + + h.Lock() + defer h.Unlock() + // Ensure the service and watchers map has been created + h.checkstate(service) + + h.watchers[service][id] = watcher + log.Trace().Str("service", service).Str("watcher", id).Msg("probe watcher added") + return watcher +} + +// Remove a watcher with the specified unique id and stop listening for status changes. +func (h *ProbeServer) DelWatcher(id string) { + h.Lock() + defer h.Unlock() + for sname, service := range h.watchers { + if watcher, ok := service[id]; ok { + close(watcher) + delete(service, id) + log.Trace().Str("service", sname).Str("watcher", id).Msg("probe watcher deleted") + } + } +} + +// Check implements the Health service interface and is a Unary response to a health +// check request for the specified service. If the specified service is empty the status +// of the default service is returned. +func (h *ProbeServer) Check(ctx context.Context, in *HealthCheckRequest) (out *HealthCheckResponse, err error) { + out = &HealthCheckResponse{ + Status: h.Status(in.Service, false), + } + return out, nil +} + +// Watch implements the Health service interface and provides server-side streaming +// updates of when the status of a specific service changes. If the specified service is +// empty then the status of the default service is returned. +func (h *ProbeServer) Watch(in *HealthCheckRequest, stream Health_WatchServer) (err error) { + id := ulid.Make().String() + watcher := h.AddWatcher(id, in.Service) + ctx := stream.Context() + + // Send the first health check message + if err = stream.Send(&HealthCheckResponse{Status: h.Status(in.Service, true)}); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return status.Error(codes.Aborted, err.Error()) + } + + // Wait for updates from the watcher and send those to the client + for { + select { + case <-ctx.Done(): + return status.Error(codes.DeadlineExceeded, ctx.Err().Error()) + case serviceStatus := <-watcher: + if err = stream.Send(&HealthCheckResponse{Status: serviceStatus}); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return status.Error(codes.Aborted, err.Error()) + } + } + } +} + +// check state ensures that the maps have been initialized for the specified service. +func (h *ProbeServer) checkstate(service string) { + if h.services == nil { + h.services = make(map[string]HealthCheckResponse_ServingStatus) + } + + if h.watchers == nil { + h.watchers = make(map[string]map[string]chan<- HealthCheckResponse_ServingStatus) + } + + if _, ok := h.watchers[service]; !ok { + h.watchers[service] = make(map[string]chan<- HealthCheckResponse_ServingStatus) + } +} diff --git a/pkg/utils/probez/grpc/v1/probez_test.go b/pkg/utils/probez/grpc/v1/probez_test.go new file mode 100644 index 000000000..a6f56ecfa --- /dev/null +++ b/pkg/utils/probez/grpc/v1/probez_test.go @@ -0,0 +1,234 @@ +package health_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/rotationalio/ensign/pkg/utils/bufconn" + . "github.com/rotationalio/ensign/pkg/utils/probez/grpc/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestProbezDefaultStatus(t *testing.T) { + srv := &ProbeServer{} + + // Status before default service has been set + require.Equal(t, HealthCheckResponse_UNKNOWN, srv.Status("", false)) + require.Equal(t, HealthCheckResponse_SERVICE_UNKNOWN, srv.Status("", true)) + require.Equal(t, srv.Status(DefaultService, false), srv.Status("", false)) + require.Equal(t, srv.Status(DefaultService, true), srv.Status("", true)) + + // Mark default service as healthy + srv.Healthy() + require.Equal(t, StatusServing, srv.Status("", false)) + require.Equal(t, StatusServing, srv.Status("", true)) + require.Equal(t, srv.Status(DefaultService, false), srv.Status("", false)) + require.Equal(t, srv.Status(DefaultService, true), srv.Status("", true)) + + // Mark default service as not healthy + srv.NotHealthy() + require.Equal(t, StatusNotServing, srv.Status("", false)) + require.Equal(t, StatusNotServing, srv.Status("", true)) + require.Equal(t, srv.Status(DefaultService, false), srv.Status("", false)) + require.Equal(t, srv.Status(DefaultService, true), srv.Status("", true)) +} + +func TestProbezServiceStatus(t *testing.T) { + srv := &ProbeServer{} + services := []string{ + "", "foo", "bar", "health", + } + + for i, service := range services { + // Status should be unknown before it has been set + require.Equal(t, HealthCheckResponse_UNKNOWN, srv.Status(service, false)) + require.Equal(t, HealthCheckResponse_SERVICE_UNKNOWN, srv.Status(service, true)) + + srv.SetStatus(service, StatusNotServing) + require.Equal(t, StatusNotServing, srv.Status(service, false)) + require.Equal(t, StatusNotServing, srv.Status(service, true)) + + // The other service statuses should not have changed + for j, otherService := range services { + if i == j { + continue + } + + if j < i { + require.Equal(t, StatusServing, srv.Status(otherService, false)) + require.Equal(t, StatusServing, srv.Status(otherService, true)) + } else { + require.Equal(t, HealthCheckResponse_UNKNOWN, srv.Status(otherService, false)) + require.Equal(t, HealthCheckResponse_SERVICE_UNKNOWN, srv.Status(otherService, true)) + } + } + + srv.SetStatus(service, StatusServing) + require.Equal(t, StatusServing, srv.Status(service, false)) + require.Equal(t, StatusServing, srv.Status(service, true)) + + require.Equal(t, srv.Status(DefaultService, false), srv.Status("", false)) + require.Equal(t, srv.Status(DefaultService, true), srv.Status("", true)) + } +} + +func TestWatchers(t *testing.T) { + var wg sync.WaitGroup + var watchersReady sync.WaitGroup + counts := make([]int, 5) + srv := &ProbeServer{} + + // Spin 5 watchers, 3 for default, 2 for ensign + for i := 0; i < 5; i++ { + wg.Add(1) + watchersReady.Add(1) + go func(idx int) { + defer wg.Done() + service := DefaultService + if idx > 2 { + service = "ensign.v1.Ensign" + } + + id := fmt.Sprintf("watcher-%d", idx) + watcher := srv.AddWatcher(id, service) + watchersReady.Done() + + for range watcher { + counts[idx]++ + } + }(i) + } + + // Wait until all the watchers are ready before changing statuses. + watchersReady.Wait() + + // Spin several go routines that change the status of the service and delete the + // watcher when they are done. + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + service := DefaultService + if idx > 2 { + service = "ensign.v1.Ensign" + } + + for i := 0; i < idx+1; i++ { + if i%2 == 0 { + srv.SetStatus(service, StatusNotServing) + } else { + srv.SetStatus(service, StatusServing) + } + } + + // Cleanup the watchers after time has passed + time.Sleep(15 * time.Millisecond) + srv.DelWatcher(fmt.Sprintf("watcher-%d", idx)) + }(i) + } + + wg.Wait() + require.Equal(t, []int{6, 6, 6, 9, 9}, counts) +} + +func TestServer(t *testing.T) { + // Create a bufconn grpc server + bufnet := bufconn.New() + probe := &ProbeServer{} + srv := grpc.NewServer() + + RegisterHealthServer(srv, probe) + go srv.Serve(bufnet.Sock()) + defer func() { + srv.GracefulStop() + bufnet.Close() + }() + + cc, err := bufnet.Connect(context.Background(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err, "could not connect to probe server") + + var wg sync.WaitGroup + var ready sync.WaitGroup + client := NewHealthClient(cc) + counts := make([]int, 3) + + // Create some streaming watchers + for i := 0; i < 3; i++ { + wg.Add(1) + ready.Add(1) + go func(i int) { + defer wg.Done() + var service string + switch i { + case 0: + service = "" + case 1: + service = DefaultService + case 2: + service = "ensign.v1.Ensign" + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stream, err := client.Watch(ctx, &HealthCheckRequest{Service: service}) + assert.NoError(t, err, "could not connect stream client") + ready.Done() + + for { + _, err := stream.Recv() + assert.NoError(t, err, "could not recv message %d in thread %d", counts[i]+1, i) + + counts[i]++ + if counts[i] > 3 { + err = stream.CloseSend() + assert.NoError(t, err, "could not close send message") + return + } + } + }(i) + } + + ready.Wait() + + // Unknown service states + for _, service := range []string{"", DefaultService, "ensign.v1.Ensign"} { + rep, err := client.Check(context.Background(), &HealthCheckRequest{Service: service}) + require.NoError(t, err) + require.Equal(t, HealthCheckResponse_UNKNOWN, rep.Status) + } + + // Not Serving Service States + probe.NotHealthy() + probe.SetStatus("ensign.v1.Ensign", StatusNotServing) + + for _, service := range []string{"", DefaultService, "ensign.v1.Ensign"} { + rep, err := client.Check(context.Background(), &HealthCheckRequest{Service: service}) + require.NoError(t, err) + require.Equal(t, StatusNotServing, rep.Status) + } + + // Serving Service States + probe.Healthy() + probe.SetStatus("ensign.v1.Ensign", StatusServing) + + for _, service := range []string{"", DefaultService, "ensign.v1.Ensign"} { + rep, err := client.Check(context.Background(), &HealthCheckRequest{Service: service}) + require.NoError(t, err) + require.Equal(t, StatusServing, rep.Status) + } + + probe.NotHealthy() + probe.SetStatus("ensign.v1.Ensign", StatusNotServing) + + // Check watchers + wg.Wait() + require.Equal(t, []int{4, 4, 4}, counts) +} diff --git a/proto/grpc/health/v1/health.proto b/proto/grpc/health/v1/health.proto new file mode 100644 index 000000000..d8016ad73 --- /dev/null +++ b/proto/grpc/health/v1/health.proto @@ -0,0 +1,27 @@ +// Enables grpc liveness probes for k8s v1.24 or higher. +// From: https://github.com/grpc/grpc/blob/master/doc/health-checking.md +// NOTE: at the time of this implementation there is no readiness probe. +syntax = "proto3"; + +package grpc.health.v1; +option go_package = "github.com/rotationalio/ensign/pkg/utils/probez/grpc/v1;health"; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} \ No newline at end of file