Skip to content

Commit

Permalink
Send available condition events for managed cluster
Browse files Browse the repository at this point in the history
Signed-off-by: zhujian <jiazhu@redhat.com>
  • Loading branch information
zhujian7 committed Apr 30, 2024
1 parent 8acc075 commit fed910c
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 34 deletions.
27 changes: 18 additions & 9 deletions pkg/registration/hub/lease/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
coordv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coordinformers "k8s.io/client-go/informers/coordination/v1"
"k8s.io/client-go/kubernetes"
coordlisters "k8s.io/client-go/listers/coordination/v1"
kevents "k8s.io/client-go/tools/events"
"k8s.io/utils/pointer"

clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
Expand All @@ -34,11 +36,12 @@ var (

// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
type leaseController struct {
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
kubeClient kubernetes.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
mcEventRecorder kevents.EventRecorder
}

// NewClusterLeaseController creates a cluster lease controller on hub cluster.
Expand All @@ -47,15 +50,17 @@ func NewClusterLeaseController(
clusterClient clientset.Interface,
clusterInformer clusterv1informer.ManagedClusterInformer,
leaseInformer coordinformers.LeaseInformer,
recorder events.Recorder) factory.Controller {
recorder events.Recorder,
mcEventRecorder kevents.EventRecorder) factory.Controller {
c := &leaseController{
kubeClient: kubeClient,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
mcEventRecorder: mcEventRecorder,
}
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(
Expand Down Expand Up @@ -150,6 +155,10 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus
c.eventRecorder.Eventf("ManagedClusterAvailableConditionUpdated",
"update managed cluster %q available condition to unknown, due to its lease is not updated constantly",
cluster.Name)
newClusterCopy := newCluster.DeepCopy()
newClusterCopy.SetNamespace(newClusterCopy.Name)
c.mcEventRecorder.Eventf(newClusterCopy, nil, corev1.EventTypeWarning, "AvailableUnknown",
"The managed cluster (%s) cannot connect to the hub cluster.", cluster.Name)
}

return err
Expand Down
44 changes: 35 additions & 9 deletions pkg/registration/hub/lease/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
v1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"

"open-cluster-management.io/ocm/pkg/common/helpers"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
Expand Down Expand Up @@ -57,7 +59,7 @@ func TestSync(t *testing.T) {
testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute)),
testinghelpers.NewManagedClusterLease(fmt.Sprintf("cluster-lease-%s", testinghelpers.TestManagedClusterName), now.Add(-5*time.Minute)),
},
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
validateActions: func(t *testing.T, hubKubeActions, clusterActions []clienttesting.Action) {
expected := metav1.Condition{
Type: clusterv1.ManagedClusterConditionAvailable,
Status: metav1.ConditionUnknown,
Expand All @@ -72,6 +74,22 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}
testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)

if len(hubKubeActions) != 1 {
t.Errorf("Expected 1 event created in the sync loop, actual %d",
len(hubKubeActions))
}
actionEvent := hubKubeActions[0]
if actionEvent.GetResource().Resource != "events" {
t.Errorf("Expected event created, actual %s", actionEvent.GetResource())
}
if actionEvent.GetNamespace() != testinghelpers.TestManagedClusterName {
t.Errorf("Expected event created in namespace %s, actual %s",
testinghelpers.TestManagedClusterName, actionEvent.GetNamespace())
}
if actionEvent.GetVerb() != "create" {
t.Errorf("Expected event created, actual %s", actionEvent.GetVerb())
}
},
},
{
Expand Down Expand Up @@ -123,31 +141,39 @@ func TestSync(t *testing.T) {
}
}

leaseClient := kubefake.NewSimpleClientset(c.clusterLeases...)
leaseInformerFactory := kubeinformers.NewSharedInformerFactory(leaseClient, time.Minute*10)
hubClient := kubefake.NewSimpleClientset(c.clusterLeases...)
leaseInformerFactory := kubeinformers.NewSharedInformerFactory(hubClient, time.Minute*10)
leaseStore := leaseInformerFactory.Coordination().V1().Leases().Informer().GetStore()
for _, lease := range c.clusterLeases {
if err := leaseStore.Add(lease); err != nil {
t.Fatal(err)
}
}

ctx := context.TODO()
syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)

mcEventRecorder, err := helpers.NewEventRecorder(ctx, clusterscheme.Scheme, hubClient, "leaseController")
if err != nil {
t.Fatal(err)
}
ctrl := &leaseController{
kubeClient: leaseClient,
kubeClient: hubClient,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),
mcEventRecorder: mcEventRecorder,
}
syncErr := ctrl.sync(context.TODO(), syncCtx)
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
}
c.validateActions(t, leaseClient.Actions(), clusterClient.Actions())

// wait for the event to be recorded
time.Sleep(100 * time.Millisecond)
c.validateActions(t, hubClient.Actions(), clusterClient.Actions())
})
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/registration/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/klog/v2"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"

addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
Expand All @@ -26,6 +27,7 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"

commonhelpers "open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"open-cluster-management.io/ocm/pkg/registration/hub/addon"
Expand Down Expand Up @@ -186,12 +188,17 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
)
}

mcRecorder, err := commonhelpers.NewEventRecorder(ctx, clusterscheme.Scheme, kubeClient, "registrationHub")
if err != nil {
return err
}
leaseController := lease.NewClusterLeaseController(
kubeClient,
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
kubeInformers.Coordination().V1().Leases(),
controllerContext.EventRecorder,
mcRecorder,
)

clockSyncController := lease.NewClockSyncController(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ rules:
- apiGroups: ["addon.open-cluster-management.io"]
resources: ["managedclusteraddons/status"]
verbs: ["patch", "update"]
# Allow agent to send events to the hub
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["create"]
24 changes: 21 additions & 3 deletions pkg/registration/spoke/managedcluster/claim_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
fakekube "k8s.io/client-go/kubernetes/fake"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
ocmfeature "open-cluster-management.io/api/feature"

"open-cluster-management.io/ocm/pkg/common/helpers"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/features"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
Expand Down Expand Up @@ -107,6 +109,13 @@ func TestSync(t *testing.T) {
}
}

fakeHubClient := fakekube.NewSimpleClientset()
ctx := context.TODO()
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
clusterscheme.Scheme, fakeHubClient, "claimController")
if err != nil {
t.Fatal(err)
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
Expand All @@ -116,9 +125,10 @@ func TestSync(t *testing.T) {
kubeInformerFactory.Core().V1().Nodes(),
20,
eventstesting.NewTestingEventRecorder(t),
hubEventRecorder,
)

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
syncErr := ctrl.sync(ctx, testingcommon.NewFakeSyncContext(t, ""))
testingcommon.AssertError(t, syncErr, c.expectedErr)

c.validateActions(t, clusterClient.Actions())
Expand Down Expand Up @@ -330,6 +340,13 @@ func TestExposeClaims(t *testing.T) {
c.maxCustomClusterClaims = 20
}

fakeHubClient := fakekube.NewSimpleClientset()
ctx := context.TODO()
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
clusterscheme.Scheme, fakeHubClient, "claimController")
if err != nil {
t.Fatal(err)
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
Expand All @@ -339,9 +356,10 @@ func TestExposeClaims(t *testing.T) {
kubeInformerFactory.Core().V1().Nodes(),
c.maxCustomClusterClaims,
eventstesting.NewTestingEventRecorder(t),
hubEventRecorder,
)

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, c.cluster.Name))
syncErr := ctrl.sync(ctx, testingcommon.NewFakeSyncContext(t, c.cluster.Name))
testingcommon.AssertError(t, syncErr, c.expectedErr)

c.validateActions(t, clusterClient.Actions())
Expand Down
13 changes: 12 additions & 1 deletion pkg/registration/spoke/managedcluster/joining_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
fakekube "k8s.io/client-go/kubernetes/fake"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/common/helpers"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
Expand Down Expand Up @@ -78,6 +81,13 @@ func TestSyncManagedCluster(t *testing.T) {
}
}

fakeHubClient := fakekube.NewSimpleClientset()
ctx := context.TODO()
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
clusterscheme.Scheme, fakeHubClient, "joinController")
if err != nil {
t.Fatal(err)
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
Expand All @@ -87,9 +97,10 @@ func TestSyncManagedCluster(t *testing.T) {
kubeInformerFactory.Core().V1().Nodes(),
20,
eventstesting.NewTestingEventRecorder(t),
hubEventRecorder,
)

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
syncErr := ctrl.sync(ctx, testingcommon.NewFakeSyncContext(t, ""))
testingcommon.AssertError(t, syncErr, c.expectedErr)

c.validateActions(t, clusterClient.Actions())
Expand Down

0 comments on commit fed910c

Please sign in to comment.