Skip to content

Commit

Permalink
Ignore source and change ref.UID
Browse files Browse the repository at this point in the history
The event source is actually irrelevant.
  • Loading branch information
Nuckal777 committed Sep 13, 2021
1 parent a909500 commit 7b85ae1
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 63 deletions.
2 changes: 1 addition & 1 deletion controllers/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ = Describe("The controller", func() {
err = k8sClient.List(context.Background(), events)
Expect(err).To(Succeed())
Expect(events.Items).ToNot(HaveLen(0))
Expect(events.Items[0].Source.Host).To(Equal("targetnode"))
Expect(events.Items[0].InvolvedObject.UID).To(BeEquivalentTo("targetnode"))
})

It("should annotate the last used profile", func() {
Expand Down
9 changes: 2 additions & 7 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ import (
"path/filepath"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sapcc/maintenance-controller/event"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -129,18 +127,15 @@ var _ = BeforeSuite(func() {
k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
MetricsBindAddress: "0",
EventBroadcaster: event.NewNodeBroadcaster(),
})
Expect(err).ToNot(HaveOccurred())

clientSet, err := kubernetes.NewForConfig(cfg)
Expect(err).To(Succeed())

eventRecorder := event.MakeRecorder(logr.Discard(), scheme.Scheme, clientSet)
err = (&NodeReconciler{
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("maintenance"),
Scheme: k8sManager.GetScheme(),
Recorder: eventRecorder,
Recorder: k8sManager.GetEventRecorderFor("controller"),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

Expand Down
43 changes: 11 additions & 32 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ import (
"math/rand"
"time"

"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/record/util"
Expand All @@ -53,7 +51,7 @@ type eventBroadcasterImpl struct {
}

// Creates a new event broadcaster.
func NewSourcingBroadcaster() record.EventBroadcaster {
func NewNodeBroadcaster() record.EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
Expand Down Expand Up @@ -195,17 +193,17 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) w

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder {
return &SourcingRecorder{scheme, source, e.Broadcaster, clock.RealClock{}}
return &NodeRecorder{scheme, source, e.Broadcaster, clock.RealClock{}}
}

type SourcingRecorder struct {
type NodeRecorder struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.Clock
}

func (recorder *SourcingRecorder) generateEvent(object runtime.Object, annotations map[string]string,
func (recorder *NodeRecorder) generateEvent(object runtime.Object, annotations map[string]string,
source *v1.EventSource, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
Expand Down Expand Up @@ -233,33 +231,25 @@ func (recorder *SourcingRecorder) generateEvent(object runtime.Object, annotatio
}()
}

func (recorder *SourcingRecorder) Event(object runtime.Object, eventtype, reason, message string) {
func (recorder *NodeRecorder) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, nil, eventtype, reason, message)
}

func (recorder *SourcingRecorder) Eventf(object runtime.Object,
func (recorder *NodeRecorder) Eventf(object runtime.Object,
eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *SourcingRecorder) AnnotatedEventf(object runtime.Object,
func (recorder *NodeRecorder) AnnotatedEventf(object runtime.Object,
annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, nil, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *SourcingRecorder) SourcedEvent(object runtime.Object, source v1.EventSource,
eventtype, reason, message string) {
recorder.generateEvent(object, nil, &source, eventtype, reason, message)
}

func (recorder *SourcingRecorder) SourcedEventf(object runtime.Object, source v1.EventSource, eventtype, reason,
messageFmt string, args ...interface{}) {
recorder.SourcedEvent(object, source, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *SourcingRecorder) makeEvent(ref *v1.ObjectReference, annotations map[string]string,
func (recorder *NodeRecorder) makeEvent(ref *v1.ObjectReference, annotations map[string]string,
eventtype, reason, message string) *v1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
// this makes the event appear in kubectl describe node.
ref.UID = types.UID(ref.Name)
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
Expand All @@ -279,14 +269,3 @@ func (recorder *SourcingRecorder) makeEvent(ref *v1.ObjectReference, annotations
Type: eventtype,
}
}

func MakeRecorder(log logr.Logger, scheme *runtime.Scheme, clientSet *kubernetes.Clientset) record.EventRecorder {
eventBroadcaster := NewSourcingBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
eventBroadcaster.StartEventWatcher(
func(e *v1.Event) {
log.Info("Send event", "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
})
eventRecorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "maintenance-controller"})
return eventRecorder
}
17 changes: 4 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -97,14 +95,15 @@ func main() {
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "6a2f7a03.cloud.sap",
EventBroadcaster: event.NewNodeBroadcaster(),
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

setupChecks(mgr)
err = setupReconcilers(mgr, enableESXMaintenance, restConfig)
err = setupReconcilers(mgr, enableESXMaintenance)
if err != nil {
setupLog.Error(err, "problem setting up reconcilers")
os.Exit(1)
Expand All @@ -130,20 +129,12 @@ func setupChecks(mgr manager.Manager) {
}
}

func setupReconcilers(mgr manager.Manager, enableESXMaintenance bool, restConfig *rest.Config) error {
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("Failed to init clientSet: %w", err)
}

eventLog := ctrl.Log.WithName("controllers").WithName("events")
eventRecorder := event.MakeRecorder(eventLog, mgr.GetScheme(), clientSet)

func setupReconcilers(mgr manager.Manager, enableESXMaintenance bool) error {
if err := (&controllers.NodeReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("maintenance"),
Scheme: mgr.GetScheme(),
Recorder: eventRecorder,
Recorder: mgr.GetEventRecorderFor("maintenance"),
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("Failed to setup maintenance controller node reconciler: %w", err)
}
Expand Down
15 changes: 5 additions & 10 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"strings"
"time"

"github.com/sapcc/maintenance-controller/event"
"github.com/sapcc/maintenance-controller/plugin"
v1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -95,22 +94,18 @@ func FromLabel(label NodeStateLabel, chains PluginChains, interval time.Duration
// Returns the next node state.
// In case of an error state.Label() is retuned alongside with the error.
func Apply(state NodeState, node *v1.Node, data *Data, params plugin.Parameters) (NodeStateLabel, error) {
recorder := params.Recorder.(*event.SourcingRecorder)
source := v1.EventSource{
Component: "maintenance-controller",
Host: node.Name,
}
recorder := params.Recorder
// invoke notifications and check for transition
err := state.Notify(params, data)
if err != nil {
recorder.SourcedEventf(node, source, "Normal", "ChangeMaintenanceStateFailed",
recorder.Eventf(node, "Normal", "ChangeMaintenanceStateFailed",
"At least one notification plugin failed for profile %v: Will stay in %v state",
params.Profile.Current, params.State)
return state.Label(), fmt.Errorf("failed to notify for profile %v: %w", params.Profile.Current, err)
}
next, err := state.Transition(params, data)
if err != nil {
recorder.SourcedEventf(node, source, "Normal", "ChangeMaintenanceStateFailed",
recorder.Eventf(node, "Normal", "ChangeMaintenanceStateFailed",
"At least one check plugin failed for profile %v: Will stay in %v state",
params.Profile.Current, params.State)
params.Log.Error(err, "Failed to check for state transition", "state", params.State,
Expand All @@ -123,12 +118,12 @@ func Apply(state NodeState, node *v1.Node, data *Data, params plugin.Parameters)
err = state.Trigger(params, data)
if err != nil {
params.Log.Error(err, "Failed to execute triggers", "state", params.State, "profile", params.Profile.Current)
recorder.SourcedEventf(node, source, "Normal", "ChangeMaintenanceStateFailed",
recorder.Eventf(node, "Normal", "ChangeMaintenanceStateFailed",
"At least one trigger plugin failed for profile %v: Will stay in %v state", params.Profile.Current, params.State)
return state.Label(), err
} else {
params.Log.Info("Moved node to next state", "state", string(next), "profile", params.Profile.Current)
recorder.SourcedEventf(node, source, "Normal", "ChangedMaintenanceState",
recorder.Eventf(node, "Normal", "ChangedMaintenanceState",
"The node is now in the %v state caused by profile %v", string(next), params.Profile.Current)
return next, nil
}
Expand Down

0 comments on commit 7b85ae1

Please sign in to comment.