Skip to content

Commit

Permalink
core/config: add config version, additional telemetry (#4645)
Browse files Browse the repository at this point in the history
* core/config: add config version, additional telemetry

* typo
  • Loading branch information
calebdoxsey committed Oct 27, 2023
1 parent dd7e3b9 commit ae420f0
Show file tree
Hide file tree
Showing 13 changed files with 853 additions and 762 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Expand Up @@ -27,6 +27,7 @@ type Config struct {
Options *Options
AutoCertificates []tls.Certificate
EnvoyVersion string
Version int64

// DerivedCertificates are TLS certificates derived from the shared secret
DerivedCertificates []tls.Certificate
Expand Down Expand Up @@ -62,6 +63,7 @@ func (cfg *Config) Clone() *Config {
_ = copy(endpoints, cfg.MetricsScrapeEndpoints)

return &Config{
Version: cfg.Version,
Options: newOptions,
AutoCertificates: cfg.AutoCertificates,
EnvoyVersion: cfg.EnvoyVersion,
Expand Down
4 changes: 4 additions & 0 deletions config/config_source.go
Expand Up @@ -114,6 +114,7 @@ func NewFileOrEnvironmentSource(
cfg := &Config{
Options: options,
EnvoyVersion: envoyVersion,
Version: 1,
}

ports, err := netutil.AllocatePorts(6)
Expand Down Expand Up @@ -151,6 +152,7 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
options, err := newOptionsFromConfig(src.configFile)
if err == nil {
cfg = cfg.Clone()
cfg.Version++
cfg.Options = options
metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true)
} else {
Expand All @@ -160,6 +162,8 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
src.config = cfg
src.mu.Unlock()

log.Info(ctx).Int64("config-version", cfg.Version).Msg("config: loaded configuration")

src.Trigger(ctx, cfg)
}

Expand Down
7 changes: 7 additions & 0 deletions config/envoyconfig/bootstrap.go
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)

var (
Expand All @@ -32,6 +33,9 @@ func (b *Builder) BuildBootstrap(
cfg *config.Config,
fullyStatic bool,
) (bootstrap *envoy_config_bootstrap_v3.Bootstrap, err error) {
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildBootstrap")
defer span.End()

bootstrap = new(envoy_config_bootstrap_v3.Bootstrap)

bootstrap.Admin, err = b.BuildBootstrapAdmin(cfg)
Expand Down Expand Up @@ -164,6 +168,9 @@ func (b *Builder) BuildBootstrapStaticResources(
cfg *config.Config,
fullyStatic bool,
) (staticResources *envoy_config_bootstrap_v3.Bootstrap_StaticResources, err error) {
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildBootstrapStaticResources")
defer span.End()

staticResources = new(envoy_config_bootstrap_v3.Bootstrap_StaticResources)

if fullyStatic {
Expand Down
4 changes: 4 additions & 0 deletions config/envoyconfig/clusters.go
Expand Up @@ -20,11 +20,15 @@ import (

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/urlutil"
)

// BuildClusters builds envoy clusters from the given config.
func (b *Builder) BuildClusters(ctx context.Context, cfg *config.Config) ([]*envoy_config_cluster_v3.Cluster, error) {
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildClusters")
defer span.End()

grpcURLs := []*url.URL{{
Scheme: "http",
Host: b.localGRPCAddress,
Expand Down
4 changes: 4 additions & 0 deletions config/envoyconfig/listeners.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/sets"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/urlutil"
)

Expand All @@ -48,6 +49,9 @@ func (b *Builder) BuildListeners(
cfg *config.Config,
fullyStatic bool,
) ([]*envoy_config_listener_v3.Listener, error) {
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildListeners")
defer span.End()

var listeners []*envoy_config_listener_v3.Listener

if config.IsAuthenticate(cfg.Options.Services) || config.IsProxy(cfg.Options.Services) {
Expand Down
4 changes: 4 additions & 0 deletions config/envoyconfig/route_configurations.go
Expand Up @@ -6,13 +6,17 @@ import (
envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)

// BuildRouteConfigurations builds the route configurations for the RDS service.
func (b *Builder) BuildRouteConfigurations(
ctx context.Context,
cfg *config.Config,
) ([]*envoy_config_route_v3.RouteConfiguration, error) {
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildRouteConfigurations")
defer span.End()

var routeConfigurations []*envoy_config_route_v3.RouteConfiguration

if config.IsAuthenticate(cfg.Options.Services) || config.IsProxy(cfg.Options.Services) {
Expand Down
2 changes: 1 addition & 1 deletion internal/controlplane/server.go
Expand Up @@ -267,7 +267,7 @@ func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error
if err != nil {
return err
}
srv.xdsmgr.Update(ctx, res)
srv.xdsmgr.Update(ctx, cfg.Version, res)
return nil
}

Expand Down
21 changes: 20 additions & 1 deletion internal/controlplane/xds.go
Expand Up @@ -6,6 +6,8 @@ import (

envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/protoutil"
)
Expand All @@ -17,14 +19,22 @@ const (
)

func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*envoy_service_discovery_v3.Resource, error) {
resources := map[string][]*envoy_service_discovery_v3.Resource{}
ctx, span := trace.StartSpan(ctx, "controlplane.Server.buildDiscoveryResources")
defer span.End()

cfg := srv.currentConfig.Load()

log.Info(ctx).Int64("config-version", cfg.Version).Msg("controlplane: building discovery resources")

resources := map[string][]*envoy_service_discovery_v3.Resource{}
var clusterCount, listenerCount, routeConfigurationCount int

clusters, err := srv.Builder.BuildClusters(ctx, cfg.Config)
if err != nil {
return nil, err
}
for _, cluster := range clusters {
clusterCount++
resources[clusterTypeURL] = append(resources[clusterTypeURL], &envoy_service_discovery_v3.Resource{
Name: cluster.Name,
Version: hex.EncodeToString(cryptutil.HashProto(cluster)),
Expand All @@ -37,6 +47,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
return nil, err
}
for _, listener := range listeners {
listenerCount++
resources[listenerTypeURL] = append(resources[listenerTypeURL], &envoy_service_discovery_v3.Resource{
Name: listener.Name,
Version: hex.EncodeToString(cryptutil.HashProto(listener)),
Expand All @@ -49,12 +60,20 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
return nil, err
}
for _, routeConfiguration := range routeConfigurations {
routeConfigurationCount++
resources[routeConfigurationTypeURL] = append(resources[routeConfigurationTypeURL], &envoy_service_discovery_v3.Resource{
Name: routeConfiguration.Name,
Version: hex.EncodeToString(cryptutil.HashProto(routeConfiguration)),
Resource: protoutil.NewAny(routeConfiguration),
})
}

log.Info(ctx).
Int64("config-version", cfg.Version).
Int("cluster-count", clusterCount).
Int("listener-count", listenerCount).
Int("route-configuration-count", routeConfigurationCount).
Msg("controlplane: built discovery resources")

return resources, nil
}
37 changes: 34 additions & 3 deletions internal/controlplane/xdsmgr/xdsmgr.go
Expand Up @@ -3,6 +3,9 @@ package xdsmgr

import (
"context"
"fmt"
"strconv"
"strings"
"sync"

envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand All @@ -11,6 +14,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/signal"
)

Expand All @@ -36,7 +40,7 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma
return &Manager{
signal: signal.New(),

nonce: uuid.NewString(),
nonce: toNonce(0),
resources: resources,
}
}
Expand Down Expand Up @@ -106,13 +110,20 @@ func (mgr *Manager) DeltaAggregatedResources(
case req.GetResponseNonce() == "":
// neither an ACK or a NACK
case req.GetErrorDetail() != nil:
log.Info(ctx).
Any("error-detail", req.GetErrorDetail()).
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
Msg("xdsmgr: nack")
// a NACK
// - set the client resource versions to the current resource versions
state.clientResourceVersions = make(map[string]string)
for _, resource := range mgr.resources[req.GetTypeUrl()] {
state.clientResourceVersions[resource.Name] = resource.Version
}
case req.GetResponseNonce() == mgr.nonce:
log.Info(ctx).
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
Msg("xdsmgr: ack")
// an ACK for the last response
// - set the client resource versions to the current resource versions
state.clientResourceVersions = make(map[string]string)
Expand All @@ -121,6 +132,9 @@ func (mgr *Manager) DeltaAggregatedResources(
}
default:
// an ACK for a response that's not the last response
log.Info(ctx).
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
Msg("xdsmgr: ack")
}

// update subscriptions
Expand Down Expand Up @@ -200,6 +214,11 @@ func (mgr *Manager) DeltaAggregatedResources(
case <-ctx.Done():
return ctx.Err()
case res := <-outgoing:
log.Info(ctx).
Int64("config-version", versionFromNonce(res.GetNonce())).
Int("resource-count", len(res.GetResources())).
Int("removed-resource-count", len(res.GetRemovedResources())).
Msg("xdsmgr: sending resources")
err := stream.Send(res)
if err != nil {
return err
Expand All @@ -219,8 +238,8 @@ func (mgr *Manager) StreamAggregatedResources(

// Update updates the state of resources. If any changes are made they will be pushed to any listening
// streams. For each TypeURL the list of resources should be the complete list of resources.
func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_service_discovery_v3.Resource) {
nonce := uuid.New().String()
func (mgr *Manager) Update(ctx context.Context, version int64, resources map[string][]*envoy_service_discovery_v3.Resource) {
nonce := toNonce(version)

mgr.mu.Lock()
mgr.nonce = nonce
Expand All @@ -229,3 +248,15 @@ func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_se

mgr.signal.Broadcast(ctx)
}

func toNonce(version int64) string {
return fmt.Sprintf("%d/%s", version, uuid.New().String())
}

// versionFromNonce parses the version out of the nonce. A missing or invalid version will be returned as 0.
func versionFromNonce(nonce string) (version int64) {
if idx := strings.Index(nonce, "/"); idx > 0 {
version, _ = strconv.ParseInt(nonce[:idx], 10, 64)
}
return version
}
4 changes: 2 additions & 2 deletions internal/controlplane/xdsmgr/xdsmgr_test.go
Expand Up @@ -94,7 +94,7 @@ func TestManager(t *testing.T) {
}, msg.GetResources())
ack(msg.Nonce)

mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{
typeURL: {{Name: "r1", Version: "2"}},
})

Expand All @@ -105,7 +105,7 @@ func TestManager(t *testing.T) {
}, msg.GetResources())
ack(msg.Nonce)

mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{
typeURL: nil,
})

Expand Down
7 changes: 7 additions & 0 deletions internal/databroker/config_source.go
Expand Up @@ -74,6 +74,8 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
_, span := trace.StartSpan(ctx, "databroker.config_source.rebuild")
defer span.End()

log.Info(ctx).Msg("databroker: rebuilding configuration")

src.mu.Lock()
defer src.mu.Unlock()

Expand Down Expand Up @@ -107,6 +109,9 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
// add all the config policies to the list
for _, id := range ids {
cfgpb := src.dbConfigs[id]
if cfgpb.GetVersion() > 0 {
cfg.Version = cfgpb.GetVersion()
}

cfg.Options.ApplySettings(ctx, certsIndex, cfgpb.Settings)
var errCount uint64
Expand Down Expand Up @@ -166,6 +171,8 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
// add the additional policies here since calling `Validate` will reset them.
cfg.Options.AdditionalPolicies = append(cfg.Options.AdditionalPolicies, additionalPolicies...)

log.Info(ctx).Int64("config-version", cfg.Version).Msg("databroker: built new config")

src.computedConfig = cfg
if !firstTime {
src.Trigger(ctx, cfg)
Expand Down

0 comments on commit ae420f0

Please sign in to comment.