Skip to content

Commit

Permalink
Add gRPC health-check hashicorp#3073
Browse files Browse the repository at this point in the history
  • Loading branch information
koiuo committed Jan 30, 2018
1 parent 842ae5a commit 6e8b36b
Show file tree
Hide file tree
Showing 122 changed files with 30,342 additions and 19 deletions.
74 changes: 61 additions & 13 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type Agent struct {
// checkTCPs maps the check ID to an associated TCP check
checkTCPs map[types.CheckID]*checks.CheckTCP

// checkGRPCs maps the check ID to an associated GRPC check
checkGRPCs map[types.CheckID]*checks.CheckGRPC

// checkTTLs maps the check ID to an associated check TTL
checkTTLs map[types.CheckID]*checks.CheckTTL

Expand Down Expand Up @@ -212,6 +215,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
checkTTLs: make(map[types.CheckID]*checks.CheckTTL),
checkHTTPs: make(map[types.CheckID]*checks.CheckHTTP),
checkTCPs: make(map[types.CheckID]*checks.CheckTCP),
checkGRPCs: make(map[types.CheckID]*checks.CheckGRPC),
checkDockers: make(map[types.CheckID]*checks.CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
Expand Down Expand Up @@ -1170,6 +1174,9 @@ func (a *Agent) ShutdownAgent() error {
for _, chk := range a.checkTCPs {
chk.Stop()
}
for _, chk := range a.checkGRPCs {
chk.Stop()
}
for _, chk := range a.checkDockers {
chk.Stop()
}
Expand Down Expand Up @@ -1706,19 +1713,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
chkType.Interval = checks.MinInterval
}

// We re-use the API client's TLS structure since it
// closely aligns with Consul's internal configuration.
tlsConfig := &api.TLSConfig{
InsecureSkipVerify: chkType.TLSSkipVerify,
}
if a.config.EnableAgentTLSForChecks {
tlsConfig.Address = a.config.ServerName
tlsConfig.KeyFile = a.config.KeyFile
tlsConfig.CertFile = a.config.CertFile
tlsConfig.CAFile = a.config.CAFile
tlsConfig.CAPath = a.config.CAPath
}
tlsClientConfig, err := api.SetupTLSConfig(tlsConfig)
tlsClientConfig, err := a.setupTLSClientConfig(chkType.TLSSkipVerify)
if err != nil {
return fmt.Errorf("Failed to set up TLS: %v", err)
}
Expand Down Expand Up @@ -1759,6 +1754,38 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tcp.Start()
a.checkTCPs[check.CheckID] = tcp

case chkType.IsGRPC():
if existing, ok := a.checkGRPCs[check.CheckID]; ok {
existing.Stop()
delete(a.checkGRPCs, check.CheckID)
}
if chkType.Interval < checks.MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
check.CheckID, checks.MinInterval))
chkType.Interval = checks.MinInterval
}

var tlsClientConfig *tls.Config
if chkType.TLS {
var err error
tlsClientConfig, err = a.setupTLSClientConfig(chkType.TLSSkipVerify)
if err != nil {
return fmt.Errorf("Failed to set up TLS: %v", err)
}
}

grpc := &checks.CheckGRPC{
Notify: a.State,
CheckID: check.CheckID,
GRPC: chkType.GRPC,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
TLSClientConfig: tlsClientConfig,
}
grpc.Start()
a.checkGRPCs[check.CheckID] = grpc

case chkType.IsDocker():
if existing, ok := a.checkDockers[check.CheckID]; ok {
existing.Stop()
Expand Down Expand Up @@ -1862,6 +1889,23 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
return nil
}

// setupTLSClientConfig builds the client-side TLS configuration used by
// health checks. skipVerify is copied into InsecureSkipVerify. When
// EnableAgentTLSForChecks is set in the agent configuration, the agent's own
// server name, key, certificate and CA settings are applied as well.
func (a *Agent) setupTLSClientConfig(skipVerify bool) (tlsClientConfig *tls.Config, err error) {
	// The API client's TLS structure is reused because it closely mirrors
	// Consul's internal configuration.
	settings := &api.TLSConfig{InsecureSkipVerify: skipVerify}

	if cfg := a.config; cfg.EnableAgentTLSForChecks {
		settings.Address = cfg.ServerName
		settings.KeyFile = cfg.KeyFile
		settings.CertFile = cfg.CertFile
		settings.CAFile = cfg.CAFile
		settings.CAPath = cfg.CAPath
	}

	return api.SetupTLSConfig(settings)
}

// RemoveCheck is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
Expand Down Expand Up @@ -1905,6 +1949,10 @@ func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
check.Stop()
delete(a.checkTCPs, checkID)
}
if check, ok := a.checkGRPCs[checkID]; ok {
check.Stop()
delete(a.checkGRPCs, checkID)
}
if check, ok := a.checkTTLs[checkID]; ok {
check.Stop()
delete(a.checkTTLs, checkID)
Expand Down
37 changes: 37 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,43 @@ func TestAgent_AddCheck_ExecDisable(t *testing.T) {
}
}

// TestAgent_AddCheck_GRPC registers a gRPC check and verifies that it shows
// up in the agent state with critical status and that the agent tracks a
// gRPC check for the same ID.
func TestAgent_AddCheck_GRPC(t *testing.T) {
	t.Parallel()
	a := NewTestAgent(t.Name(), "")
	defer a.Shutdown()

	definition := &structs.CheckType{
		GRPC:     "localhost:12345/package.Service",
		Interval: 15 * time.Second,
	}
	registration := &structs.HealthCheck{
		Node:    "foo",
		CheckID: "grpchealth",
		Name:    "grpc health checking protocol",
		Status:  api.HealthCritical,
	}
	if err := a.AddCheck(registration, definition, false, ""); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The check must be present in the local state.
	registered, found := a.State.Checks()["grpchealth"]
	if !found {
		t.Fatalf("missing grpchealth check")
	}

	// It must carry the status it was registered with.
	if registered.Status != api.HealthCritical {
		t.Fatalf("check not critical")
	}

	// The agent must track a gRPC check under the same ID.
	if _, tracked := a.checkGRPCs["grpchealth"]; !tracked {
		t.Fatalf("missing grpchealth check")
	}
}

func TestAgent_RemoveCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `
Expand Down
68 changes: 68 additions & 0 deletions agent/checks/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,71 @@ func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
return api.HealthCritical, buf, nil
}
}

// CheckGRPC is used to periodically send a request to a gRPC server
// application that implements the gRPC health-checking protocol.
// The check is passing if the returned status is SERVING.
// The check is critical if the connection fails or the returned status is
// not SERVING.
type CheckGRPC struct {
	// Notify receives the status and output of every probe via UpdateCheck.
	Notify CheckNotifier
	// CheckID identifies this check in notifications and log messages.
	CheckID types.CheckID
	// GRPC is the probe target, in the form server[/service].
	GRPC string
	// Interval is the delay between probes; the first probe is delayed by a
	// random stagger derived from it.
	Interval time.Duration
	// Timeout bounds a single probe. Start substitutes 10 seconds when this
	// is not greater than zero.
	Timeout time.Duration
	// TLSClientConfig is handed to the probe; when nil the probe dials
	// without transport security.
	TLSClientConfig *tls.Config
	// Logger receives a debug line for every probe result.
	Logger *log.Logger

	// probe is created by Start and used by every check.
	probe *GrpcHealthProbe
	// stop records whether Stop has already closed stopCh.
	stop bool
	// stopCh is closed by Stop to end the run loop.
	stopCh chan struct{}
	// stopLock serializes Start and Stop.
	stopLock sync.Mutex
}

// Start builds the health probe for the configured target and launches the
// background loop that runs the check. A Timeout that is not greater than
// zero is replaced by a 10 second default.
func (c *CheckGRPC) Start() {
	c.stopLock.Lock()
	defer c.stopLock.Unlock()

	probeTimeout := c.Timeout
	if probeTimeout <= 0 {
		probeTimeout = 10 * time.Second
	}

	c.probe = NewGrpcHealthProbe(c.GRPC, probeTimeout, c.TLSClientConfig)
	c.stopCh = make(chan struct{})
	c.stop = false

	go c.run()
}

// run executes the check repeatedly until stopCh is closed. The first wait
// is a random stagger derived from Interval; every later wait is Interval
// itself, measured from the end of the previous check.
func (c *CheckGRPC) run() {
	wait := lib.RandomStagger(c.Interval)
	for {
		select {
		case <-c.stopCh:
			return
		case <-time.After(wait):
			c.check()
			wait = c.Interval
		}
	}
}

// check runs the probe once, logs the outcome at debug level and reports it
// to Notify: critical with the error text on failure, passing otherwise.
func (c *CheckGRPC) check() {
	if err := c.probe.Check(); err != nil {
		reason := err.Error()
		c.Logger.Printf("[DEBUG] Check %q failed: %s", c.CheckID, reason)
		c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, reason)
		return
	}

	c.Logger.Printf("[DEBUG] Check %q is passing", c.CheckID)
	c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", c.GRPC))
}

// Stop signals the background loop launched by Start to exit. It may be
// called more than once; only the first call after a Start closes the stop
// channel. Calling Stop on a check that was never started is a no-op.
func (c *CheckGRPC) Stop() {
	c.stopLock.Lock()
	defer c.stopLock.Unlock()

	// stopCh is only allocated by Start. A nil channel means there is no
	// loop to stop, and closing a nil channel would panic.
	if c.stop || c.stopCh == nil {
		return
	}
	c.stop = true
	close(c.stopCh)
}
75 changes: 75 additions & 0 deletions agent/checks/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package checks

import (
"context"
"crypto/tls"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
hv1 "google.golang.org/grpc/health/grpc_health_v1"
"strings"
"time"
)

// ErrGRPCUnhealthy is returned by GrpcHealthProbe.Check when the health RPC
// itself succeeds but the response is nil or reports a status other than
// SERVING.
var ErrGRPCUnhealthy = fmt.Errorf("gRPC application didn't report service healthy")

// GrpcHealthProbe connects to a gRPC application and queries its health
// service for the application/service status.
type GrpcHealthProbe struct {
	// server is the address dialed on every Check: the part of the target
	// before the first "/".
	server string
	// request is sent on every Check; its Service field is the part of the
	// target after the first "/", or empty when none was given.
	request *hv1.HealthCheckRequest
	// timeout bounds one Check, covering both the dial and the RPC.
	timeout time.Duration
	// dialOptions carries the transport security option chosen at
	// construction time (TLS credentials or insecure).
	dialOptions []grpc.DialOption
}

// NewGrpcHealthProbe constructs a GrpcHealthProbe from a target string in
// the format
//
//	server[/service]
//
// Everything before the first "/" is the server to dial; everything after it
// is the service name to query. If the service is omitted, the health of the
// entire application is probed. A non-nil tlsConfig makes the probe dial with
// TLS transport credentials; otherwise it dials insecurely.
func NewGrpcHealthProbe(target string, timeout time.Duration, tlsConfig *tls.Config) *GrpcHealthProbe {
	probe := &GrpcHealthProbe{
		request: &hv1.HealthCheckRequest{},
		timeout: timeout,
	}

	parts := strings.SplitN(target, "/", 2)
	probe.server = parts[0]
	if len(parts) > 1 {
		probe.request.Service = parts[1]
	}

	var transport grpc.DialOption
	if tlsConfig == nil {
		transport = grpc.WithInsecure()
	} else {
		transport = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
	}
	probe.dialOptions = []grpc.DialOption{transport}

	return probe
}

// Check reports whether the target of this GrpcHealthProbe is healthy.
// It returns nil when the health service answers SERVING. Otherwise it
// returns the dial or RPC error, or ErrGRPCUnhealthy when the call succeeded
// but the response is nil or carries any other status. The dial and the RPC
// share one context limited by the probe's timeout.
func (probe *GrpcHealthProbe) Check() (err error) {
	ctx, cancel := context.WithTimeout(context.Background(), probe.timeout)
	defer cancel()

	conn, dialErr := grpc.DialContext(ctx, probe.server, probe.dialOptions...)
	if dialErr != nil {
		return dialErr
	}
	defer conn.Close()

	reply, rpcErr := hv1.NewHealthClient(conn).Check(ctx, probe.request)
	if rpcErr != nil {
		return rpcErr
	}
	if reply == nil || reply.Status != hv1.HealthCheckResponse_SERVING {
		return ErrGRPCUnhealthy
	}

	return nil
}

Loading

0 comments on commit 6e8b36b

Please sign in to comment.