Skip to content

Commit

Permalink
fix: handle cases when merged resource re-appears before being destroyed
Browse files Browse the repository at this point in the history
The sequence of events to reproduce the problem:

* some resource was merged as final representation with ID `x`
* underlying source resource gets destroyed
* merge controller marks final resource `x` for teardown and waits
for the finalizers to be empty
* another source resource appears which gets merged to same final `x`
* as `x` is in the teardown phase, spec controller will ignore it
* merge controller doesn't see the problem as well, as `x` spec is
correct, but the phase is wrong (which merge controller ignores)

This pulls in COSI fix to return an error if a resource in teardown
phase is modified. This way merge controller knows that the resource `x`
is in the teardown phase, so it should be first fully torn down, and
then new representation should be re-created as new resource with same ID
`x`.

Regression unit-tests included (they don't reproduce the sequence of
events always reliably, but they do with 10% probability).

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Jun 28, 2021
1 parent 1e9a0e7 commit 5f6ec3e
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 12 deletions.
12 changes: 11 additions & 1 deletion internal/app/machined/pkg/controllers/network/address_merge.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/resources/network"
Expand Down Expand Up @@ -95,7 +96,16 @@ func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtim

return nil
}); err != nil {
return fmt.Errorf("error updating resource: %w", err)
if state.IsPhaseConflictError(err) {
logger.Debug("conflict detected", zap.String("id", id))

delete(addresses, id)

// trigger another reconcile
r.QueueReconcile()
} else {
return fmt.Errorf("error updating resource: %w", err)
}
}
}

Expand Down
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sync/errgroup"
"inet.af/netaddr"

netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network"
Expand Down Expand Up @@ -190,6 +191,97 @@ func (suite *AddressMergeSuite) TestMerge() {
}))
}

//nolint:gocyclo
func (suite *AddressMergeSuite) TestMergeFlapping() {
// simulate two conflicting address definitions which are getting removed/added constantly
dhcp := network.NewAddressSpec(network.ConfigNamespaceName, "dhcp/eth0/10.0.0.1/8")
*dhcp.TypedSpec() = network.AddressSpecSpec{
Address: netaddr.MustParseIPPrefix("10.0.0.1/8"),
LinkName: "eth0",
Family: nethelpers.FamilyInet4,
Scope: nethelpers.ScopeGlobal,
ConfigLayer: network.ConfigOperator,
}

override := network.NewAddressSpec(network.ConfigNamespaceName, "configuration/eth0/10.0.0.1/8")
*override.TypedSpec() = network.AddressSpecSpec{
Address: netaddr.MustParseIPPrefix("10.0.0.1/8"),
LinkName: "eth0",
Family: nethelpers.FamilyInet4,
Scope: nethelpers.ScopeHost,
ConfigLayer: network.ConfigMachineConfiguration,
}

resources := []resource.Resource{dhcp, override}

flipflop := func(idx int) func() error {
return func() error {
for i := 0; i < 500; i++ {
if err := suite.state.Create(suite.ctx, resources[idx]); err != nil {
return err
}

if err := suite.state.Destroy(suite.ctx, resources[idx].Metadata()); err != nil {
return err
}

time.Sleep(time.Millisecond)
}

return suite.state.Create(suite.ctx, resources[idx])
}
}

var eg errgroup.Group

eg.Go(flipflop(0))
eg.Go(flipflop(1))
eg.Go(func() error {
// add/remove finalizer to the merged resource
for i := 0; i < 1000; i++ {
if err := suite.state.AddFinalizer(suite.ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "eth0/10.0.0.1/8", resource.VersionUndefined), "foo"); err != nil {
if !state.IsNotFoundError(err) {
return err
}

continue
} else {
suite.T().Log("finalizer added")
}

time.Sleep(10 * time.Millisecond)

if err := suite.state.RemoveFinalizer(suite.ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "eth0/10.0.0.1/8", resource.VersionUndefined), "foo"); err != nil {
if err != nil && !state.IsNotFoundError(err) {
return err
}
}
}

return nil
})

suite.Require().NoError(eg.Wait())

suite.Assert().NoError(retry.Constant(15*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
return suite.assertAddresses([]string{
"eth0/10.0.0.1/8",
}, func(r *network.AddressSpec) error {
if r.Metadata().Phase() != resource.PhaseRunning {
return retry.ExpectedErrorf("resource phase is %s", r.Metadata().Phase())
}

if *override.TypedSpec() != *r.TypedSpec() {
// using retry here, as it might not be reconciled immediately
return retry.ExpectedError(fmt.Errorf("not equal yet"))
}

return nil
})
}))
}

func (suite *AddressMergeSuite) TearDownTest() {
suite.T().Log("tear down")

Expand Down
13 changes: 11 additions & 2 deletions internal/app/machined/pkg/controllers/network/hostname_merge.go
Expand Up @@ -90,9 +90,18 @@ func (ctrl *HostnameMergeController) Run(ctx context.Context, r controller.Runti

return nil
}); err != nil {
return fmt.Errorf("error updating resource: %w", err)
if state.IsPhaseConflictError(err) {
// conflict
final.Hostname = ""

r.QueueReconcile()
} else {
return fmt.Errorf("error updating resource: %w", err)
}
}
} else {
}

if final.Hostname == "" {
// remove existing
var okToDestroy bool

Expand Down
12 changes: 11 additions & 1 deletion internal/app/machined/pkg/controllers/network/link_merge.go
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/resources/network"
Expand Down Expand Up @@ -109,7 +110,16 @@ func (ctrl *LinkMergeController) Run(ctx context.Context, r controller.Runtime,

return nil
}); err != nil {
return fmt.Errorf("error updating resource: %w", err)
if state.IsPhaseConflictError(err) {
logger.Debug("conflict detected", zap.String("id", id))

delete(links, id)

// trigger another reconcile
r.QueueReconcile()
} else {
return fmt.Errorf("error updating resource: %w", err)
}
}

logger.Debug("merged link spec", zap.String("id", id), zap.Any("spec", link))
Expand Down
89 changes: 89 additions & 0 deletions internal/app/machined/pkg/controllers/network/link_merge_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sync/errgroup"

netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network"
"github.com/talos-systems/talos/pkg/logging"
Expand Down Expand Up @@ -185,6 +186,94 @@ func (suite *LinkMergeSuite) TestMerge() {
}))
}

//nolint:gocyclo
func (suite *LinkMergeSuite) TestMergeFlapping() {
// simulate two conflicting link definitions which are getting removed/added constantly
dhcp := network.NewLinkSpec(network.ConfigNamespaceName, "dhcp/eth0")
*dhcp.TypedSpec() = network.LinkSpecSpec{
Name: "eth0",
Up: true,
MTU: 1450,
ConfigLayer: network.ConfigOperator,
}

static := network.NewLinkSpec(network.ConfigNamespaceName, "configuration/eth0")
*static.TypedSpec() = network.LinkSpecSpec{
Name: "eth0",
Up: true,
MTU: 1500,
ConfigLayer: network.ConfigMachineConfiguration,
}

resources := []resource.Resource{dhcp, static}

flipflop := func(idx int) func() error {
return func() error {
for i := 0; i < 500; i++ {
if err := suite.state.Create(suite.ctx, resources[idx]); err != nil {
return err
}

if err := suite.state.Destroy(suite.ctx, resources[idx].Metadata()); err != nil {
return err
}

time.Sleep(time.Millisecond)
}

return suite.state.Create(suite.ctx, resources[idx])
}
}

var eg errgroup.Group

eg.Go(flipflop(0))
eg.Go(flipflop(1))
eg.Go(func() error {
// add/remove finalizer to the merged resource
for i := 0; i < 1000; i++ {
if err := suite.state.AddFinalizer(suite.ctx, resource.NewMetadata(network.NamespaceName, network.LinkSpecType, "eth0", resource.VersionUndefined), "foo"); err != nil {
if !state.IsNotFoundError(err) {
return err
}

continue
} else {
suite.T().Log("finalizer added")
}

time.Sleep(10 * time.Millisecond)

if err := suite.state.RemoveFinalizer(suite.ctx, resource.NewMetadata(network.NamespaceName, network.LinkSpecType, "eth0", resource.VersionUndefined), "foo"); err != nil {
if err != nil && !state.IsNotFoundError(err) {
return err
}
}
}

return nil
})

suite.Require().NoError(eg.Wait())

suite.Assert().NoError(retry.Constant(15*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
return suite.assertLinks([]string{
"eth0",
}, func(r *network.LinkSpec) error {
if r.Metadata().Phase() != resource.PhaseRunning {
return retry.ExpectedErrorf("resource phase is %s", r.Metadata().Phase())
}

if r.TypedSpec().MTU != 1500 {
return retry.ExpectedErrorf("MTU %d != 1500", r.TypedSpec().MTU)
}

return nil
})
}))
}

func (suite *LinkMergeSuite) TearDownTest() {
suite.T().Log("tear down")

Expand Down
13 changes: 11 additions & 2 deletions internal/app/machined/pkg/controllers/network/resolver_merge.go
Expand Up @@ -98,9 +98,18 @@ func (ctrl *ResolverMergeController) Run(ctx context.Context, r controller.Runti

return nil
}); err != nil {
return fmt.Errorf("error updating resource: %w", err)
if state.IsPhaseConflictError(err) {
// conflict
final.DNSServers = nil

r.QueueReconcile()
} else {
return fmt.Errorf("error updating resource: %w", err)
}
}
} else {
}

if final.DNSServers == nil {
// remove existing
var okToDestroy bool

Expand Down
16 changes: 13 additions & 3 deletions internal/app/machined/pkg/controllers/network/route_merge.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/resources/network"
Expand Down Expand Up @@ -95,7 +96,16 @@ func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime,

return nil
}); err != nil {
return fmt.Errorf("error updating resource: %w", err)
if state.IsPhaseConflictError(err) {
logger.Debug("conflict detected", zap.String("id", id))

delete(routes, id)

// trigger another reconcile
r.QueueReconcile()
} else {
return fmt.Errorf("error updating resource: %w", err)
}
}
}

Expand All @@ -111,12 +121,12 @@ func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime,

okToDestroy, err = r.Teardown(ctx, res.Metadata())
if err != nil {
return fmt.Errorf("error cleaning up addresses: %w", err)
return fmt.Errorf("error cleaning up routes: %w", err)
}

if okToDestroy {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up addresses: %w", err)
return fmt.Errorf("error cleaning up routes: %w", err)
}
}
}
Expand Down

0 comments on commit 5f6ec3e

Please sign in to comment.