Skip to content

Commit

Permalink
feat: add node address filters, filter out k8s addresses for Talos API
Browse files Browse the repository at this point in the history
This implements abstract `NodeAddressFilter` which can be attached to
build additional `NodeAddress` resources filtering out some entries from
the complete list.

Kubernetes creates two filters to get all node IPs without Kubernetes
CIDRs and vice versa, only Kubernetes CIDRs.

API certificate generation now considers all addresses minus K8s CIDRs.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Aug 27, 2021
1 parent caee24b commit 6956edd
Show file tree
Hide file tree
Showing 12 changed files with 627 additions and 30 deletions.
129 changes: 129 additions & 0 deletions internal/app/machined/pkg/controllers/config/k8s_address_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package config

import (
"context"
"fmt"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
"inet.af/netaddr"

"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/network"
)

// K8sAddressFilterController creates NodeAddressFilters based on machine configuration.
type K8sAddressFilterController struct{}

// Name implements controller.Controller interface.
func (ctrl *K8sAddressFilterController) Name() string {
return "network.K8sAddressFilterController"
}

// Inputs implements controller.Controller interface.
func (ctrl *K8sAddressFilterController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineConfigType,
ID: pointer.ToString(config.V1Alpha1ID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *K8sAddressFilterController) Outputs() []controller.Output {
return []controller.Output{
{
Type: network.NodeAddressFilterType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *K8sAddressFilterController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineConfigType, config.V1Alpha1ID, resource.VersionUndefined))
if err != nil && !state.IsNotFoundError(err) {
return fmt.Errorf("error getting config: %w", err)
}

touchedIDs := make(map[resource.ID]struct{})

if cfg != nil {
cfgProvider := cfg.(*config.MachineConfig).Config()

var podCIDR, serviceCIDR netaddr.IPPrefix

podCIDR, err = netaddr.ParseIPPrefix(cfgProvider.Cluster().Network().PodCIDR())
if err != nil {
return fmt.Errorf("error parsing podCIDR: %w", err)
}

serviceCIDR, err = netaddr.ParseIPPrefix(cfgProvider.Cluster().Network().ServiceCIDR())
if err != nil {
return fmt.Errorf("error parsing serviceCIDR: %w", err)
}

if err = r.Modify(ctx, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterNoK8s), func(r resource.Resource) error {
spec := r.(*network.NodeAddressFilter).TypedSpec()

spec.ExcludeSubnets = []netaddr.IPPrefix{podCIDR, serviceCIDR}

return nil
}); err != nil {
return fmt.Errorf("error updating output resource: %w", err)
}

touchedIDs[k8s.NodeAddressFilterNoK8s] = struct{}{}

if err = r.Modify(ctx, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterOnlyK8s), func(r resource.Resource) error {
spec := r.(*network.NodeAddressFilter).TypedSpec()

spec.IncludeSubnets = []netaddr.IPPrefix{podCIDR, serviceCIDR}

return nil
}); err != nil {
return fmt.Errorf("error updating output resource: %w", err)
}

touchedIDs[k8s.NodeAddressFilterOnlyK8s] = struct{}{}
}

// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(network.NamespaceName, network.NodeAddressFilterType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//nolint:dupl
package config_test

import (
"context"
"fmt"
"log"
"net/url"
"sync"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

configctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/config"
"github.com/talos-systems/talos/pkg/logging"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/network"
)

type K8sAddressFilterSuite struct {
suite.Suite

state state.State

runtime *runtime.Runtime
wg sync.WaitGroup

ctx context.Context
ctxCancel context.CancelFunc
}

func (suite *K8sAddressFilterSuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)

suite.state = state.WrapCore(namespaced.NewState(inmem.Build))

var err error

suite.runtime, err = runtime.NewRuntime(suite.state, logging.Wrap(log.Writer()))
suite.Require().NoError(err)

suite.Require().NoError(suite.runtime.RegisterController(&configctrl.K8sAddressFilterController{}))

suite.startRuntime()
}

func (suite *K8sAddressFilterSuite) startRuntime() {
suite.wg.Add(1)

go func() {
defer suite.wg.Done()

suite.Assert().NoError(suite.runtime.Run(suite.ctx))
}()
}

func (suite *K8sAddressFilterSuite) assertResource(md resource.Metadata, check func(res resource.Resource) error) func() error {
return func() error {
r, err := suite.state.Get(suite.ctx, md)
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
}

return err
}

return check(r)
}
}

func (suite *K8sAddressFilterSuite) TestReconcile() {
u, err := url.Parse("https://foo:6443")
suite.Require().NoError(err)

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{},
ClusterConfig: &v1alpha1.ClusterConfig{
ControlPlane: &v1alpha1.ControlPlaneConfig{
Endpoint: &v1alpha1.Endpoint{
URL: u,
},
},
ClusterNetwork: &v1alpha1.ClusterNetworkConfig{
ServiceSubnet: []string{
"10.96.0.0/12",
},
PodSubnet: []string{
"10.244.0.0/12",
},
},
},
})
suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
resource.NewMetadata(network.NamespaceName, network.NodeAddressFilterType, k8s.NodeAddressFilterOnlyK8s, resource.VersionUndefined),
func(res resource.Resource) error {
spec := res.(*network.NodeAddressFilter).TypedSpec()

suite.Assert().Equal("[10.244.0.0/12 10.96.0.0/12]", fmt.Sprintf("%s", spec.IncludeSubnets))
suite.Assert().Empty(spec.ExcludeSubnets)

return nil
},
),
))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
resource.NewMetadata(network.NamespaceName, network.NodeAddressFilterType, k8s.NodeAddressFilterNoK8s, resource.VersionUndefined),
func(res resource.Resource) error {
spec := res.(*network.NodeAddressFilter).TypedSpec()

suite.Assert().Empty(spec.IncludeSubnets)
suite.Assert().Equal("[10.244.0.0/12 10.96.0.0/12]", fmt.Sprintf("%s", spec.ExcludeSubnets))

return nil
},
),
))
}

func (suite *K8sAddressFilterSuite) TearDownTest() {
suite.T().Log("tear down")

suite.ctxCancel()

suite.wg.Wait()
}

func TestK8sAddressFilterSuite(t *testing.T) {
suite.Run(t, new(K8sAddressFilterSuite))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//nolint:dupl
package config_test

import (
Expand Down

0 comments on commit 6956edd

Please sign in to comment.