Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire contexts to RBAC controllers #105550

Merged
merged 1 commit into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ func startClusterRoleAggregrationController(ctx context.Context, controllerConte
go clusterroleaggregation.NewClusterRoleAggregation(
controllerContext.InformerFactory.Rbac().V1().ClusterRoles(),
controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(5, ctx.Done())
).Run(ctx, 5)
return nil, true, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type ClusterRoleAggregationController struct {
clusterRoleLister rbaclisters.ClusterRoleLister
clusterRolesSynced cache.InformerSynced

syncHandler func(key string) error
syncHandler func(ctx context.Context, key string) error
queue workqueue.RateLimitingInterface
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInfo
return c
}

func (c *ClusterRoleAggregationController) syncClusterRole(key string) error {
func (c *ClusterRoleAggregationController) syncClusterRole(ctx context.Context, key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
Expand Down Expand Up @@ -126,36 +126,36 @@ func (c *ClusterRoleAggregationController) syncClusterRole(key string) error {
}

if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
err = c.applyClusterRoles(sharedClusterRole.Name, newPolicyRules)
err = c.applyClusterRoles(ctx, sharedClusterRole.Name, newPolicyRules)
if errors.IsUnsupportedMediaType(err) { // TODO: Remove this fallback at least one release after ServerSideApply GA
// When Server Side Apply is not enabled, fallback to Update. This is required when running
// 1.21 since api-server can be 1.20 during the upgrade/downgrade.
// Since Server Side Apply is enabled by default in Beta, this fallback only kicks in
// if the feature has been disabled using its feature flag.
err = c.updateClusterRoles(sharedClusterRole, newPolicyRules)
err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules)
}
} else {
err = c.updateClusterRoles(sharedClusterRole, newPolicyRules)
err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules)
}
return err
}

func (c *ClusterRoleAggregationController) applyClusterRoles(name string, newPolicyRules []rbacv1.PolicyRule) error {
func (c *ClusterRoleAggregationController) applyClusterRoles(ctx context.Context, name string, newPolicyRules []rbacv1.PolicyRule) error {
clusterRoleApply := rbacv1ac.ClusterRole(name).
WithRules(toApplyPolicyRules(newPolicyRules)...)

opts := metav1.ApplyOptions{FieldManager: "clusterrole-aggregation-controller", Force: true}
_, err := c.clusterRoleClient.ClusterRoles().Apply(context.TODO(), clusterRoleApply, opts)
_, err := c.clusterRoleClient.ClusterRoles().Apply(ctx, clusterRoleApply, opts)
return err
}

func (c *ClusterRoleAggregationController) updateClusterRoles(sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error {
func (c *ClusterRoleAggregationController) updateClusterRoles(ctx context.Context, sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error {
clusterRole := sharedClusterRole.DeepCopy()
clusterRole.Rules = nil
for _, rule := range newPolicyRules {
clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy())
}
_, err := c.clusterRoleClient.ClusterRoles().Update(context.TODO(), clusterRole, metav1.UpdateOptions{})
_, err := c.clusterRoleClient.ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{})
return err
}

Expand Down Expand Up @@ -187,37 +187,37 @@ func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
}

// Run starts the controller and blocks until stopCh is closed.
func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct{}) {
func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting ClusterRoleAggregator")
defer klog.Infof("Shutting down ClusterRoleAggregator")

if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) {
if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", ctx.Done(), c.clusterRolesSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}

<-stopCh
<-ctx.Done()
}

func (c *ClusterRoleAggregationController) runWorker() {
for c.processNextWorkItem() {
func (c *ClusterRoleAggregationController) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}

func (c *ClusterRoleAggregationController) processNextWorkItem() bool {
func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Context) bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)

err := c.syncHandler(dsKey.(string))
err := c.syncHandler(ctx, dsKey.(string))
if err == nil {
c.queue.Forget(dsKey)
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package clusterroleaggregation

import (
"context"
"testing"

rbacv1 "k8s.io/api/rbac/v1"
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestSyncClusterRole(t *testing.T) {
clusterRoleClient: fakeClient.RbacV1(),
clusterRoleLister: rbaclisters.NewClusterRoleLister(indexer),
}
err := c.syncClusterRole(test.clusterRoleToSync)
err := c.syncClusterRole(context.TODO(), test.clusterRoleToSync)
if err != nil {
t.Fatal(err)
}
Expand Down