Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <p.loffay@gmail.com>
  • Loading branch information
pavolloffay committed May 8, 2024
1 parent 76a0ab5 commit 1dc006e
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 37 deletions.
16 changes: 11 additions & 5 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,18 @@ func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logg
if len(errs) > 0 {
return fmt.Errorf("failed to create objects for %s: %w", owner.GetName(), errors.Join(errs...))
}
// Pruning owned objects in the cluster which are not should not be present after the reconciliation.
err := deleteObjects(ctx, kubeClient, logger, ownedObjects)
if err != nil {
return fmt.Errorf("failed to prune objects for %s: %w", owner.GetName(), err)
}
return nil
}

func deleteObjects(ctx context.Context, kubeClient client.Client, logger logr.Logger, objects map[types.UID]client.Object) error {
// Pruning owned objects in the cluster which are not should not be present after the reconciliation.
pruneErrs := []error{}
for _, obj := range ownedObjects {
for _, obj := range objects {
l := logger.WithValues(
"object_name", obj.GetName(),
"object_kind", obj.GetObjectKind().GroupVersionKind(),
Expand All @@ -137,8 +146,5 @@ func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logg
pruneErrs = append(pruneErrs, err)
}
}
if len(pruneErrs) > 0 {
return fmt.Errorf("failed to prune objects for %s: %w", owner.GetName(), errors.Join(pruneErrs...))
}
return nil
return errors.Join(pruneErrs...)
}
86 changes: 82 additions & 4 deletions controllers/opentelemetrycollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/autodetect/openshift"
Expand Down Expand Up @@ -127,7 +128,42 @@ func (r *OpenTelemetryCollectorReconciler) findOtelOwnedObjects(ctx context.Cont
for i := range pdbList.Items {
ownedObjects[pdbList.Items[i].GetUID()] = &pdbList.Items[i]
}
if params.Config.CreateRBACPermissions() == rbac.Available {
clusterObjects, err := r.findClusterRoleObjects(ctx, params)
if err != nil {
return nil, err
}
for k, v := range clusterObjects {
ownedObjects[k] = v
}
}
return ownedObjects, nil
}

// The cluster scope objects do not have owner reference.
func (r *OpenTelemetryCollectorReconciler) findClusterRoleObjects(ctx context.Context, params manifests.Params) (map[types.UID]client.Object, error) {
ownedObjects := map[types.UID]client.Object{}
// Remove cluster roles and bindings.
// Users might switch off the RBAC creation feature on the operator which should remove existing RBAC.
listOpsCluster := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)),
}
clusterroleList := &rbacv1.ClusterRoleList{}
err := r.List(ctx, clusterroleList, listOpsCluster)
if err != nil {
return nil, fmt.Errorf("error listing ClusterRoles: %w", err)
}
for i := range clusterroleList.Items {
ownedObjects[clusterroleList.Items[i].GetUID()] = &clusterroleList.Items[i]
}
clusterrolebindingList := &rbacv1.ClusterRoleBindingList{}
err = r.List(ctx, clusterrolebindingList, listOpsCluster)
if err != nil {
return nil, fmt.Errorf("error listing ClusterRoleBIndings: %w", err)
}
for i := range clusterrolebindingList.Items {
ownedObjects[clusterrolebindingList.Items[i].GetUID()] = &clusterrolebindingList.Items[i]
}
return ownedObjects, nil
}

Expand Down Expand Up @@ -193,8 +229,32 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

params, err := r.getParams(instance)
if err != nil {
log.Error(err, "Failed to create manifest.Params")
return ctrl.Result{}, err
}

// We have a deletion, short circuit and let the deletion happen
if deletionTimestamp := instance.GetDeletionTimestamp(); deletionTimestamp != nil {
if controllerutil.ContainsFinalizer(&instance, collectorFinalizer) {
// If the finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.finalizeCollector(ctx, params); err != nil {

Check failure on line 244 in controllers/opentelemetrycollector_controller.go

View workflow job for this annotation

GitHub Actions / Code standards (linting)

shadow: declaration of "err" shadows declaration at line 233 (govet)
return ctrl.Result{}, err
}

// Once all finalizers have been
// removed, the object will be deleted.
if controllerutil.RemoveFinalizer(&instance, collectorFinalizer) {
err := r.Update(ctx, &instance)

Check failure on line 251 in controllers/opentelemetrycollector_controller.go

View workflow job for this annotation

GitHub Actions / Code standards (linting)

shadow: declaration of "err" shadows declaration at line 233 (govet)
if err != nil {
return ctrl.Result{}, err
}
}
}

return ctrl.Result{}, nil
}

Expand All @@ -204,10 +264,14 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct
return ctrl.Result{}, nil
}

params, err := r.getParams(instance)
if err != nil {
log.Error(err, "Failed to create manifest.Params")
return ctrl.Result{}, err
// Add finalizer for this CR
if !controllerutil.ContainsFinalizer(&instance, collectorFinalizer) {
if controllerutil.AddFinalizer(&instance, collectorFinalizer) {
err = r.Update(ctx, &instance)
if err != nil {
return ctrl.Result{}, err
}
}
}

desiredObjects, buildErr := BuildCollector(params)
Expand Down Expand Up @@ -255,3 +319,17 @@ func (r *OpenTelemetryCollectorReconciler) SetupWithManager(mgr ctrl.Manager) er

return builder.Complete(r)
}

const collectorFinalizer = "opentelemetrycollector.opentelemetry.io/finalizer"

func (r *OpenTelemetryCollectorReconciler) finalizeCollector(ctx context.Context, params manifests.Params) error {
// The cluster scope objects do not have owner reference. They need to be deleted explicitly
if params.Config.CreateRBACPermissions() == rbac.Available {
objects, err := r.findClusterRoleObjects(ctx, params)
if err != nil {
return err
}
return deleteObjects(ctx, r.Client, r.log, objects)
}
return nil
}
40 changes: 20 additions & 20 deletions controllers/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ var (
type check[T any] func(t *testing.T, params T)

func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
addedMetadataDeployment := testCollectorWithMode(v1alpha1.ModeDeployment)
addedMetadataDeployment := testCollectorWithMode("test-deployment", v1alpha1.ModeDeployment)
addedMetadataDeployment.Labels = map[string]string{
labelName: labelVal,
}
addedMetadataDeployment.Annotations = map[string]string{
annotationName: annotationVal,
}
deploymentExtraPorts := testCollectorWithModeAndReplicas(v1alpha1.ModeDeployment, 3)
deploymentExtraPorts := testCollectorWithModeAndReplicas("test-deployment", v1alpha1.ModeDeployment, 3)
deploymentExtraPorts.Spec.Ports = append(deploymentExtraPorts.Spec.Ports, extraPorts)
deploymentExtraPorts.Spec.DeploymentUpdateStrategy = appsv1.DeploymentStrategy{
RollingUpdate: &appsv1.RollingUpdateDeployment{
Expand All @@ -94,20 +94,20 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
},
},
}
ingressParams := testCollectorAssertNoErr(t, "", testFileIngress)
ingressParams := testCollectorAssertNoErr(t, "test-ingress", "", testFileIngress)
ingressParams.Spec.Ingress.Type = "ingress"
updatedIngressParams := testCollectorAssertNoErr(t, "", testFileIngress)
updatedIngressParams := testCollectorAssertNoErr(t, "test-ingress", "", testFileIngress)
updatedIngressParams.Spec.Ingress.Type = "ingress"
updatedIngressParams.Spec.Ingress.Annotations = map[string]string{"blub": "blob"}
updatedIngressParams.Spec.Ingress.Hostname = expectHostname
routeParams := testCollectorAssertNoErr(t, "", testFileIngress)
routeParams := testCollectorAssertNoErr(t, "test-route", "", testFileIngress)
routeParams.Spec.Ingress.Type = v1alpha1.IngressTypeRoute
routeParams.Spec.Ingress.Route.Termination = v1alpha1.TLSRouteTerminationTypeInsecure
updatedRouteParams := testCollectorAssertNoErr(t, "", testFileIngress)
updatedRouteParams := testCollectorAssertNoErr(t, "test-route", "", testFileIngress)
updatedRouteParams.Spec.Ingress.Type = v1alpha1.IngressTypeRoute
updatedRouteParams.Spec.Ingress.Route.Termination = v1alpha1.TLSRouteTerminationTypeInsecure
updatedRouteParams.Spec.Ingress.Hostname = expectHostname
deletedParams := testCollectorWithMode(v1alpha1.ModeDeployment)
deletedParams := testCollectorWithMode("test2", v1alpha1.ModeDeployment)
now := metav1.NewTime(time.Now())
deletedParams.DeletionTimestamp = &now

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
assert.True(t, exists)
assert.Equal(t, svc.Spec.Selector, map[string]string{
"app.kubernetes.io/component": "opentelemetry-collector",
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/instance": "default.test-deployment",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/part-of": "opentelemetry",
})
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
assert.Contains(t, actual.Spec.Ports, extraPorts.ServicePort)
assert.Equal(t, actual.Spec.Selector, map[string]string{
"app.kubernetes.io/component": "opentelemetry-collector",
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/instance": "default.test-deployment",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/part-of": "opentelemetry",
})
Expand All @@ -206,7 +206,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
{
name: "invalid mode",
args: args{
params: testCollectorWithMode("bad"),
params: testCollectorWithMode("test-invalid", "bad"),
updates: []v1alpha1.OpenTelemetryCollector{},
},
want: []want{
Expand All @@ -223,7 +223,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
{
name: "invalid prometheus configuration",
args: args{
params: testCollectorAssertNoErr(t, baseTaImage, testFileIngress),
params: testCollectorAssertNoErr(t, "test-invalid-prom", baseTaImage, testFileIngress),
updates: []v1alpha1.OpenTelemetryCollector{},
},
want: []want{
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
checks: []check[v1alpha1.OpenTelemetryCollector]{
func(t *testing.T, params v1alpha1.OpenTelemetryCollector) {
got := routev1.Route{}
nsn := types.NamespacedName{Namespace: params.Namespace, Name: "otlp-grpc-test-route"}
nsn := types.NamespacedName{Namespace: params.Namespace, Name: "otlp-grpc-test-route-route"}
exists, err := populateObjectIfExists(t, &got, nsn)
assert.NoError(t, err)
assert.True(t, exists)
Expand All @@ -299,7 +299,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
checks: []check[v1alpha1.OpenTelemetryCollector]{
func(t *testing.T, params v1alpha1.OpenTelemetryCollector) {
got := routev1.Route{}
nsn := types.NamespacedName{Namespace: params.Namespace, Name: "otlp-grpc-test-route"}
nsn := types.NamespacedName{Namespace: params.Namespace, Name: "otlp-grpc-test-route-route"}
exists, err := populateObjectIfExists(t, &got, nsn)
assert.NoError(t, err)
assert.True(t, exists)
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
{
name: "daemonset collector",
args: args{
params: testCollectorWithMode(v1alpha1.ModeDaemonSet),
params: testCollectorWithMode("test-daemonset", v1alpha1.ModeDaemonSet),
},
want: []want{
{
Expand All @@ -416,11 +416,11 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
{
name: "stateful should update collector with TA",
args: args{
params: testCollectorWithMode(v1alpha1.ModeStatefulSet),
params: testCollectorWithMode("test-stateful-ta", v1alpha1.ModeStatefulSet),
updates: []v1alpha1.OpenTelemetryCollector{
testCollectorAssertNoErr(t, baseTaImage, promFile),
testCollectorAssertNoErr(t, baseTaImage, updatedPromFile),
testCollectorAssertNoErr(t, updatedTaImage, updatedPromFile),
testCollectorAssertNoErr(t, "test-stateful-ta", baseTaImage, promFile),
testCollectorAssertNoErr(t, "test-stateful-ta", baseTaImage, updatedPromFile),
testCollectorAssertNoErr(t, "test-stateful-ta", updatedTaImage, updatedPromFile),
},
},
want: []want{
Expand Down Expand Up @@ -463,13 +463,13 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
exists, err = populateObjectIfExists(t, &v1.ServiceAccount{}, namespacedObjectName(naming.TargetAllocatorServiceAccount(params.Name), params.Namespace))
assert.NoError(t, err)
assert.True(t, exists)
promConfig, err := ta.ConfigToPromConfig(testCollectorAssertNoErr(t, baseTaImage, promFile).Spec.Config)
promConfig, err := ta.ConfigToPromConfig(testCollectorAssertNoErr(t, "test-stateful-ta", baseTaImage, promFile).Spec.Config)
assert.NoError(t, err)

taConfig := make(map[interface{}]interface{})
taConfig["collector_selector"] = metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/instance": "default.test-stateful-ta",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/component": "opentelemetry-collector",
"app.kubernetes.io/part-of": "opentelemetry",
Expand Down
16 changes: 8 additions & 8 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func testCollectorWithMode(mode v1alpha1.Mode) v1alpha1.OpenTelemetryCollector {
func testCollectorWithMode(name string, mode v1alpha1.Mode) v1alpha1.OpenTelemetryCollector {
replicas := int32(2)
return testCollectorWithModeAndReplicas(mode, replicas)
return testCollectorWithModeAndReplicas(name, mode, replicas)
}

func testCollectorWithModeAndReplicas(mode v1alpha1.Mode, replicas int32) v1alpha1.OpenTelemetryCollector {
func testCollectorWithModeAndReplicas(name string, mode v1alpha1.Mode, replicas int32) v1alpha1.OpenTelemetryCollector {
configYAML, err := os.ReadFile("testdata/test.yaml")
if err != nil {
fmt.Printf("Error getting yaml file: %v", err)
Expand All @@ -251,7 +251,7 @@ func testCollectorWithModeAndReplicas(mode v1alpha1.Mode, replicas int32) v1alph
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Name: name,
Namespace: "default",
},
Spec: v1alpha1.OpenTelemetryCollectorSpec{
Expand All @@ -273,16 +273,16 @@ func testCollectorWithModeAndReplicas(mode v1alpha1.Mode, replicas int32) v1alph
}
}

func testCollectorAssertNoErr(t *testing.T, taContainerImage string, file string) v1alpha1.OpenTelemetryCollector {
p, err := testCollectorWithConfigFile(taContainerImage, file)
func testCollectorAssertNoErr(t *testing.T, name string, taContainerImage string, file string) v1alpha1.OpenTelemetryCollector {
p, err := testCollectorWithConfigFile(name, taContainerImage, file)
assert.NoError(t, err)
if len(taContainerImage) == 0 {
p.Spec.TargetAllocator.Enabled = false
}
return p
}

func testCollectorWithConfigFile(taContainerImage string, file string) (v1alpha1.OpenTelemetryCollector, error) {
func testCollectorWithConfigFile(name string, taContainerImage string, file string) (v1alpha1.OpenTelemetryCollector, error) {
replicas := int32(1)
var configYAML []byte
var err error
Expand All @@ -301,7 +301,7 @@ func testCollectorWithConfigFile(taContainerImage string, file string) (v1alpha1
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Name: name,
Namespace: "default",
},
Spec: v1alpha1.OpenTelemetryCollectorSpec{
Expand Down

0 comments on commit 1dc006e

Please sign in to comment.