Skip to content

Commit

Permalink
fix: race condition in instrumented application creation and deletion (
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir committed May 9, 2024
1 parent 605fcbf commit eb343a4
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deleteinstrumentedapplication

import (
"context"
"fmt"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// getObjectByOwnerReference fetches the workload object that ownerRef points
// to, looking it up by the reference's name inside the given namespace.
//
// Only the kinds "Deployment", "DaemonSet" and "StatefulSet" are handled; any
// other kind yields an "unsupported owner kind" error and a nil object.
// For a supported kind the (possibly unpopulated) typed object is returned
// together with whatever error the client's Get produced, so callers must
// check the error before using the object.
func getObjectByOwnerReference(ctx context.Context, k8sClient client.Client, ownerRef metav1.OwnerReference, namespace string) (client.Object, error) {
	// Pick the empty typed object matching the owner's kind.
	var owner client.Object
	switch ownerRef.Kind {
	case "Deployment":
		owner = &appsv1.Deployment{}
	case "DaemonSet":
		owner = &appsv1.DaemonSet{}
	case "StatefulSet":
		owner = &appsv1.StatefulSet{}
	default:
		return nil, fmt.Errorf("unsupported owner kind %s", ownerRef.Kind)
	}

	// Owner references carry only a name, so the namespace is supplied by the caller.
	lookupKey := client.ObjectKey{
		Namespace: namespace,
		Name:      ownerRef.Name,
	}
	getErr := k8sClient.Get(ctx, lookupKey, owner)
	return owner, getErr
}

// InstrumentedApplicationReconciler reconciles InstrumentedApplication objects.
// Its Reconcile method looks up the workload that owns the
// InstrumentedApplication and deletes the InstrumentedApplication when
// instrumentation is not effectively enabled for that workload.
type InstrumentedApplicationReconciler struct {
// Client is used to read the InstrumentedApplication and its owner, and to
// delete the InstrumentedApplication.
client.Client
// Scheme is populated from the manager at setup time; it is not read by the
// reconcile logic visible in this file.
Scheme *runtime.Scheme
}

// Reconcile handles a single InstrumentedApplication identified by req.
//
// It loads the InstrumentedApplication (a missing object is not an error),
// resolves the workload named by its single owner reference, and deletes the
// InstrumentedApplication if instrumentation is not effectively enabled for
// that workload. Objects with zero or multiple owner references are logged
// and left untouched. Errors from fetching the owner or from evaluating the
// enabled state are logged and returned so the request is retried.
func (r *InstrumentedApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	instApp := &odigosv1.InstrumentedApplication{}
	if getErr := r.Client.Get(ctx, req.NamespacedName, instApp); getErr != nil {
		// The object may already be gone by the time we are invoked.
		return ctrl.Result{}, client.IgnoreNotFound(getErr)
	}

	// The owning workload is identified through the one and only owner reference.
	owners := instApp.GetOwnerReferences()
	if len(owners) != 1 {
		logger.Info("InstrumentedApplication should have exactly one owner reference")
		return ctrl.Result{}, nil
	}

	workload, err := getObjectByOwnerReference(ctx, r.Client, owners[0], req.Namespace)
	if err != nil {
		logger.Error(err, "error fetching owner object")
		return ctrl.Result{}, err
	}

	enabled, err := isWorkloadInstrumentationEffectiveEnabled(ctx, r.Client, workload)
	if err != nil {
		logger.Error(err, "error checking if instrumentation is effective")
		return ctrl.Result{}, err
	}

	if enabled {
		// Nothing to clean up: the workload is still instrumented.
		return ctrl.Result{}, nil
	}

	logger.Info("Deleting instrumented application for non-enabled workload")
	// A concurrent deletion is fine, hence NotFound is ignored.
	return ctrl.Result{}, client.IgnoreNotFound(r.Client.Delete(ctx, instApp))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package deleteinstrumentedapplication_test

import (
"context"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/instrumentor/internal/testutil"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// Spec for the InstrumentedApplication controller in the
// deleteinstrumentedapplication package: an InstrumentedApplication that is
// created for a workload without instrumentation enabled must be removed.
// NOTE(review): k8sClient is not declared in this file; presumably it is set
// up by the package's test suite — confirm.
var _ = Describe("deleteInstrumentedApplication InstrumentedApplication controller", func() {
ctx := context.Background()
// Fixtures shared between BeforeEach and the specs below.
var namespace *corev1.Namespace
var deployment *appsv1.Deployment
var instrumentedApplication *odigosv1.InstrumentedApplication

Describe("Delete InstrumentedApplication", func() {

When("Object created after deployment reconciled", func() {

BeforeEach(func() {
// Fresh namespace per spec.
namespace = testutil.NewMockNamespace()
Expect(k8sClient.Create(ctx, namespace)).Should(Succeed())

// The deployment is created with instrumentation disabled
// (per the SetOdigosInstrumentationDisabled helper's name).
deployment = testutil.SetOdigosInstrumentationDisabled(testutil.NewMockTestDeployment(namespace))
Expect(k8sClient.Create(ctx, deployment)).Should(Succeed())
})

It("InstrumentedApplication created for deployment which is not enabled", func() {

// Create the InstrumentedApplication only after the deployment exists,
// then expect it to be deleted.
instrumentedApplication = testutil.NewMockInstrumentedApplication(deployment)
Expect(k8sClient.Create(ctx, instrumentedApplication)).Should(Succeed())

testutil.AssertInstrumentedApplicationDeleted(ctx, k8sClient, instrumentedApplication)
})

})

})

})
12 changes: 12 additions & 0 deletions instrumentor/controllers/deleteinstrumentedapplication/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deleteinstrumentedapplication

import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -58,6 +59,17 @@ func SetupWithManager(mgr ctrl.Manager) error {
return err
}

err = builder.
ControllerManagedBy(mgr).
For(&odigosv1.InstrumentedApplication{}).
Complete(&InstrumentedApplicationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
})
if err != nil {
return err
}

return nil

}
8 changes: 8 additions & 0 deletions instrumentor/internal/testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ func NewMockInstrumentedApplication(workloadObject client.Object) *odigosv1.Inst
ObjectMeta: metav1.ObjectMeta{
Name: utils.GetRuntimeObjectName(workloadObject.GetName(), gvk.Kind),
Namespace: workloadObject.GetNamespace(),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: workloadObject.GetName(),
UID: workloadObject.GetUID(),
},
},
},
Spec: odigosv1.InstrumentedApplicationSpec{
RuntimeDetails: []odigosv1.RuntimeDetailsByContainer{
Expand Down
9 changes: 4 additions & 5 deletions odiglet/pkg/kube/runtime_details/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func SetupWithManager(mgr ctrl.Manager) error {
Expand All @@ -16,6 +14,7 @@ func SetupWithManager(mgr ctrl.Manager) error {
ControllerManagedBy(mgr).
For(&appsv1.Deployment{}).
Owns(&odigosv1.InstrumentedApplication{}).
WithEventFilter(&WorkloadEnabledPredicate{}).
Complete(&DeploymentsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -28,6 +27,7 @@ func SetupWithManager(mgr ctrl.Manager) error {
ControllerManagedBy(mgr).
For(&appsv1.StatefulSet{}).
Owns(&odigosv1.InstrumentedApplication{}).
WithEventFilter(&WorkloadEnabledPredicate{}).
Complete(&StatefulSetsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -40,6 +40,7 @@ func SetupWithManager(mgr ctrl.Manager) error {
ControllerManagedBy(mgr).
For(&appsv1.DaemonSet{}).
Owns(&odigosv1.InstrumentedApplication{}).
WithEventFilter(&WorkloadEnabledPredicate{}).
Complete(&DaemonSetsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -51,9 +52,7 @@ func SetupWithManager(mgr ctrl.Manager) error {
err = builder.
ControllerManagedBy(mgr).
For(&corev1.Namespace{}).
WithEventFilter(predicate.NewPredicateFuncs(func(obj client.Object) bool {
return isObjectLabeled(obj)
})).
WithEventFilter(&WorkloadEnabledPredicate{}).
Owns(&odigosv1.InstrumentedApplication{}).
Complete(&NamespacesReconciler{
Client: mgr.GetClient(),
Expand Down
104 changes: 104 additions & 0 deletions odiglet/pkg/kube/runtime_details/workload_enabled_predicated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package runtime_details

import (
"github.com/odigos-io/odigos/common/consts"
appsv1 "k8s.io/api/apps/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// WorkloadEnabledPredicate is the event filter used by the workload
// reconcilers. It decides per event type whether the event is passed on:
//   - Create: passed only when the object carries the odigos
//     instrumentation-enabled label.
//   - Update: passed when the label changed from not-enabled to enabled, or,
//     for Deployment / DaemonSet / StatefulSet objects, when the workload went
//     from having no available (ready) replicas to having at least one.
//   - Delete and Generic: never passed.
//
// This way, we don't need to run language detection downstream when unnecessary.
// This also helps in managing race conditions, where we might re-add runtime details
// which were just deleted by the instrumentor controller and generate unnecessary noise
// in the k8s eventual consistency model.
type WorkloadEnabledPredicate struct {
// NOTE(review): all four event methods are overridden on this type, so the
// embedded Funcs presumably only serves to satisfy the predicate interface — confirm.
predicate.Funcs
}

// Create reports whether a create event should be passed on.
// Only objects that already carry the instrumentation-enabled label pass;
// events without an object are dropped.
func (i *WorkloadEnabledPredicate) Create(e event.CreateEvent) bool {
	// Guard against events without an object, consistent with the nil checks
	// in Update; calling GetLabels on a nil object would panic.
	if e.Object == nil {
		return false
	}

	// only handle new workloads if they start with instrumentation enabled
	return isInstrumentationEnabled(e.Object)
}

// Update reports whether an update event should be passed on.
//
// The event passes when either:
//   - the instrumentation label went from not-enabled on the old object to
//     enabled on the new object, or
//   - the object is a Deployment, DaemonSet or StatefulSet that previously had
//     no available (ready) replicas and now has at least one.
//
// Events missing the old or the new object are dropped. For any other object
// type (e.g. namespaces), or when old and new objects are of different types,
// only the label transition is considered.
func (i *WorkloadEnabledPredicate) Update(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}

	// only run runtime inspection if the workload was not instrumented before
	// and now it is.
	oldEnabled := isInstrumentationEnabled(e.ObjectOld)
	newEnabled := isInstrumentationEnabled(e.ObjectNew)
	becameEnabled := !oldEnabled && newEnabled

	// Dispatch on the concrete Go type rather than on the object's Kind string:
	// typed objects may carry empty TypeMeta, in which case a Kind comparison
	// would silently skip the replica-availability checks below.
	switch newObj := e.ObjectNew.(type) {
	case *appsv1.Deployment:
		if oldObj, ok := e.ObjectOld.(*appsv1.Deployment); ok {
			replicasBecameAvailable := !isDeploymentAvailableReplicas(oldObj) && isDeploymentAvailableReplicas(newObj)
			return becameEnabled || replicasBecameAvailable
		}
	case *appsv1.DaemonSet:
		if oldObj, ok := e.ObjectOld.(*appsv1.DaemonSet); ok {
			replicasBecameAvailable := !isDaemonsetAvailableReplicas(oldObj) && isDaemonsetAvailableReplicas(newObj)
			return becameEnabled || replicasBecameAvailable
		}
	case *appsv1.StatefulSet:
		if oldObj, ok := e.ObjectOld.(*appsv1.StatefulSet); ok {
			replicasBecameAvailable := !isStatefulsetAvailableReplicas(oldObj) && isStatefulsetAvailableReplicas(newObj)
			return becameEnabled || replicasBecameAvailable
		}
	}

	// for namespace events or if old and new objects are of different types
	return becameEnabled
}

// Delete always rejects delete events: a workload that has been removed
// needs no runtime details calculated for it.
func (i *WorkloadEnabledPredicate) Delete(_ event.DeleteEvent) bool {
	return false
}

// Generic always rejects generic events; nothing in this predicate's
// purpose requires handling them.
func (i *WorkloadEnabledPredicate) Generic(_ event.GenericEvent) bool {
	return false
}

// isInstrumentationEnabled reports whether obj itself carries the odigos
// instrumentation label set to the "enabled" value. An object without any
// labels is considered not enabled.
func isInstrumentationEnabled(obj client.Object) bool {
	if objLabels := obj.GetLabels(); objLabels != nil {
		return objLabels[consts.OdigosInstrumentationLabel] == consts.InstrumentationEnabled
	}
	return false
}

// isDeploymentAvailableReplicas reports whether the deployment's status
// shows at least one available replica.
func isDeploymentAvailableReplicas(deployment *appsv1.Deployment) bool {
	available := deployment.Status.AvailableReplicas
	return available > 0
}

// isDaemonsetAvailableReplicas reports whether the daemon set's status
// shows a NumberReady count of at least one.
func isDaemonsetAvailableReplicas(daemonSet *appsv1.DaemonSet) bool {
	ready := daemonSet.Status.NumberReady
	return ready > 0
}

// isStatefulsetAvailableReplicas reports whether the stateful set's status
// shows at least one ready replica.
func isStatefulsetAvailableReplicas(statefulSet *appsv1.StatefulSet) bool {
	ready := statefulSet.Status.ReadyReplicas
	return ready > 0
}

0 comments on commit eb343a4

Please sign in to comment.