Skip to content

Commit

Permalink
health-checks: zero route availability improvements (#5111)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasaga committed May 17, 2024
1 parent adb5f78 commit 8269a72
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 50 deletions.
62 changes: 27 additions & 35 deletions internal/zero/healthcheck/check_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package healthcheck

import (
"context"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -14,25 +15,31 @@ import (
"golang.org/x/exp/maps"

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/urlutil"
"github.com/pomerium/pomerium/pkg/cryptutil"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/health"
"github.com/pomerium/pomerium/pkg/protoutil"
clusterping "github.com/pomerium/pomerium/pkg/zero/ping"
)

// CheckRoutes checks whether all routes that are referenced by this pomerium instance configuration are reachable
// it resolves the DNS entry and tries to access a pomerium jwks route
// we should hit ourselves and observe the same public key that we have in our configuration
// otherwise, something is misconfigured on the DNS level
func (c *checker) CheckRoutes(ctx context.Context) {
err := checkRoutesReachable(ctx, c.bootstrap.GetConfig(), c.databrokerClient)
func (c *checker) CheckRoutes(ctx context.Context) error {
key, err := getClusterPublicKey(c.bootstrap.GetConfig())
if err != nil {
log.Warn(ctx).Err(err).Msg("routes reachability check failed")
health.ReportInternalError(health.RoutesReachable, err)
return err
}

err = checkRoutesReachable(ctx, key, c.GetConfigs())
if err == nil {
health.ReportOK(health.RoutesReachable)
} else if ctx.Err() == nil {
health.ReportError(health.RoutesReachable, err)
}
return err
}

const (
Expand All @@ -43,6 +50,9 @@ func getPingHTTPClient() *http.Client {
return &http.Client{
Timeout: connectionTimeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return (&net.Dialer{
Timeout: connectionTimeout,
Expand All @@ -54,15 +64,10 @@ func getPingHTTPClient() *http.Client {

func checkRoutesReachable(
ctx context.Context,
cfg *config.Config,
databrokerClient databroker.DataBrokerServiceClient,
key *jose.JSONWebKey,
configs []*configpb.Config,
) error {
key, err := getClusterPublicKey(cfg)
if err != nil {
return fmt.Errorf("error getting cluster public key: %w", err)
}

hosts, err := getRouteHosts(ctx, databrokerClient)
hosts, err := getHosts(configs)
if err != nil {
return fmt.Errorf("error getting route hosts: %w", err)
}
Expand All @@ -77,13 +82,7 @@ func checkRoutesReachable(
}
}

if len(errs) == 0 {
health.ReportOK(health.RoutesReachable)
} else {
health.ReportError(health.RoutesReachable, errors.Join(errs...))
}

return nil
return errors.Join(errs...)
}

func getClusterPublicKey(cfg *config.Config) (*jose.JSONWebKey, error) {
Expand All @@ -100,21 +99,9 @@ func getClusterPublicKey(cfg *config.Config) (*jose.JSONWebKey, error) {
return key, nil
}

func getRouteHosts(ctx context.Context, databrokerClient databroker.DataBrokerServiceClient) ([]string, error) {
records, _, _, err := databroker.InitialSync(ctx, databrokerClient, &databroker.SyncLatestRequest{
Type: protoutil.GetTypeURL(new(configpb.Config)),
})
if err != nil {
return nil, fmt.Errorf("error during initial sync: %w", err)
}

func getHosts(configs []*configpb.Config) ([]string, error) {
hosts := make(map[string]struct{})
for _, record := range records {
var cfg configpb.Config
if err := record.Data.UnmarshalTo(&cfg); err != nil {
return nil, fmt.Errorf("error unmarshalling config: %w", err)
}

for _, cfg := range configs {
for _, route := range cfg.GetRoutes() {
if route.GetTlsCustomCa() != "" {
continue
Expand All @@ -123,6 +110,11 @@ func getRouteHosts(ctx context.Context, databrokerClient databroker.DataBrokerSe
if err != nil {
continue
}
if u.Scheme != "https" {
// there's a complication with TCP+HTTPS routes as in general we may not know the host address for them
// and we can't rely on the config's server address port part, as it may be different from actual externally reachable port
continue
}
hosts[u.Host] = struct{}{}
}
}
Expand Down
35 changes: 20 additions & 15 deletions internal/zero/healthcheck/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package healthcheck

import (
"context"
"time"
"fmt"
"sync/atomic"

"golang.org/x/sync/errgroup"

"github.com/pomerium/pomerium/config"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)

Expand All @@ -16,28 +20,29 @@ func RunChecks(
c := &checker{
bootstrap: bootstrap,
databrokerClient: databrokerClient,
forceCheck: make(chan struct{}, 1),
}
return c.run(ctx)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { c.Scheduler(ctx); return nil })
eg.Go(func() error { return c.ConfigSyncer(ctx) })
return eg.Wait()
}

type checker struct {
bootstrap config.Source
databrokerClient databroker.DataBrokerServiceClient
forceCheck chan struct{}
configs atomic.Value
}

const runHealthChecksInterval = time.Minute * 30

func (c *checker) run(ctx context.Context) error {
tm := time.NewTimer(runHealthChecksInterval)
defer tm.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-tm.C:
c.CheckRoutes(ctx)
tm.Reset(runHealthChecksInterval)
func getConfig(records []*databroker.Record) ([]*configpb.Config, error) {
var cfgs []*configpb.Config
for _, record := range records {
cfg := new(configpb.Config)
if err := record.Data.UnmarshalTo(cfg); err != nil {
return nil, fmt.Errorf("error unmarshalling config: %w", err)
}
cfgs = append(cfgs, cfg)
}
return cfgs, nil
}
54 changes: 54 additions & 0 deletions internal/zero/healthcheck/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package healthcheck

import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
)

const (
runHealthChecksMaxInterval = time.Minute * 30
runHealthCheckMinInterval = time.Minute
)

func (c *checker) Scheduler(ctx context.Context) {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
bo.MaxInterval = runHealthChecksMaxInterval
bo.InitialInterval = runHealthCheckMinInterval
bo.Reset()

tm := time.NewTimer(runHealthCheckMinInterval)
defer tm.Stop()

select {
case <-ctx.Done():
return
case <-tm.C:
}

for {
select {
case <-ctx.Done():
return
case <-c.forceCheck:
case <-tm.C:
}

next := runHealthChecksMaxInterval
err := c.CheckRoutes(ctx)
if err != nil {
next = bo.NextBackOff()
} else {
bo.Reset()
}
if !tm.Stop() {
select {
case <-tm.C:
default:
}
}
tm.Reset(next)
}
}
51 changes: 51 additions & 0 deletions internal/zero/healthcheck/syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package healthcheck

import (
"context"

configpb "github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/health"
"github.com/pomerium/pomerium/pkg/protoutil"
)

func (c *checker) ConfigSyncer(ctx context.Context) error {
syncer := databroker.NewSyncer("zero-health-check", c, databroker.WithTypeURL(protoutil.GetTypeURL(new(configpb.Config))))
return syncer.Run(ctx)
}

func (c *checker) GetConfigs() []*configpb.Config {
configs := c.configs.Load()
if configs == nil {
return nil
}
return configs.([]*configpb.Config)
}

// ClearRecords implements databroker.Syncer interface
func (c *checker) ClearRecords(_ context.Context) {
c.configs.Store([]*configpb.Config{})
}

// UpdateRecords implements databroker.Syncer interface
func (c *checker) UpdateRecords(_ context.Context, _ uint64, records []*databroker.Record) {
if len(records) == 0 {
return
}

cfgs, err := getConfig(records)
if err != nil {
health.ReportInternalError(health.RoutesReachable, err)
return
}
c.configs.Store(cfgs)
select {
case c.forceCheck <- struct{}{}:
default:
}
}

// GetDataBrokerServiceClient implements databroker.Syncer interface
func (c *checker) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
return c.databrokerClient
}
16 changes: 16 additions & 0 deletions pkg/health/provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package health

import (
"errors"
"sync"
)

Expand All @@ -15,6 +16,14 @@ func StrAttr(key, value string) Attr {
return Attr{Key: key, Value: value}
}

// InternalErrorKey is the key used to indicate that a check failed due to some non-user facing error
const InternalErrorKey = "internal_error"

// ErrorAttr creates a new error attribute, that is used to indicate that a check failed due to some non-user facing error
func ErrorAttr(err error) Attr {
return Attr{Key: InternalErrorKey, Value: err.Error()}
}

// ReportOK reports that a check was successful
func ReportOK(check Check, attributes ...Attr) {
p := defaultProvider.Load()
Expand All @@ -23,6 +32,13 @@ func ReportOK(check Check, attributes ...Attr) {
}
}

var ErrInternalError = errors.New("internal error")

// ReportInternalError reports that a check failed due to an internal error
func ReportInternalError(check Check, err error, attributes ...Attr) {
ReportError(check, ErrInternalError, append([]Attr{ErrorAttr(err)}, attributes...)...)
}

// ReportError reports that a check failed
func ReportError(check Check, err error, attributes ...Attr) {
p := defaultProvider.Load()
Expand Down

0 comments on commit 8269a72

Please sign in to comment.