Skip to content

Commit

Permalink
fix: add a KubeSpan option to disable extra endpoint harvesting
Browse files Browse the repository at this point in the history
It works well for small clusters, but with bigger clusters it puts too
much load on the discovery service, as it has quadratic complexity in
the number of endpoints discovered/reported by each member.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Dec 12, 2023
1 parent 4547ad9 commit 131a1b1
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 89 deletions.
1 change: 1 addition & 0 deletions api/resource/definitions/kubespan/kubespan.proto
Expand Up @@ -17,6 +17,7 @@ message ConfigSpec {
bool advertise_kubernetes_networks = 5;
uint32 mtu = 6;
repeated string endpoint_filters = 7;
bool harvest_extra_endpoints = 8;
}

// EndpointSpec describes Endpoint state.
Expand Down
1 change: 1 addition & 0 deletions internal/app/machined/pkg/controllers/kubespan/config.go
Expand Up @@ -48,6 +48,7 @@ func NewConfigController() *ConfigController {
res.TypedSpec().SharedSecret = c.Cluster().Secret()
res.TypedSpec().ForceRouting = c.Machine().Network().KubeSpan().ForceRouting()
res.TypedSpec().AdvertiseKubernetesNetworks = c.Machine().Network().KubeSpan().AdvertiseKubernetesNetworks()
res.TypedSpec().HarvestExtraEndpoints = c.Machine().Network().KubeSpan().HarvestExtraEndpoints()
res.TypedSpec().MTU = c.Machine().Network().KubeSpan().MTU()
res.TypedSpec().EndpointFilters = c.Machine().Network().KubeSpan().Filters().Endpoints()
}
Expand Down
54 changes: 28 additions & 26 deletions internal/app/machined/pkg/controllers/kubespan/endpoint.go
Expand Up @@ -9,12 +9,14 @@ import (
"fmt"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/gen/value"
"go.uber.org/zap"

"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)

Expand All @@ -29,6 +31,12 @@ func (ctrl *EndpointController) Name() string {
// Inputs implements controller.Controller interface.
func (ctrl *EndpointController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: kubespan.ConfigType,
ID: optional.Some(kubespan.ConfigID),
Kind: controller.InputWeak,
},
{
Namespace: cluster.NamespaceName,
Type: cluster.AffiliateType,
Expand Down Expand Up @@ -63,6 +71,23 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
case <-r.EventCh():
}

cfg, err := safe.ReaderGetByID[*kubespan.Config](ctx, r, kubespan.ConfigID)
if err != nil && !state.IsNotFoundError(err) {
return fmt.Errorf("error getting kubespan configuration: %w", err)
}

r.StartTrackingOutputs()

if cfg == nil || !cfg.TypedSpec().HarvestExtraEndpoints {
// not enabled, short-circuit early
if err = safe.CleanupOutputs[*kubespan.Endpoint](ctx, r); err != nil {
return err
}

continue
}

// for every kubespan peer, if it's up and has endpoint, harvest that endpoint
peerStatuses, err := safe.ReaderListAll[*kubespan.PeerStatus](ctx, r)
if err != nil {
return fmt.Errorf("error listing cluster affiliates: %w", err)
Expand All @@ -84,9 +109,6 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
}
}

// for every kubespan peer, if it's up and has endpoint, harvest that endpoint
touchedIDs := make(map[resource.ID]struct{})

for it := peerStatuses.Iterator(); it.Next(); {
res := it.Value()
peerStatus := res.TypedSpec()
Expand Down Expand Up @@ -114,30 +136,10 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
}); err != nil {
return err
}

touchedIDs[res.Metadata().ID()] = struct{}{}
}

// list keys for cleanup
list, err := safe.ReaderListAll[*kubespan.Endpoint](ctx, r)
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
if err = safe.CleanupOutputs[*kubespan.Endpoint](ctx, r); err != nil {
return err
}

for it := list.Iterator(); it.Next(); {
res := it.Value()

if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
}

r.ResetRestartBackoff()
}
}
Expand Up @@ -15,6 +15,7 @@ import (
kubespanctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/kubespan"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)

Expand All @@ -27,6 +28,10 @@ func (suite *EndpointSuite) TestReconcile() {

suite.startRuntime()

cfg := kubespan.NewConfig(config.NamespaceName, kubespan.ConfigID)
cfg.TypedSpec().HarvestExtraEndpoints = true
suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

// create some affiliates and peer statuses
affiliate1 := cluster.NewAffiliate(cluster.NamespaceName, "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC")
*affiliate1.TypedSpec() = cluster.AffiliateSpec{
Expand Down
138 changes: 75 additions & 63 deletions pkg/machinery/api/resource/definitions/kubespan/kubespan.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 131a1b1

Please sign in to comment.