Skip to content

Commit

Permalink
fix(xds): don't read metadata in ProxyBuilders (#5414)
Browse files Browse the repository at this point in the history
We were using a `DataplaneMetadataTracker` in all proxy builders.
This was causing issues as the proxy may disconnect while reconciliation is in progress.

We now set the proxy metadata in the `DataplaneWatchdog`, so that it is read only once and nil-checked.
This protects us against that race.

Signed-off-by: Charly Molter <charly.molter@konghq.com>
  • Loading branch information
lahabana committed Dec 2, 2022
1 parent ff4463e commit 39ba902
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 70 deletions.
6 changes: 1 addition & 5 deletions pkg/api-server/inspect_endpoints.go
Expand Up @@ -24,7 +24,6 @@ import (
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/envoy/tags"
"github.com/kumahq/kuma/pkg/xds/server/callbacks"
"github.com/kumahq/kuma/pkg/xds/sync"
)

Expand All @@ -35,10 +34,7 @@ func getMatchedPolicies(
) (
*core_xds.MatchedPolicies, []gateway.GatewayListenerInfo, core_xds.Proxy, error,
) {
proxyBuilder := sync.DefaultDataplaneProxyBuilder(
*cfg,
callbacks.NewDataplaneMetadataTracker(),
envoy.APIV3)
proxyBuilder := sync.DefaultDataplaneProxyBuilder(*cfg, envoy.APIV3)
if proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext); err != nil {
return nil, nil, core_xds.Proxy{}, err
} else {
Expand Down
11 changes: 2 additions & 9 deletions pkg/plugins/runtime/gateway/suite_test.go
Expand Up @@ -96,17 +96,10 @@ func MakeProtoSnapshot(snap cache_v3.ResourceSnapshot) ProtoSnapshot {
}
}

type mockMetadataTracker struct{}

func (m mockMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata {
return nil
}

func MakeGeneratorContext(rt runtime.Runtime, key core_model.ResourceKey) (*xds_context.Context, *core_xds.Proxy) {
b := sync.DataplaneProxyBuilder{
MetadataTracker: mockMetadataTracker{},
Zone: rt.Config().Multizone.Zone.Name,
APIVersion: envoy.APIV3,
Zone: rt.Config().Multizone.Zone.Name,
APIVersion: envoy.APIV3,
}

cache, err := cla.NewCache(rt.Config().Store.Cache.ExpirationTime.Duration, rt.Metrics())
Expand Down
3 changes: 1 addition & 2 deletions pkg/xds/server/v3/snapshot_generator_test.go
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/kumahq/kuma/pkg/xds/generator"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"
"github.com/kumahq/kuma/pkg/xds/server"
"github.com/kumahq/kuma/pkg/xds/server/callbacks"
v3 "github.com/kumahq/kuma/pkg/xds/server/v3"
"github.com/kumahq/kuma/pkg/xds/sync"
"github.com/kumahq/kuma/pkg/xds/template"
Expand Down Expand Up @@ -118,7 +117,7 @@ var _ = Describe("GenerateSnapshot", func() {
cfg.DNSServer.ServiceVipPort,
)

proxyBuilder = sync.DefaultDataplaneProxyBuilder(cfg, callbacks.NewDataplaneMetadataTracker(), envoy_common.APIV3)
proxyBuilder = sync.DefaultDataplaneProxyBuilder(cfg, envoy_common.APIV3)
})

create := func(r core_model.Resource) {
Expand Down
13 changes: 2 additions & 11 deletions pkg/xds/sync/components.go
Expand Up @@ -18,26 +18,22 @@ var (

func DefaultDataplaneProxyBuilder(
config kuma_cp.Config,
metadataTracker DataplaneMetadataTracker,
apiVersion core_xds.APIVersion,
) *DataplaneProxyBuilder {
return &DataplaneProxyBuilder{
MetadataTracker: metadataTracker,
Zone: config.Multizone.Zone.Name,
APIVersion: apiVersion,
Zone: config.Multizone.Zone.Name,
APIVersion: apiVersion,
}
}

func DefaultIngressProxyBuilder(
rt core_runtime.Runtime,
metadataTracker DataplaneMetadataTracker,
apiVersion core_xds.APIVersion,
) *IngressProxyBuilder {
return &IngressProxyBuilder{
ResManager: rt.ResourceManager(),
ReadOnlyResManager: rt.ReadOnlyResourceManager(),
LookupIP: rt.LookupIP(),
MetadataTracker: metadataTracker,
apiVersion: apiVersion,
meshCache: rt.MeshCache(),
zone: rt.Config().Multizone.Zone.Name,
Expand All @@ -47,15 +43,13 @@ func DefaultIngressProxyBuilder(
func DefaultEgressProxyBuilder(
ctx context.Context,
rt core_runtime.Runtime,
metadataTracker DataplaneMetadataTracker,
apiVersion core_xds.APIVersion,
) *EgressProxyBuilder {
return &EgressProxyBuilder{
ctx: ctx,
ResManager: rt.ResourceManager(),
ReadOnlyResManager: rt.ReadOnlyResourceManager(),
LookupIP: rt.LookupIP(),
MetadataTracker: metadataTracker,
meshCache: rt.MeshCache(),
apiVersion: apiVersion,
zone: rt.Config().Multizone.Zone.Name,
Expand All @@ -77,20 +71,17 @@ func DefaultDataplaneWatchdogFactory(

dataplaneProxyBuilder := DefaultDataplaneProxyBuilder(
config,
metadataTracker,
apiVersion,
)

ingressProxyBuilder := DefaultIngressProxyBuilder(
rt,
metadataTracker,
apiVersion,
)

egressProxyBuilder := DefaultEgressProxyBuilder(
ctx,
rt,
metadataTracker,
apiVersion,
)

Expand Down
3 changes: 0 additions & 3 deletions pkg/xds/sync/dataplane_proxy_builder.go
Expand Up @@ -27,8 +27,6 @@ import (
var syncLog = core.Log.WithName("sync")

type DataplaneProxyBuilder struct {
MetadataTracker DataplaneMetadataTracker

Zone string
APIVersion core_xds.APIVersion
}
Expand Down Expand Up @@ -64,7 +62,6 @@ func (p *DataplaneProxyBuilder) Build(ctx context.Context, key core_model.Resour
Id: core_xds.FromResourceKey(key),
APIVersion: p.APIVersion,
Dataplane: dp,
Metadata: p.MetadataTracker.Metadata(key),
Routing: *routing,
Policies: *matchedPolicies,
SecretsTracker: secretsTracker,
Expand Down
16 changes: 9 additions & 7 deletions pkg/xds/sync/dataplane_watchdog.go
Expand Up @@ -63,11 +63,11 @@ func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
}
switch d.dpType {
case mesh_proto.DataplaneProxyType:
return d.syncDataplane(ctx)
return d.syncDataplane(ctx, metadata)
case mesh_proto.IngressProxyType:
return d.syncIngress(ctx)
return d.syncIngress(ctx, metadata)
case mesh_proto.EgressProxyType:
return d.syncEgress(ctx)
return d.syncEgress(ctx, metadata)
default:
// It might be the case that the dp type is not yet inferred because there is no Dataplane definition yet.
return nil
Expand All @@ -91,7 +91,7 @@ func (d *DataplaneWatchdog) Cleanup() error {

// syncDataplane syncs state of the Dataplane.
// It uses Mesh Hash to decide if we need to regenerate configuration or not.
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
meshCtx, err := d.MeshCache.GetMeshContext(ctx, syncLog, d.key.Mesh)
if err != nil {
return err
Expand Down Expand Up @@ -126,6 +126,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
if err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy); err != nil {
return err
}
Expand All @@ -134,7 +135,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context) error {
}

// syncIngress syncs state of Ingress Dataplane. Notice that it does not use Mesh Hash yet because Ingress supports many Meshes.
func (d *DataplaneWatchdog) syncIngress(ctx context.Context) error {
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
Expand All @@ -148,12 +149,13 @@ func (d *DataplaneWatchdog) syncIngress(ctx context.Context) error {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.IngressReconciler.Reconcile(*envoyCtx, proxy)
}

// syncEgress syncs state of Egress Dataplane. Notice that it does not use
// Mesh Hash yet because Egress supports many Meshes.
func (d *DataplaneWatchdog) syncEgress(ctx context.Context) error {
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneEgress does not have a mesh!
Expand All @@ -168,7 +170,7 @@ func (d *DataplaneWatchdog) syncEgress(ctx context.Context) error {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS

proxy.Metadata = metadata
return d.EgressReconciler.Reconcile(*envoyCtx, proxy)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/xds/sync/dataplane_watchdog_test.go
Expand Up @@ -87,9 +87,8 @@ var _ = Describe("Dataplane Watchdog", func() {

deps = sync.DataplaneWatchdogDependencies{
DataplaneProxyBuilder: &sync.DataplaneProxyBuilder{
MetadataTracker: metadataTracker,
APIVersion: envoy.APIV3,
Zone: zone,
APIVersion: envoy.APIV3,
Zone: zone,
},
DataplaneReconciler: snapshotReconciler,
EnvoyCpCtx: &xds_context.ControlPlaneContext{
Expand Down
18 changes: 8 additions & 10 deletions pkg/xds/sync/egress_proxy_builder.go
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
xds_topology "github.com/kumahq/kuma/pkg/xds/topology"
)
Expand All @@ -23,17 +23,16 @@ type EgressProxyBuilder struct {
ResManager manager.ResourceManager
ReadOnlyResManager manager.ReadOnlyResourceManager
LookupIP lookup.LookupIPFunc
MetadataTracker DataplaneMetadataTracker
meshCache *xds_cache.Cache

zone string
apiVersion xds.APIVersion
apiVersion core_xds.APIVersion
}

func (p *EgressProxyBuilder) Build(
ctx context.Context,
key core_model.ResourceKey,
) (*xds.Proxy, error) {
) (*core_xds.Proxy, error) {
zoneEgress := core_mesh.NewZoneEgressResource()

if err := p.ReadOnlyResManager.Get(
Expand Down Expand Up @@ -79,7 +78,7 @@ func (p *EgressProxyBuilder) Build(
return zoneIngresses[a].GetMeta().GetName() < zoneIngresses[b].GetMeta().GetName()
})

var meshResourcesList []*xds.MeshResources
var meshResourcesList []*core_xds.MeshResources

for _, mesh := range meshes {
meshName := mesh.GetMeta().GetName()
Expand All @@ -95,7 +94,7 @@ func (p *EgressProxyBuilder) Build(
faultInjections := meshCtx.Resources.FaultInjections().Items
rateLimits := meshCtx.Resources.RateLimits().Items

meshResources := &xds.MeshResources{
meshResources := &core_xds.MeshResources{
Mesh: mesh,
TrafficRoutes: trafficRoutes,
ExternalServices: externalServices,
Expand Down Expand Up @@ -124,15 +123,14 @@ func (p *EgressProxyBuilder) Build(
meshResourcesList = append(meshResourcesList, meshResources)
}

proxy := &xds.Proxy{
Id: xds.FromResourceKey(key),
proxy := &core_xds.Proxy{
Id: core_xds.FromResourceKey(key),
APIVersion: p.apiVersion,
ZoneEgressProxy: &xds.ZoneEgressProxy{
ZoneEgressProxy: &core_xds.ZoneEgressProxy{
ZoneEgressResource: zoneEgress,
ZoneIngresses: zoneIngresses,
MeshResourcesList: meshResourcesList,
},
Metadata: p.MetadataTracker.Metadata(key),
}

return proxy, nil
Expand Down
20 changes: 9 additions & 11 deletions pkg/xds/sync/ingress_proxy_builder.go
Expand Up @@ -10,7 +10,7 @@ import (
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
"github.com/kumahq/kuma/pkg/xds/ingress"
xds_topology "github.com/kumahq/kuma/pkg/xds/topology"
Expand All @@ -20,14 +20,13 @@ type IngressProxyBuilder struct {
ResManager manager.ResourceManager
ReadOnlyResManager manager.ReadOnlyResourceManager
LookupIP lookup.LookupIPFunc
MetadataTracker DataplaneMetadataTracker
meshCache *xds_cache.Cache

apiVersion xds.APIVersion
apiVersion core_xds.APIVersion
zone string
}

func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.ResourceKey) (*xds.Proxy, error) {
func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.ResourceKey) (*core_xds.Proxy, error) {
zoneIngress, err := p.getZoneIngress(ctx, key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -61,18 +60,17 @@ func (p *IngressProxyBuilder) Build(ctx context.Context, key core_model.Resource

routing := p.resolveRouting(zoneIngress, zoneEgressesList, allMeshDataplanes, availableExternalServices, zoneIngressProxy.MeshGateways)

proxy := &xds.Proxy{
Id: xds.FromResourceKey(key),
proxy := &core_xds.Proxy{
Id: core_xds.FromResourceKey(key),
APIVersion: p.apiVersion,
ZoneIngress: zoneIngress,
Metadata: p.MetadataTracker.Metadata(key),
Routing: *routing,
ZoneIngressProxy: zoneIngressProxy,
}
return proxy, nil
}

func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*xds.ZoneIngressProxy, error) {
func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*core_xds.ZoneIngressProxy, error) {
routes := &core_mesh.TrafficRouteResourceList{}
if err := p.ReadOnlyResManager.List(ctx, routes); err != nil {
return nil, err
Expand All @@ -90,7 +88,7 @@ func (p *IngressProxyBuilder) buildZoneIngressProxy(ctx context.Context) (*xds.Z
return nil, err
}

return &xds.ZoneIngressProxy{
return &core_xds.ZoneIngressProxy{
TrafficRouteList: routes,
GatewayRoutes: gatewayRoutes,
MeshGateways: gateways,
Expand All @@ -117,13 +115,13 @@ func (p *IngressProxyBuilder) resolveRouting(
dataplanes *core_mesh.DataplaneResourceList,
externalServices *core_mesh.ExternalServiceResourceList,
meshGateways *core_mesh.MeshGatewayResourceList,
) *xds.Routing {
) *core_xds.Routing {
destinations := ingress.BuildDestinationMap(zoneIngress)
endpoints := ingress.BuildEndpointMap(
destinations, dataplanes.Items, externalServices.Items, zoneEgresses.Items, meshGateways.Items,
)

routing := &xds.Routing{
routing := &core_xds.Routing{
OutboundTargets: endpoints,
}
return routing
Expand Down
9 changes: 0 additions & 9 deletions pkg/xds/sync/proxy_builder_test.go
Expand Up @@ -30,12 +30,6 @@ import (
"github.com/kumahq/kuma/pkg/xds/sync"
)

type mockMetadataTracker struct{}

func (m mockMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata {
return nil
}

func initializeStore(ctx context.Context, resourceManager core_manager.ResourceManager, fileWithResourcesName string) {
resourcePath := filepath.Join(
"testdata", "input", fileWithResourcesName,
Expand Down Expand Up @@ -75,7 +69,6 @@ func initializeStore(ctx context.Context, resourceManager core_manager.ResourceM
}

var _ = Describe("Proxy Builder", func() {
tracker := mockMetadataTracker{}
localZone := "zone-1"

ctx := context.Background()
Expand Down Expand Up @@ -112,7 +105,6 @@ var _ = Describe("Proxy Builder", func() {
egressProxyBuilder := sync.DefaultEgressProxyBuilder(
ctx,
rt,
tracker,
envoy_common.APIV3,
)

Expand Down Expand Up @@ -195,7 +187,6 @@ var _ = Describe("Proxy Builder", func() {
Describe("Build() zone ingress", func() {
ingressProxyBuilder := sync.DefaultIngressProxyBuilder(
rt,
tracker,
envoy_common.APIV3,
)

Expand Down

0 comments on commit 39ba902

Please sign in to comment.