Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LOG-4612: Service/secret is not deleted when the inputs.receiver.http is removed #2216

Merged
merged 1 commit into from Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion internal/collector/daemonset.go
Expand Up @@ -16,10 +16,18 @@ import (
)

// ReconcileDaemonset reconciles a daemonset specifically for the collector defined by the factory
func (f *Factory) ReconcileDaemonset(er record.EventRecorder, k8sClient client.Client, namespace string, owner metav1.OwnerReference, httpInputs []string) error {
func (f *Factory) ReconcileDaemonset(er record.EventRecorder, k8sClient client.Client, namespace string, owner metav1.OwnerReference) error {
trustedCABundle, trustHash := GetTrustedCABundle(k8sClient, namespace, f.ResourceNames.CaTrustBundle)
f.TrustedCAHash = trustHash
tlsProfile, _ := tls.FetchAPIServerTlsProfile(k8sClient)

var httpInputs []string
for _, input := range f.ForwarderSpec.Inputs {
if input.Receiver != nil && input.Receiver.HTTP != nil {
httpInputs = append(httpInputs, input.Name)
}
}

desired := f.NewDaemonSet(namespace, f.ResourceNames.DaemonSetName(), trustedCABundle, tls.GetClusterTLSProfileSpec(tlsProfile), httpInputs)
utils.AddOwnerRefToObject(desired, owner)
return reconcile.DaemonSet(er, k8sClient, desired)
Expand Down
6 changes: 4 additions & 2 deletions internal/constants/constants.go
Expand Up @@ -123,7 +123,8 @@ const (
ClusterLogging = "cluster-logging"
ClusterLoggingOperator = "cluster-logging-operator"
// Commonly-used label names.
LabelApp = "app"
LabelApp = "app"
LabelComponent = "component"

EventReasonReconcilingLoggingCR = "ReconcilingLoggingCR"
EventReasonCreateObject = "CreateObject"
Expand All @@ -137,7 +138,8 @@ const (
OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
OTELSchema = "opentelemetry"

HTTPReceiverPort = 8443
HTTPReceiverPort = 8443
LabelHTTPInputService = "http-input-service"
)

var ReconcileForGlobalProxyList = []string{CollectorTrustedCAName}
Expand Down
32 changes: 21 additions & 11 deletions internal/k8shandler/collection.go
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/openshift/cluster-logging-operator/internal/auth"
"github.com/openshift/cluster-logging-operator/internal/utils/sets"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -109,14 +110,7 @@ func (clusterRequest *ClusterLoggingRequest) CreateOrUpdateCollection() (err err
return err
}

var httpInputs []string
for _, input := range clusterRequest.Forwarder.Spec.Inputs {
if input.Receiver != nil && input.Receiver.HTTP != nil {
httpInputs = append(httpInputs, input.Name)
}
}

if err := factory.ReconcileDaemonset(clusterRequest.EventRecorder, clusterRequest.Client, clusterRequest.Forwarder.Namespace, clusterRequest.ResourceOwner, httpInputs); err != nil {
if err := factory.ReconcileDaemonset(clusterRequest.EventRecorder, clusterRequest.Client, clusterRequest.Forwarder.Namespace, clusterRequest.ResourceOwner); err != nil {
log.Error(err, "collector.ReconcileDaemonset")
return err
}
Expand All @@ -140,7 +134,7 @@ func (clusterRequest *ClusterLoggingRequest) removeCollector() (err error) {
return err
}

if err = clusterRequest.RemoveInputServices(); err != nil {
if err = clusterRequest.RemoveInputServices([]metav1.OwnerReference{utils.AsOwner(clusterRequest.Forwarder)}, true); err != nil {
return
}

Expand Down Expand Up @@ -199,17 +193,33 @@ func (clusterRequest *ClusterLoggingRequest) addSecurityLabelsToNamespace() erro
return nil
}

func (clusterRequest *ClusterLoggingRequest) RemoveInputServices() error {
func (clusterRequest *ClusterLoggingRequest) RemoveInputServices(currOwner []metav1.OwnerReference, removeAllServices bool) error {
if clusterRequest.Cluster.Spec.Collection.Type != logging.LogCollectionTypeVector {
return nil
}

// Get list of HTTP input services by label/ namespace
httpServices, err := clusterRequest.GetServiceList(constants.LabelComponent, constants.LabelHTTPInputService, clusterRequest.Forwarder.Namespace)
if err != nil {
return err
}

// Collect defined http inputs
httpInputs := sets.NewString()
for _, input := range clusterRequest.Forwarder.Spec.Inputs {
if input.Receiver != nil && input.Receiver.HTTP != nil {
if err := clusterRequest.RemoveInputService(input.Name); err != nil {
httpInputs.Insert(input.Name)
}
}

// Remove services only if owned by current CLF and isn't defined
for _, service := range httpServices.Items {
if utils.HasSameOwner(service.OwnerReferences, currOwner) && (!httpInputs.Has(service.Name) || removeAllServices) {
if err := clusterRequest.RemoveInputService(service.Name); err != nil {
return err
}
}
}

return nil
}
6 changes: 6 additions & 0 deletions internal/k8shandler/reconciler.go
Expand Up @@ -7,6 +7,7 @@ import (
eslogstore "github.com/openshift/cluster-logging-operator/internal/logstore/elasticsearch"
"github.com/openshift/cluster-logging-operator/internal/logstore/lokistack"
logmetricexporter "github.com/openshift/cluster-logging-operator/internal/metrics/logfilemetricexporter"
"github.com/openshift/cluster-logging-operator/internal/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/openshift/cluster-logging-operator/internal/metrics/telemetry"
Expand Down Expand Up @@ -69,6 +70,11 @@ func Reconcile(cl *logging.ClusterLogging, forwarder *logging.ClusterLogForwarde
return fmt.Errorf("unable to create or update collection for %q: %v", clusterLoggingRequest.Cluster.Name, err)
}

// Clean up any stale http input services
if err = clusterLoggingRequest.RemoveInputServices([]metav1.OwnerReference{utils.AsOwner(forwarder)}, false); err != nil {
return fmt.Errorf("error removing stale http input services")
}

//if there is no early exit from reconciler then new CL spec is applied successfully hence healthStatus is set to true or 1
telemetry.Data.CLInfo.Set("healthStatus", constants.HealthyStatus)
telemetry.UpdateInfofromCLF(*clusterLoggingRequest.Forwarder)
Expand Down
13 changes: 13 additions & 0 deletions internal/k8shandler/service.go
@@ -1,9 +1,12 @@
package k8shandler

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
client "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/openshift/cluster-logging-operator/internal/constants"
"github.com/openshift/cluster-logging-operator/internal/factory"
Expand Down Expand Up @@ -40,3 +43,13 @@ func (clusterRequest *ClusterLoggingRequest) RemoveInputService(serviceName stri

return nil
}

// GetServiceList returns a list of services based on a key/value label
func (clusterRequest *ClusterLoggingRequest) GetServiceList(key, val, namespace string) (*core.ServiceList, error) {
labelSelector, _ := labels.Parse(fmt.Sprintf("%s=%s", key, val))
httpServices := core.ServiceList{}
if err := clusterRequest.Client.List(context.TODO(), &httpServices, &client.ListOptions{LabelSelector: labelSelector, Namespace: namespace}); err != nil {
return nil, fmt.Errorf("failure listing services with label: %s, %v", fmt.Sprintf("%s=%s", key, val), err)
}
return &httpServices, nil
}
2 changes: 2 additions & 0 deletions internal/network/service.go
Expand Up @@ -60,6 +60,8 @@ func ReconcileInputService(er record.EventRecorder, k8sClient client.Client, nam
constants.AnnotationServingCertSecretName: certSecretName,
}

desired.Labels[constants.LabelComponent] = constants.LabelHTTPInputService

utils.AddOwnerRefToObject(desired, owner)
return reconcile.Service(er, k8sClient, desired)
}