Skip to content

Commit

Permalink
ipcache: aggregate labels from all IPs with local host identity
Browse files Browse the repository at this point in the history
The `reserved:host` identity is special: the numeric identity is fixed
and the set of labels is mutable. (The datapath requires this.) So,
we need to determine all prefixes that have the `reserved:host` label and
capture their labels. Then, we must aggregate *all* labels from all IPs and
insert them as the `reserved:host` identity labels.

Likewise, we need to always update the SelectorCache; we cannot
short-circuit if the ipcache already has that identity. Again, this is
needed because the identity is mutable.

Without this, when there are multiple IPs with the host label, the
identity may flap and the SelectorCache may be missing updates.

Fixes: cilium#28259

Signed-off-by: Casey Callendrello <cdc@isovalent.com>
  • Loading branch information
squeed committed Sep 29, 2023
1 parent cc56e70 commit 59e7f3b
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 7 deletions.
7 changes: 6 additions & 1 deletion pkg/ipcache/ipcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cilium/cilium/pkg/checker"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
identityPkg "github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/source"
testidentity "github.com/cilium/cilium/pkg/testutils/identity"
"github.com/cilium/cilium/pkg/types"
Expand All @@ -33,6 +34,7 @@ type IPCacheTestSuite struct {
var (
_ = Suite(&IPCacheTestSuite{})
IPIdentityCache *IPCache
PolicyHandler *mockUpdater
)

func Test(t *testing.T) {
Expand All @@ -42,10 +44,13 @@ func Test(t *testing.T) {
func (s *IPCacheTestSuite) SetUpTest(c *C) {
ctx, cancel := context.WithCancel(context.Background())
allocator := testidentity.NewMockIdentityAllocator(nil)
PolicyHandler = &mockUpdater{
identities: make(map[identityPkg.NumericIdentity]labels.LabelArray),
}
IPIdentityCache = NewIPCache(&Configuration{
Context: ctx,
IdentityAllocator: allocator,
PolicyHandler: &mockUpdater{},
PolicyHandler: PolicyHandler,
DatapathHandler: &mockTriggerer{},
})

Expand Down
58 changes: 56 additions & 2 deletions pkg/ipcache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,21 @@ type metadata struct {
// generate updates into the ipcache, policy engine and datapath.
queuedChangesMU lock.Mutex
queuedPrefixes map[netip.Prefix]struct{}

// reservedHostLock protects the localHostLabels map
reservedHostLock lock.Mutex

// reservedHostLabels collects all labels that apply to the host identity.
// see updateLocalHostLabels() for more info.
reservedHostLabels map[netip.Prefix]labels.Labels
}

func newMetadata() *metadata {
return &metadata{
m: make(map[netip.Prefix]PrefixInfo),
queuedPrefixes: make(map[netip.Prefix]struct{}),

reservedHostLabels: make(map[netip.Prefix]labels.Labels),
}
}

Expand Down Expand Up @@ -211,6 +220,14 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P
// Already deleted, no new metadata to associate
continue
} // else continue below to remove the old entry

// Special case: if we are removing an IP from the reserved:host identity,
// we need to remove this prefix's labels from the local identity and
// force an update to the selector cache.
if oldID.ID == identity.ReservedIdentityHost {
i := ipc.updateReservedHostLabels(prefix, nil)
idsToAdd[i.ID] = i.Labels.LabelArray()
}
} else {
var newID *identity.Identity

Expand Down Expand Up @@ -239,6 +256,12 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P
break
}

// Special case: the host identity is mutable; we need to always update the SelectorCache.
// If the labels are unchanged, the SelectorCache will short-circuit.
if newID.ID == identity.ReservedIdentityHost {
idsToAdd[newID.ID] = newID.Labels.LabelArray()
}

// We can safely skip the ipcache upsert if the entry matches with
// the entry in the metadata cache exactly.
// Note that checking ID alone is insufficient, see GH-24502.
Expand Down Expand Up @@ -478,8 +501,12 @@ func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, in
// for itself). For all other identities, we avoid modifying
// the labels at runtime and instead opt to allocate new
// identities below.
identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, lbls)
return identity.LookupReservedIdentity(identity.ReservedIdentityHost), false, nil
//
// As an extra gotcha, we need need to merge all labels for all IPs
// that resolve to the reserved:host identity, otherwise we can
// flap identities labels depending on which prefix writes first. See GH-28259.
i := ipc.updateReservedHostLabels(prefix, lbls)
return i, false, nil
}

// If no other labels are associated with this IP, we assume that it's
Expand Down Expand Up @@ -511,6 +538,33 @@ func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, in
return id, isNew, err
}

// updateReservedHostLabels adds or removes labels that apply to the local host.
// The `reserved:host` identity is special: the numeric identity is fixed
// and the set of labels is mutable. (The datapath requires this.) So,
// we need to determine all prefixes that have the `reserved:host` label and
// capture their labels. Then, we must aggregate *all* labels from all IPs and
// insert them as the `reserved:host` identity labels.
func (ipc *IPCache) updateReservedHostLabels(prefix netip.Prefix, lbls labels.Labels) *identity.Identity {
ipc.metadata.reservedHostLock.Lock()
defer ipc.metadata.reservedHostLock.Unlock()
if lbls == nil {
delete(ipc.metadata.reservedHostLabels, prefix)
} else {
ipc.metadata.reservedHostLabels[prefix] = lbls
}

// aggregate all labels and update static identity
newLabels := labels.Labels{}
for _, l := range ipc.metadata.reservedHostLabels {
newLabels.MergeLabels(l)
}

log.WithField(logfields.Labels, newLabels).Debug("Merged labels for reserved:host identity")

identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, newLabels)
return identity.LookupReservedIdentity(identity.ReservedIdentityHost)
}

// RemoveLabelsExcluded removes the given labels from all IPs inside the IDMD
// except for the IPs / prefixes inside the given excluded set.
//
Expand Down
82 changes: 78 additions & 4 deletions pkg/ipcache/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,63 @@ func TestInjectLabels(t *testing.T) {
option.Config.EnableIPv6 = true
}

// Test that when multiple IPs have the `resolved:host` label, we correctly
// aggregate all labels *and* update the selector cache correctly.
// This reproduces GH-28259.
func TestUpdateLocalNode(t *testing.T) {
cancel := setupTest(t)
defer cancel()

ctx := context.Background()

hostid := identity.ReservedIdentityHost

bothLabels := labels.Labels{}
bothLabels.MergeLabels(labels.LabelHost)
bothLabels.MergeLabels(labels.LabelKubeAPIServer)

selectorCacheHas := func(lbls labels.Labels) {
t.Helper()
id := PolicyHandler.identities[hostid]
assert.NotNil(t, id)

assert.True(t, id.Equals(lbls.LabelArray()), "labels should equal: %v -- %v", id, lbls.LabelArray())
}

// Mark .4 as local host
IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "node-uid", labels.LabelHost)
remaining, err := IPIdentityCache.InjectLabels(ctx, []netip.Prefix{inClusterPrefix})
assert.NoError(t, err)
assert.Len(t, remaining, 0)
assert.Equal(t, IPIdentityCache.ipToIdentityCache["10.0.0.4/32"].ID, hostid)
selectorCacheHas(labels.LabelHost)

// Mark .4 as kube-apiserver
IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "kube-uid", labels.LabelKubeAPIServer)
remaining, err = IPIdentityCache.InjectLabels(ctx, []netip.Prefix{inClusterPrefix})
assert.NoError(t, err)
assert.Len(t, remaining, 0)
assert.Equal(t, IPIdentityCache.ipToIdentityCache["10.0.0.4/32"].ID, hostid)
selectorCacheHas(bothLabels)

// Mark .5 as local host
IPIdentityCache.metadata.upsertLocked(inClusterPrefix2, source.Local, "node-uid", labels.LabelHost)
remaining, err = IPIdentityCache.InjectLabels(ctx, []netip.Prefix{inClusterPrefix2})
assert.NoError(t, err)
assert.Len(t, remaining, 0)
assert.Equal(t, IPIdentityCache.ipToIdentityCache["10.0.0.5/32"].ID, hostid)
selectorCacheHas(bothLabels)

// remove kube-apiserver
IPIdentityCache.metadata.remove(inClusterPrefix, "kube-uid", overrideIdentity(false), labels.LabelKubeAPIServer)
remaining, err = IPIdentityCache.InjectLabels(ctx, []netip.Prefix{inClusterPrefix})
assert.NoError(t, err)
assert.Len(t, remaining, 0)
assert.Equal(t, IPIdentityCache.ipToIdentityCache["10.0.0.4/32"].ID, hostid)

selectorCacheHas(labels.LabelHost)
}

// TestInjectExisting tests "upgrading" an existing identity to the apiserver.
// This is a common occurrence on startup - and this tests ensures we don't
// regress the known issue in GH-24502
Expand Down Expand Up @@ -461,7 +518,7 @@ func TestOverrideIdentity(t *testing.T) {

ipc := NewIPCache(&Configuration{
IdentityAllocator: allocator,
PolicyHandler: &mockUpdater{},
PolicyHandler: newMockUpdater(),
DatapathHandler: &mockTriggerer{},
})
ctx := context.Background()
Expand Down Expand Up @@ -609,10 +666,11 @@ func setupTest(t *testing.T) (cleanup func()) {

ctx, cancel := context.WithCancel(context.Background())
allocator := testidentity.NewMockIdentityAllocator(nil)
PolicyHandler = newMockUpdater()
IPIdentityCache = NewIPCache(&Configuration{
Context: ctx,
IdentityAllocator: allocator,
PolicyHandler: &mockUpdater{},
PolicyHandler: PolicyHandler,
DatapathHandler: &mockTriggerer{},
})

Expand All @@ -625,9 +683,25 @@ func setupTest(t *testing.T) (cleanup func()) {
}
}

type mockUpdater struct{}
func newMockUpdater() *mockUpdater {
return &mockUpdater{
identities: make(map[identity.NumericIdentity]labels.LabelArray),
}
}

type mockUpdater struct {
identities map[identity.NumericIdentity]labels.LabelArray
}

func (m *mockUpdater) UpdateIdentities(_, _ cache.IdentityCache, _ *sync.WaitGroup) {}
func (m *mockUpdater) UpdateIdentities(added, deleted cache.IdentityCache, _ *sync.WaitGroup) {
for nid, lbls := range added {
m.identities[nid] = lbls
}

for nid := range deleted {
delete(m.identities, nid)
}
}

type mockTriggerer struct{}

Expand Down

0 comments on commit 59e7f3b

Please sign in to comment.