Skip to content

Commit

Permalink
feat: emit k8s events for controller messages. Fixes #856 (#901)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>
  • Loading branch information
dpadhiar committed Aug 1, 2023
1 parent 3f16ccd commit 872aa86
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 50 deletions.
4 changes: 2 additions & 2 deletions pkg/reconciler/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func Start(namespaced bool, managedNamespace string) {

// Pipeline controller
pipelineController, err := controller.New(dfv1.ControllerPipeline, mgr, controller.Options{
Reconciler: plctrl.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, image, logger),
Reconciler: plctrl.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, image, logger, mgr.GetEventRecorderFor(dfv1.ControllerPipeline)),
})
if err != nil {
logger.Fatalw("Unable to set up Pipeline controller", zap.Error(err))
Expand Down Expand Up @@ -167,7 +167,7 @@ func Start(namespaced bool, managedNamespace string) {
// Vertex controller
autoscaler := scaling.NewScaler(mgr.GetClient(), scaling.WithWorkers(20))
vertexController, err := controller.New(dfv1.ControllerVertex, mgr, controller.Options{
Reconciler: vertexctrl.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, image, autoscaler, logger),
Reconciler: vertexctrl.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, image, autoscaler, logger, mgr.GetEventRecorderFor(dfv1.ControllerVertex)),
})
if err != nil {
logger.Fatalw("Unable to set up Vertex controller", zap.Error(err))
Expand Down
38 changes: 28 additions & 10 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -55,13 +56,14 @@ type pipelineReconciler struct {
client client.Client
scheme *runtime.Scheme

config *reconciler.GlobalConfig
image string
logger *zap.SugaredLogger
config *reconciler.GlobalConfig
image string
logger *zap.SugaredLogger
recorder record.EventRecorder
}

func NewReconciler(client client.Client, scheme *runtime.Scheme, config *reconciler.GlobalConfig, image string, logger *zap.SugaredLogger) reconcile.Reconciler {
return &pipelineReconciler{client: client, scheme: scheme, config: config, image: image, logger: logger}
func NewReconciler(client client.Client, scheme *runtime.Scheme, config *reconciler.GlobalConfig, image string, logger *zap.SugaredLogger, recorder record.EventRecorder) reconcile.Reconciler {
return &pipelineReconciler{client: client, scheme: scheme, config: config, image: image, logger: logger, recorder: recorder}
}

func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -102,7 +104,9 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
if time.Now().Before(pl.DeletionTimestamp.Add(time.Duration(pl.Spec.Lifecycle.GetDeleteGracePeriodSeconds()) * time.Second)) {
safeToDelete, err := r.safeToDelete(ctx, pl)
if err != nil {
log.Errorw("Failed to check if it's safe to delete the pipeline", zap.Error(err))
logMsg := fmt.Sprintf("Failed to check if it's safe to delete the pipeline: %v", err.Error())
log.Error(logMsg)
r.recorder.Event(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", logMsg)
return ctrl.Result{}, err
}

Expand All @@ -114,7 +118,9 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}
// Finalizer logic should be added here.
if err := r.cleanUpBuffers(ctx, pl, log); err != nil {
log.Errorw("Failed to create buffer clean up job", zap.Error(err))
logMsg := fmt.Sprintf("Failed to create buffer clean up job: %v", err.Error())
log.Error(logMsg)
r.recorder.Event(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", logMsg)
return ctrl.Result{}, err

}
Expand All @@ -125,17 +131,24 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (

// New, or reconciliation failed pipeline
if pl.Status.Phase == dfv1.PipelinePhaseUnknown || pl.Status.Phase == dfv1.PipelinePhaseFailed {
return r.reconcileNonLifecycleChanges(ctx, pl)
result, err := r.reconcileNonLifecycleChanges(ctx, pl)
if err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
}
return result, err
}

if oldPhase := pl.Status.Phase; oldPhase != pl.Spec.Lifecycle.GetDesiredPhase() {
requeue, err := r.updateDesiredState(ctx, pl)
if err != nil {
log.Errorw("Updated desired pipeline phase failed", zap.Error(err))
logMsg := fmt.Sprintf("Updated desired pipeline phase failed: %v", zap.Error(err))
log.Error(logMsg)
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", logMsg)
return ctrl.Result{}, err
}
if pl.Status.Phase != oldPhase {
log.Infow("Updated pipeline phase", zap.String("originalPhase", string(oldPhase)), zap.String("currentPhase", string(pl.Status.Phase)))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdatePipelinePhase", "Updated pipeline phase from %s to %s", string(oldPhase), string(pl.Status.Phase))
}
if requeue {
return ctrl.Result{RequeueAfter: dfv1.DefaultRequeueAfter}, nil
Expand All @@ -144,7 +157,11 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}

// Regular pipeline update
return r.reconcileNonLifecycleChanges(ctx, pl)
result, err := r.reconcileNonLifecycleChanges(ctx, pl)
if err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "ReconcilePipelineFailed", "Failed to reconcile pipeline: %v", err.Error())
}
return result, err
}

// reconcileNonLifecycleChanges do the jobs not related to pipeline lifecycle changes.
Expand Down Expand Up @@ -777,6 +794,7 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
return false, err
}
log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", replicas), zap.String("vertex", vertex.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, replicas)
isVertexPatched = true
}
}
Expand Down
79 changes: 73 additions & 6 deletions pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -108,7 +109,7 @@ func init() {

func Test_NewReconciler(t *testing.T) {
cl := fake.NewClientBuilder().Build()
r := NewReconciler(cl, scheme.Scheme, fakeConfig, testFlowImage, zaptest.NewLogger(t).Sugar())
r := NewReconciler(cl, scheme.Scheme, fakeConfig, testFlowImage, zaptest.NewLogger(t).Sugar(), record.NewFakeRecorder(64))
_, ok := r.(*pipelineReconciler)
assert.True(t, ok)
}
Expand All @@ -123,11 +124,12 @@ func Test_reconcile(t *testing.T) {
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
r := &pipelineReconciler{
client: cl,
scheme: scheme.Scheme,
config: fakeConfig,
image: testFlowImage,
logger: zaptest.NewLogger(t).Sugar(),
client: cl,
scheme: scheme.Scheme,
config: fakeConfig,
image: testFlowImage,
logger: zaptest.NewLogger(t).Sugar(),
recorder: record.NewFakeRecorder(64),
}
testObj := testPipeline.DeepCopy()
_, err = r.reconcile(ctx, testObj)
Expand All @@ -144,6 +146,62 @@ func Test_reconcile(t *testing.T) {
})
}

func Test_reconcileEvents(t *testing.T) {
t.Run("test reconcile - invalid name", func(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()
testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
testIsbSvc.Status.MarkConfigured()
testIsbSvc.Status.MarkDeployed()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
r := &pipelineReconciler{
client: cl,
scheme: scheme.Scheme,
config: fakeConfig,
image: testFlowImage,
logger: zaptest.NewLogger(t).Sugar(),
recorder: record.NewFakeRecorder(64),
}
testObj := testPipeline.DeepCopy()
testObj.Status.Phase = "Paused"
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
testObj.Name = "very-very-very-loooooooooooooooooooooooooooooooooooong"
_, err = r.reconcile(ctx, testObj)
assert.Error(t, err)
events := getEvents(r, 2)
assert.Equal(t, "Normal UpdatePipelinePhase Updated pipeline phase from Paused to Running", events[0])
assert.Equal(t, "Warning ReconcilePipelineFailed Failed to reconcile pipeline: the length of the pipeline name plus the vertex name is over the max limit. (very-very-very-loooooooooooooooooooooooooooooooooooong-input), [must be no more than 63 characters]", events[1])
})

t.Run("test reconcile - duplicate vertex", func(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()
testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
testIsbSvc.Status.MarkConfigured()
testIsbSvc.Status.MarkDeployed()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
r := &pipelineReconciler{
client: cl,
scheme: scheme.Scheme,
config: fakeConfig,
image: testFlowImage,
logger: zaptest.NewLogger(t).Sugar(),
recorder: record.NewFakeRecorder(64),
}
testObj := testPipeline.DeepCopy()
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
testObj.Spec.Vertices = append(testObj.Spec.Vertices, dfv1.AbstractVertex{Name: "input", Source: &dfv1.Source{}})
_, err = r.reconcile(ctx, testObj)
assert.Error(t, err)
events := getEvents(r, 1)
assert.Equal(t, "Warning ReconcilePipelineFailed Failed to reconcile pipeline: duplicate vertex name \"input\"", events[0])
})
}

func Test_buildVertices(t *testing.T) {
r := buildVertices(testPipeline)
assert.Equal(t, 3, len(r))
Expand Down Expand Up @@ -516,3 +574,12 @@ func Test_createOrUpdateSIMDeployments(t *testing.T) {
assert.Equal(t, testObj.GetSideInputsManagerDeploymentName("s2"), deployList.Items[0].Name)
})
}

func getEvents(reconciler *pipelineReconciler, num int) []string {
c := reconciler.recorder.(*record.FakeRecorder).Events
events := make([]string, num)
for i := 0; i < num; i++ {
events[i] = <-c
}
return events
}
57 changes: 26 additions & 31 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -51,11 +52,12 @@ type vertexReconciler struct {
image string
logger *zap.SugaredLogger

scaler *scaling.Scaler
scaler *scaling.Scaler
recorder record.EventRecorder
}

func NewReconciler(client client.Client, scheme *runtime.Scheme, config *reconciler.GlobalConfig, image string, scaler *scaling.Scaler, logger *zap.SugaredLogger) reconcile.Reconciler {
return &vertexReconciler{client: client, scheme: scheme, config: config, image: image, scaler: scaler, logger: logger}
func NewReconciler(client client.Client, scheme *runtime.Scheme, config *reconciler.GlobalConfig, image string, scaler *scaling.Scaler, logger *zap.SugaredLogger, recorder record.EventRecorder) reconcile.Reconciler {
return &vertexReconciler{client: client, scheme: scheme, config: config, image: image, scaler: scaler, logger: logger, recorder: recorder}
}

func (r *vertexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -103,15 +105,14 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
if apierrors.IsNotFound(err) {
e := fmt.Errorf("isbsvc %s not found", isbSvcName)
vertex.Status.MarkPhaseFailed("ISBSvcNotFound", e.Error())
r.recorder.Event(vertex, corev1.EventTypeWarning, "ISBSvcNotFound", e.Error())
return ctrl.Result{}, e
}
log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
vertex.Status.MarkPhaseFailed("FindISBSvcFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "FindISBSvcFailed", err.Error(), "Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, err
}
if !isbSvc.Status.IsReady() {
log.Errorw("ISB Service is not in ready status", zap.String("isbsvc", isbSvcName), zap.Error(err))
vertex.Status.MarkPhaseFailed("ISBSvcNotReady", "isbsvc not ready")
r.markPhaseLogEvent(vertex, log, "ISBSvcNotReady", err.Error(), "isbsvc not ready", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, fmt.Errorf("isbsvc not ready")
}

Expand All @@ -126,22 +127,19 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
for i := 0; i < desiredReplicas; i++ {
newPvc, err := r.buildReduceVertexPVCSpec(vertex, i)
if err != nil {
log.Errorw("Error building a PVC spec", zap.Error(err))
vertex.Status.MarkPhaseFailed("BuildPVCSpecFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "BuildPVCSpecFailed", err.Error(), "Error building a PVC spec", zap.Error(err))
return ctrl.Result{}, err
}
hash := sharedutil.MustHash(newPvc.Spec)
newPvc.SetAnnotations(map[string]string{dfv1.KeyHash: hash})
existingPvc := &corev1.PersistentVolumeClaim{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: vertex.Namespace, Name: newPvc.Name}, existingPvc); err != nil {
if !apierrors.IsNotFound(err) {
log.Errorw("Error finding existing PVC", zap.Error(err))
vertex.Status.MarkPhaseFailed("FindExistingPVCFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "FindExistingPVCFailed", err.Error(), "Error finding existing PVC", zap.Error(err))
return ctrl.Result{}, err
}
if err := r.client.Create(ctx, newPvc); err != nil && !apierrors.IsAlreadyExists(err) {
log.Errorw("Error creating a PVC", zap.Error(err))
vertex.Status.MarkPhaseFailed("CreatePVCFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "CreatePVCFailed", err.Error(), "Error creating a PVC", zap.Error(err))
return ctrl.Result{}, err
}
} else {
Expand All @@ -158,22 +156,19 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (

pipeline := &dfv1.Pipeline{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: vertex.Namespace, Name: vertex.Spec.PipelineName}, pipeline); err != nil {
log.Errorw("Failed to get pipeline object", zap.Error(err))
vertex.Status.MarkPhaseFailed("GetPipelineFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "GetPipelineFailed", err.Error(), "Failed to get pipeline object", zap.Error(err))
return ctrl.Result{}, err
}

existingPods, err := r.findExistingPods(ctx, vertex)
if err != nil {
log.Errorw("Failed to find existing pods", zap.Error(err))
vertex.Status.MarkPhaseFailed("FindExistingPodFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "FindExistingPodFailed", err.Error(), "Failed to find existing pods", zap.Error(err))
return ctrl.Result{}, err
}
for replica := 0; replica < desiredReplicas; replica++ {
podSpec, err := r.buildPodSpec(vertex, pipeline, isbSvc.Status.Config, replica)
if err != nil {
log.Errorw("Failed to generate pod spec", zap.Error(err))
vertex.Status.MarkPhaseFailed("PodSpecGenFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "PodSpecGenFailed", err.Error(), "Failed to generate pod spec", zap.Error(err))
return ctrl.Result{}, err
}
hash := sharedutil.MustHash(podSpec)
Expand Down Expand Up @@ -226,17 +221,15 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
}
pod.Spec.Hostname = fmt.Sprintf("%s-%d", vertex.Name, replica)
if err := r.client.Create(ctx, pod); err != nil {
log.Errorw("Failed to create pod", zap.String("pod", pod.Name), zap.Error(err))
vertex.Status.MarkPhaseFailed("CreatePodFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "CreatePodFailed", err.Error(), "Failed to created pod", zap.Error(err))
return ctrl.Result{}, err
}
log.Infow("Succeeded to create a pod", zap.String("pod", pod.Name))
}
}
for _, v := range existingPods {
if err := r.client.Delete(ctx, &v); err != nil && !apierrors.IsNotFound(err) {
log.Errorw("Failed to delete pod", zap.String("pod", v.Name), zap.Error(err))
vertex.Status.MarkPhaseFailed("DelPodFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "DelPodFailed", err.Error(), "Failed to delete pod", zap.Error(err))
return ctrl.Result{}, err
}
}
Expand All @@ -253,8 +246,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
// create services
existingSvcs, err := r.findExistingServices(ctx, vertex)
if err != nil {
log.Errorw("Failed to find existing services", zap.Error(err))
vertex.Status.MarkPhaseFailed("FindExistingSvcsFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "FindExistingSvcsFailed", err.Error(), "Failed to find existing services", zap.Error(err))
return ctrl.Result{}, err
}
for _, s := range vertex.GetServiceObjs() {
Expand All @@ -265,8 +257,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
if existingSvc.GetAnnotations()[dfv1.KeyHash] != svcHash {
if err := r.client.Delete(ctx, &existingSvc); err != nil {
if !apierrors.IsNotFound(err) {
log.Errorw("Failed to delete existing service", zap.String("service", existingSvc.Name), zap.Error(err))
vertex.Status.MarkPhaseFailed("DelSvcFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "DelSvcFailed", err.Error(), "Failed to delete existing service", zap.String("service", existingSvc.Name), zap.Error(err))
return ctrl.Result{}, err
}
} else {
Expand All @@ -283,8 +274,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
if apierrors.IsAlreadyExists(err) {
continue
}
log.Errorw("Failed to create a service", zap.String("service", s.Name), zap.Error(err))
vertex.Status.MarkPhaseFailed("CreateSvcFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "CreateSvcFailed", err.Error(), "Failed to create a service", zap.String("service", s.Name), zap.Error(err))
return ctrl.Result{}, err
} else {
log.Infow("Succeeded to create a service", zap.String("service", s.Name))
Expand All @@ -294,8 +284,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
for _, v := range existingSvcs { // clean up stale services
if err := r.client.Delete(ctx, &v); err != nil {
if !apierrors.IsNotFound(err) {
log.Errorw("Failed to delete service not in use", zap.String("service", v.Name), zap.Error(err))
vertex.Status.MarkPhaseFailed("DelSvcFailed", err.Error())
r.markPhaseLogEvent(vertex, log, "DelSvcFailed", err.Error(), "Failed to delete service not in use", zap.String("service", v.Name), zap.Error(err))
return ctrl.Result{}, err
}
} else {
Expand Down Expand Up @@ -409,3 +398,9 @@ func (r *vertexReconciler) findExistingServices(ctx context.Context, vertex *dfv
}
return result, nil
}

func (r *vertexReconciler) markPhaseLogEvent(vertex *dfv1.Vertex, log *zap.SugaredLogger, reason, message, logMsg string, logWith ...interface{}) {
log.Errorw(logMsg, logWith)
vertex.Status.MarkPhaseFailed(reason, message)
r.recorder.Event(vertex, corev1.EventTypeWarning, reason, message)
}
Loading

0 comments on commit 872aa86

Please sign in to comment.