Skip to content

Commit

Permalink
wip: Add WaitGroup for SelectorCache user notifications
Browse files Browse the repository at this point in the history
[ upstream commit fc6ef4d ]

Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
Signed-off-by: Paul Chaignon <paul@cilium.io>
  • Loading branch information
jrajahalme authored and pchaigno committed Jul 29, 2021
1 parent c75c351 commit 960e3de
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 52 deletions.
9 changes: 5 additions & 4 deletions daemon/cmd/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSel
return selectorIdentitySliceMapping, newlyAllocatedIdentities, nil
}

func (d *Daemon) updateSelectorCacheFQDNs(ctx context.Context, selectors map[policyApi.FQDNSelector][]*identity.Identity, selectorsWithoutIPs []policyApi.FQDNSelector) (wg *sync.WaitGroup) {
func (d *Daemon) updateSelectorCacheFQDNs(ctx context.Context, selectors map[policyApi.FQDNSelector][]*identity.Identity, selectorsWithoutIPs []policyApi.FQDNSelector) *sync.WaitGroup {
// There may be nothing to update - in this case, we exit and do not need
// to trigger policy updates for all endpoints.
if len(selectors) == 0 && len(selectorsWithoutIPs) == 0 {
return &sync.WaitGroup{}
}

notifyWg := &sync.WaitGroup{}
// Update mapping of selector to set of IPs in selector cache.
for selector, identitySlice := range selectors {
log.WithFields(logrus.Fields{
Expand All @@ -115,7 +116,7 @@ func (d *Daemon) updateSelectorCacheFQDNs(ctx context.Context, selectors map[pol
// Nil check here? Hopefully not necessary...
numIds = append(numIds, numId.ID)
}
d.policy.GetSelectorCache().UpdateFQDNSelector(selector, numIds)
d.policy.GetSelectorCache().UpdateFQDNSelector(selector, numIds, notifyWg)
}

if len(selectorsWithoutIPs) > 0 {
Expand All @@ -126,10 +127,10 @@ func (d *Daemon) updateSelectorCacheFQDNs(ctx context.Context, selectors map[pol
log.WithFields(logrus.Fields{
"fqdnSelectors": selectorsWithoutIPs,
}).Debug("removing all identities from FQDN selectors")
d.policy.GetSelectorCache().RemoveIdentitiesFQDNSelectors(selectorsWithoutIPs)
d.policy.GetSelectorCache().RemoveIdentitiesFQDNSelectors(selectorsWithoutIPs, notifyWg)
}

return d.endpointManager.UpdatePolicyMaps(ctx)
return d.endpointManager.UpdatePolicyMaps(ctx, notifyWg)
}

// bootstrapFQDN initializes the toFQDNs related subsystems: DNSPoller,
Expand Down
5 changes: 4 additions & 1 deletion daemon/cmd/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ func (d *Daemon) TriggerPolicyUpdates(force bool, reason string) {
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (d *Daemon) UpdateIdentities(added, deleted cache.IdentityCache) {
d.policy.GetSelectorCache().UpdateIdentities(added, deleted)
wg := &sync.WaitGroup{}
d.policy.GetSelectorCache().UpdateIdentities(added, deleted, wg)
// Wait for update propagation to endpoints before triggering policy updates
wg.Wait()
d.TriggerPolicyUpdates(false, "one or more identities created or deleted")
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/endpointmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func waitForProxyCompletions(proxyWaitGroup *completion.WaitGroup) error {

// UpdatePolicyMaps returns a WaitGroup which is signaled upon once all endpoints
// have had their PolicyMaps updated against the Endpoint's desired policy state.
func (mgr *EndpointManager) UpdatePolicyMaps(ctx context.Context) *sync.WaitGroup {
func (mgr *EndpointManager) UpdatePolicyMaps(ctx context.Context, notifyWg *sync.WaitGroup) *sync.WaitGroup {
var epWG sync.WaitGroup
var wg sync.WaitGroup

Expand All @@ -126,6 +126,8 @@ func (mgr *EndpointManager) UpdatePolicyMaps(ctx context.Context) *sync.WaitGrou
// TODO: bound by number of CPUs?
for _, ep := range eps {
go func(ep *endpoint.Endpoint) {
// Proceed only after all notifications have been delivered to endpoints
notifyWg.Wait()
if err := ep.ApplyPolicyMapChanges(proxyWaitGroup); err != nil {
ep.Logger("endpointmanager").WithError(err).Warning("Failed to apply policy map changes. These will be re-applied in future updates.")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/fqdn/dnsproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -181,12 +182,14 @@ var (

func (s *DNSProxyTestSuite) SetUpSuite(c *C) {
// Add these identities
wg := &sync.WaitGroup{}
testSelectorCache.UpdateIdentities(cache.IdentityCache{
dstID1: labels.Labels{"Dst1": labels.NewLabel("Dst1", "test", labels.LabelSourceK8s)}.LabelArray(),
dstID2: labels.Labels{"Dst2": labels.NewLabel("Dst2", "test", labels.LabelSourceK8s)}.LabelArray(),
dstID3: labels.Labels{"Dst3": labels.NewLabel("Dst3", "test", labels.LabelSourceK8s)}.LabelArray(),
dstID4: labels.Labels{"Dst4": labels.NewLabel("Dst4", "test", labels.LabelSourceK8s)}.LabelArray(),
}, nil)
}, nil, wg)
wg.Wait()

s.repo = policy.NewPolicyRepository(nil, nil)
s.dnsTCPClient = &dns.Client{Net: "tcp", Timeout: time.Second, SingleInflight: true}
Expand Down
26 changes: 15 additions & 11 deletions pkg/policy/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package policy
import (
"fmt"
"sync"
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
Expand Down Expand Up @@ -195,10 +194,11 @@ func bootstrapRepo(ruleGenFunc func(int) api.Rules, numRules int, c *C) *Reposit
mgr := cache.NewCachingIdentityAllocator(&allocator.IdentityAllocatorOwnerMock{})
testRepo := NewPolicyRepository(mgr.GetIdentityCache(), nil)

var wg sync.WaitGroup
SetPolicyEnabled(option.DefaultEnforcement)
GenerateNumIdentities(3000)
testSelectorCache.UpdateIdentities(identityCache, nil)
wg := &sync.WaitGroup{}
testSelectorCache.UpdateIdentities(identityCache, nil, wg)
wg.Wait()
testRepo.selectorCache = testSelectorCache
rulez, _ := testRepo.AddList(ruleGenFunc(numRules))

Expand All @@ -210,7 +210,8 @@ func bootstrapRepo(ruleGenFunc func(int) api.Rules, numRules int, c *C) *Reposit
})

epsToRegen := NewEndpointSet(nil)
rulez.UpdateRulesEndpointsCaches(epSet, epsToRegen, &wg)
wg = &sync.WaitGroup{}
rulez.UpdateRulesEndpointsCaches(epSet, epsToRegen, wg)
wg.Wait()

c.Assert(epSet.Len(), Equals, 0)
Expand Down Expand Up @@ -506,8 +507,9 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressWildcard(c *C) {
added1 := cache.IdentityCache{
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
wg := &sync.WaitGroup{}
testSelectorCache.UpdateIdentities(added1, nil, wg)
wg.Wait()
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -577,17 +579,19 @@ func (ds *PolicyTestSuite) TestMapStateWithIngress(c *C) {
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
identity.NumericIdentity(194): labels.ParseSelectLabelArray("id=resolve_test_1", "num=3"),
}
testSelectorCache.UpdateIdentities(added1, nil)
wg := &sync.WaitGroup{}
testSelectorCache.UpdateIdentities(added1, nil, wg)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
defer testSelectorCache.UpdateIdentities(nil, added1, wg)
wg.Wait()
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
wg = &sync.WaitGroup{}
testSelectorCache.UpdateIdentities(nil, deleted1, wg)
wg.Wait()
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
48 changes: 31 additions & 17 deletions pkg/policy/selectorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@ type identitySelector interface {
// Called with NameManager and SelectorCache locks held
removeUser(CachedSelectionUser, identityNotifier) (last bool)

// This may be called while the NameManager lock is held
notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity)
// This may be called while the NameManager lock is held. wg.Wait()
// returns after user notifications have been completed, which may require
// taking Endpoint and SelectorCache locks, so these locks must not be
// held when calling wg.Wait().
notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity, wg *sync.WaitGroup)

numUsers() int
}
Expand Down Expand Up @@ -193,6 +196,7 @@ type userNotification struct {
selector CachedSelector
added []identity.NumericIdentity
deleted []identity.NumericIdentity
wg *sync.WaitGroup
}

// SelectorCache caches identities, identity selectors, and the
Expand Down Expand Up @@ -257,17 +261,20 @@ func (sc *SelectorCache) handleUserNotifications() {

for _, n := range notifications {
n.user.IdentitySelectionUpdated(n.selector, n.added, n.deleted)
n.wg.Done()
}
}
}

func (sc *SelectorCache) queueUserNotification(user CachedSelectionUser, selector CachedSelector, added, deleted []identity.NumericIdentity) {
func (sc *SelectorCache) queueUserNotification(user CachedSelectionUser, selector CachedSelector, added, deleted []identity.NumericIdentity, wg *sync.WaitGroup) {
wg.Add(1)
sc.userMutex.Lock()
sc.userNotes = append(sc.userNotes, userNotification{
user: user,
selector: selector,
added: added,
deleted: deleted,
wg: wg,
})
sc.userMutex.Unlock()
sc.userCond.Signal()
Expand Down Expand Up @@ -429,10 +436,10 @@ type fqdnSelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (f *fqdnSelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
func (f *fqdnSelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity, wg *sync.WaitGroup) {
for user := range f.users {
// pass 'f' to the user as '*fqdnSelector'
sc.queueUserNotification(user, f, added, deleted)
sc.queueUserNotification(user, f, added, deleted, wg)
}
}

Expand Down Expand Up @@ -488,10 +495,10 @@ type labelIdentitySelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (l *labelIdentitySelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
func (l *labelIdentitySelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity, wg *sync.WaitGroup) {
for user := range l.users {
// pass 'l' to the user as '*labelIdentitySelector'
sc.queueUserNotification(user, l, added, deleted)
sc.queueUserNotification(user, l, added, deleted, wg)
}
}

Expand Down Expand Up @@ -529,15 +536,18 @@ func (l *labelIdentitySelector) matches(identity scIdentity) bool {

// UpdateFQDNSelector updates the mapping of fqdnKey (the FQDNSelector from a
// policy rule as a string) to the provided list of identities. If the contents
// of the cachedSelections differ from those in the identities slice, all
// users are notified.
func (sc *SelectorCache) UpdateFQDNSelector(fqdnSelec api.FQDNSelector, identities []identity.NumericIdentity) {
// of the cachedSelections differ from those in the identities slice, all users
// are notified asynchronously. Caller should Wait() on the returned
// sync.WaitGroup before triggering any policy updates. Policy updates may need
// Endpoint locks, so this Wait() can deadlock if the caller is holding any
// endpoint locks.
func (sc *SelectorCache) UpdateFQDNSelector(fqdnSelec api.FQDNSelector, identities []identity.NumericIdentity, wg *sync.WaitGroup) {
sc.mutex.Lock()
sc.updateFQDNSelector(fqdnSelec, identities)
sc.updateFQDNSelector(fqdnSelec, identities, wg)
sc.mutex.Unlock()
}

func (sc *SelectorCache) updateFQDNSelector(fqdnSelec api.FQDNSelector, identities []identity.NumericIdentity) {
func (sc *SelectorCache) updateFQDNSelector(fqdnSelec api.FQDNSelector, identities []identity.NumericIdentity, wg *sync.WaitGroup) {
fqdnKey := fqdnSelec.String()

var fqdnSel *fqdnSelector
Expand Down Expand Up @@ -621,7 +631,7 @@ func (sc *SelectorCache) updateFQDNSelector(fqdnSelec api.FQDNSelector, identiti
// getting the CIDR identities which correspond to this FQDNSelector. This
// is the primary difference here between FQDNSelector and IdentitySelector.
fqdnSel.updateSelections()
fqdnSel.notifyUsers(sc, added, deleted) // disjoint sets, see the comment above
fqdnSel.notifyUsers(sc, added, deleted, wg) // disjoint sets, see the comment above
}

// AddFQDNSelector adds the given api.FQDNSelector in to the selector cache. If
Expand Down Expand Up @@ -824,7 +834,11 @@ func (sc *SelectorCache) ChangeUser(selector CachedSelector, from, to CachedSele
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache) {
//
// Caller should Wait() on the returned sync.WaitGroup before triggering any
// policy updates. Policy updates may need Endpoint locks, so this Wait() can
// deadlock if the caller is holding any endpoint locks.
func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache, wg *sync.WaitGroup) {
sc.mutex.Lock()
defer sc.mutex.Unlock()

Expand Down Expand Up @@ -880,7 +894,7 @@ func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache) {
}
if len(dels)+len(adds) > 0 {
idSel.updateSelections()
idSel.notifyUsers(sc, adds, dels)
idSel.notifyUsers(sc, adds, dels, wg)
}
case *fqdnSelector:
// This is a no-op right now. We don't encode in the identities
Expand All @@ -892,12 +906,12 @@ func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache) {

// RemoveIdentitiesFQDNSelectors removes all identities from being mapped to the
// set of FQDNSelectors.
func (sc *SelectorCache) RemoveIdentitiesFQDNSelectors(fqdnSels []api.FQDNSelector) {
func (sc *SelectorCache) RemoveIdentitiesFQDNSelectors(fqdnSels []api.FQDNSelector, wg *sync.WaitGroup) {
sc.mutex.Lock()
noIdentities := []identity.NumericIdentity{}

for i := range fqdnSels {
sc.updateFQDNSelector(fqdnSels[i], noIdentities)
sc.updateFQDNSelector(fqdnSels[i], noIdentities, wg)
}
sc.mutex.Unlock()
}
Loading

0 comments on commit 960e3de

Please sign in to comment.