Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions api/observability/v1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package v1
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

const (

// ConditionTrue means the condition is met
ConditionTrue = metav1.ConditionTrue

Expand All @@ -30,8 +29,15 @@ const (
// ConditionTypeAuthorized identifies the state of authorization for the service
ConditionTypeAuthorized = GroupName + "/Authorized"

// ConditionTypeLogLevel validates the value of the log-level annotation
ConditionTypeLogLevel = GroupName + "/LogLevel"

// ConditionTypeMaxUnavailable validates the value of the max-unavailable-rollout annotation
ConditionTypeMaxUnavailable = GroupName + "/MaxUnavailableAnnotation"

// ConditionTypeUseKubeCache validates the value of the use-apiserver-cache annotation
ConditionTypeUseKubeCache = GroupName + "/UseKubeCacheAnnotation"

// ConditionTypeReady indicates the service is ready.
//
// Ready=True means the operands are running and providing some service.
Expand Down Expand Up @@ -77,9 +83,15 @@ const (
// ReasonMissingSpec applies when a type is specified without a defined spec (e.g. type application without obs.Application)
ReasonMissingSpec = "MissingSpec"

// ReasonLogLevelSupported indicates the support for the log level annotation value
// ReasonLogLevelSupported indicates the support for the log-level annotation value
ReasonLogLevelSupported = "LogLevelSupported"

// ReasonMaxUnavailableSupported indicates the support for the max-unavailable-rollout annotation value
ReasonMaxUnavailableSupported = "MaxUnavailableAnnotationSupported"

// ReasonKubeCacheSupported indicates the support for the use-apiserver-cache annotation value
ReasonKubeCacheSupported = "KubeCacheAnnotationSupported"

// ReasonReconciliationComplete when the operator has initialized, validated, and deployed the resources for the workload
ReasonReconciliationComplete = "ReconciliationComplete"

Expand Down
8 changes: 6 additions & 2 deletions internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Factory struct {
ResourceNames *factory.ForwarderResourceNames
isDaemonset bool
LogLevel string
UseKubeCache bool
MaxUnavailable string
}

// CollectorResourceRequirements returns the resource requirements for a given collector implementation
Expand All @@ -89,7 +91,7 @@ func (f *Factory) Affinity() *v1.Affinity {
return f.CollectorSpec.Affinity
}

func New(confHash, clusterID string, collectorSpec *obs.CollectorSpec, secrets internalobs.Secrets, configMaps map[string]*v1.ConfigMap, forwarderSpec obs.ClusterLogForwarderSpec, resNames *factory.ForwarderResourceNames, isDaemonset bool, logLevel string) *Factory {
func New(confHash, clusterID string, collectorSpec *obs.CollectorSpec, secrets internalobs.Secrets, configMaps map[string]*v1.ConfigMap, forwarderSpec obs.ClusterLogForwarderSpec, resNames *factory.ForwarderResourceNames, isDaemonset bool, logLevel string, useCache bool, maxUnavailable string) *Factory {
if collectorSpec == nil {
collectorSpec = &obs.CollectorSpec{}
}
Expand All @@ -109,13 +111,15 @@ func New(confHash, clusterID string, collectorSpec *obs.CollectorSpec, secrets i
PodLabelVisitor: vector.PodLogExcludeLabel,
isDaemonset: isDaemonset,
LogLevel: logLevel,
UseKubeCache: useCache,
MaxUnavailable: maxUnavailable,
}
return factory
}

func (f *Factory) NewDaemonSet(namespace, name string, trustedCABundle *v1.ConfigMap, tlsProfileSpec configv1.TLSProfileSpec) *apps.DaemonSet {
podSpec := f.NewPodSpec(trustedCABundle, f.ForwarderSpec, f.ClusterID, tlsProfileSpec, namespace)
ds := factory.NewDaemonSet(namespace, name, name, constants.CollectorName, constants.VectorName, *podSpec, f.CommonLabelInitializer, f.PodLabelVisitor)
ds := factory.NewDaemonSet(namespace, name, name, constants.CollectorName, constants.VectorName, f.MaxUnavailable, *podSpec, f.CommonLabelInitializer, f.PodLabelVisitor)
ds.Spec.Template.Annotations[constants.AnnotationSecretHash] = f.Secrets.Hash64a()
return ds
}
Expand Down
13 changes: 13 additions & 0 deletions internal/constants/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,17 @@ const (
AnnotationOtlpOutputTechPreview = "observability.openshift.io/tech-preview-otlp-output"

AnnotationSecretHash = "observability.openshift.io/secret-hash"

// AnnotationKubeCache is used to enable caching for requests to the kube-apiserver using vector kubernetes_logs source.
// Tech-Preview feature
//
// While enabling cache can significantly reduce Kubernetes control plane
// memory pressure, the trade-off is a chance of receiving stale data.
AnnotationKubeCache = "observability.openshift.io/use-apiserver-cache"

// AnnotationMaxUnavailable configures the maximum number of DaemonSet pods that can be unavailable during a rolling update.
// Tech-Preview feature
//
// This can be an absolute number (e.g., 1) or a percentage (e.g., 10%). Default is 100%.
AnnotationMaxUnavailable = "observability.openshift.io/max-unavailable-rollout"
)
63 changes: 55 additions & 8 deletions internal/controller/observability/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/openshift/cluster-logging-operator/internal/runtime/serviceaccount"
"github.com/openshift/cluster-logging-operator/internal/tls"
"github.com/openshift/cluster-logging-operator/internal/utils"
"github.com/openshift/cluster-logging-operator/internal/validations/observability"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -43,6 +44,11 @@ func ReconcileCollector(context internalcontext.ForwarderContext, pollInterval,
options = context.AdditionalContext
}

// Set kubeapi and rollout options based on annotation (LOG-7196)
// TODO: replace with API fields
SetKubeCacheOption(context.Forwarder.Annotations, options)
SetMaxUnavailableRolloutOption(context.Forwarder.Annotations, options)

if internalobs.Outputs(context.Forwarder.Spec.Outputs).NeedServiceAccountToken() {
// temporarily create SA token until collector is capable of dynamically reloading a projected serviceaccount token
var sa *corev1.ServiceAccount
Expand Down Expand Up @@ -100,28 +106,41 @@ func ReconcileCollector(context internalcontext.ForwarderContext, pollInterval,

isDaemonSet := !internalobs.DeployAsDeployment(*context.Forwarder)
log.V(3).Info("Deploying as DaemonSet", "isDaemonSet", isDaemonSet)
factory := collector.New(collectorConfHash, context.ClusterID, context.Forwarder.Spec.Collector, context.Secrets, context.ConfigMaps, context.Forwarder.Spec, resourceNames, isDaemonSet, LogLevel(context.Forwarder.Annotations))
if err = factory.ReconcileCollectorConfig(context.Client, context.Reader, context.Forwarder.Namespace, collectorConfig, ownerRef); err != nil {
collectorFactory := collector.New(
collectorConfHash,
context.ClusterID,
context.Forwarder.Spec.Collector,
context.Secrets, context.ConfigMaps,
context.Forwarder.Spec,
resourceNames,
isDaemonSet,
LogLevel(context.Forwarder.Annotations),
factory.IncludesKubeCacheOption(options),
factory.GetMaxUnavailableValue(options),
)

if err = collectorFactory.ReconcileCollectorConfig(context.Client, context.Reader, context.Forwarder.Namespace, collectorConfig, ownerRef); err != nil {
log.Error(err, "collector.ReconcileCollectorConfig")
return
}

reconcileWorkload := factory.ReconcileDaemonset
reconcileWorkload := collectorFactory.ReconcileDaemonset
if !isDaemonSet {
reconcileWorkload = factory.ReconcileDeployment
reconcileWorkload = collectorFactory.ReconcileDeployment
}

if err := reconcileWorkload(context.Client, context.Forwarder.Namespace, trustedCABundle, ownerRef); err != nil {
log.Error(err, "Error reconciling the deployment of the collector")
return err
}

if err := factory.ReconcileInputServices(context.Client, context.Reader, context.Forwarder.Namespace, ownerRef, factory.CommonLabelInitializer); err != nil {
if err := collectorFactory.ReconcileInputServices(context.Client, context.Reader, context.Forwarder.Namespace, ownerRef, collectorFactory.CommonLabelInitializer); err != nil {
log.Error(err, "collector.ReconcileInputServices")
return err
}

// Reconcile resources to support metrics gathering
if err := network.ReconcileService(context.Client, context.Forwarder.Namespace, resourceNames.CommonName, context.Forwarder.Name, constants.CollectorName, collector.MetricsPortName, resourceNames.SecretMetrics, collector.MetricsPort, ownerRef, factory.CommonLabelInitializer); err != nil {
if err := network.ReconcileService(context.Client, context.Forwarder.Namespace, resourceNames.CommonName, context.Forwarder.Name, constants.CollectorName, collector.MetricsPortName, resourceNames.SecretMetrics, collector.MetricsPort, ownerRef, collectorFactory.CommonLabelInitializer); err != nil {
log.Error(err, "collector.ReconcileService")
return err
}
Expand All @@ -134,12 +153,12 @@ func ReconcileCollector(context internalcontext.ForwarderContext, pollInterval,
return nil
}

func GenerateConfig(k8Client client.Client, spec obs.ClusterLogForwarder, resourceNames factory.ForwarderResourceNames, secrets internalobs.Secrets, op framework.Options) (config string, err error) {
func GenerateConfig(k8Client client.Client, clf obs.ClusterLogForwarder, resourceNames factory.ForwarderResourceNames, secrets internalobs.Secrets, op framework.Options) (config string, err error) {
tlsProfile, _ := tls.FetchAPIServerTlsProfile(k8Client)
op[framework.ClusterTLSProfileSpec] = tls.GetClusterTLSProfileSpec(tlsProfile)
//EvaluateAnnotationsForEnabledCapabilities(clusterRequest.Forwarder, op)
g := forwardergenerator.New()
generatedConfig, err := g.GenerateConf(secrets, spec.Spec, spec.Namespace, spec.Name, resourceNames, op)
generatedConfig, err := g.GenerateConf(secrets, clf.Spec, clf.Namespace, clf.Name, resourceNames, op)

if err != nil {
log.Error(err, "Unable to generate log configuration")
Expand All @@ -161,6 +180,16 @@ func EvaluateAnnotationsForEnabledCapabilities(annotations map[string]string, op
if strings.ToLower(value) == "true" {
options[generatorhelpers.EnableDebugOutput] = "true"
}
case constants.AnnotationKubeCache:
// Matching the validate_annotations logic
if observability.IsEnabledValue(value) {
options[framework.UseKubeCacheOption] = "true"
}
case constants.AnnotationMaxUnavailable:
// Matching the validate_annotations logic
if observability.IsPercentOrWholeNumber(value) {
options[framework.MaxUnavailableOption] = value
}
}
}
}
Expand All @@ -171,3 +200,21 @@ func LogLevel(annotations map[string]string) string {
}
return "warn"
}

func SetKubeCacheOption(annotations map[string]string, options framework.Options) {
if value, found := annotations[constants.AnnotationKubeCache]; found {
if observability.IsEnabledValue(value) {
log.V(3).Info("Kube cache annotation found")
options[framework.UseKubeCacheOption] = "true"
}
}
}

func SetMaxUnavailableRolloutOption(annotations map[string]string, options framework.Options) {
if value, found := annotations[constants.AnnotationMaxUnavailable]; found {
if observability.IsPercentOrWholeNumber(value) {
log.V(3).Info("Max Unavailable annotation found")
options[framework.MaxUnavailableOption] = value
}
}
}
9 changes: 9 additions & 0 deletions internal/controller/observability/collector_features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ var _ = Describe("#EvaluateAnnotationsForEnabledCapabilities", func() {
Entry("enables debug for true", helpers.EnableDebugOutput, "true", AnnotationDebugOutput, "true"),
Entry("enables debug for True", helpers.EnableDebugOutput, "true", AnnotationDebugOutput, "True"),
Entry("disables debug for anything else", "", "", AnnotationDebugOutput, "abcdef"),

Entry("enables kube-cache for true", framework.UseKubeCacheOption, "true", AnnotationKubeCache, "true"),
Entry("enables kube-cache for True", framework.UseKubeCacheOption, "true", AnnotationKubeCache, "True"),
Entry("enables kube-cache for enabled", framework.UseKubeCacheOption, "true", AnnotationKubeCache, "enabled"),
Entry("disables kube-cache for anything else", "", "", AnnotationKubeCache, "bubbles"),

Entry("enables max-unavailable for value '10'", framework.MaxUnavailableOption, "10", AnnotationMaxUnavailable, "10"),
Entry("enables max-unavailable for value '99%'", framework.MaxUnavailableOption, "99%", AnnotationMaxUnavailable, "99%"),
Entry("disables max-unavailable option for anything not a number or percentage", "", "", AnnotationMaxUnavailable, "fluffy"),
)

})
22 changes: 16 additions & 6 deletions internal/factory/daemonset.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package factory

import (
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/runtime"
"github.com/openshift/cluster-logging-operator/internal/utils"
"github.com/openshift/cluster-logging-operator/internal/validations/observability"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// NewDaemonSet stubs an instance of a daemonset
func NewDaemonSet(namespace, daemonsetName, instanceName, component, impl string, podSpec core.PodSpec, visitors ...func(o runtime.Object)) *apps.DaemonSet {
func NewDaemonSet(namespace, daemonsetName, instanceName, component, impl, maxUnavailable string, podSpec core.PodSpec, visitors ...func(o runtime.Object)) *apps.DaemonSet {
selectors := runtime.Selectors(instanceName, component, impl)
annotations := map[string]string{
"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
}

intOrStringValue := intstr.Parse(maxUnavailable)
strategy := apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &intstr.IntOrString{
Type: intstr.String,
StrVal: "100%",
},
MaxUnavailable: &intOrStringValue,
},
}
ds := runtime.NewDaemonSet(namespace, daemonsetName, visitors...)
Expand All @@ -31,3 +31,13 @@ func NewDaemonSet(namespace, daemonsetName, instanceName, component, impl string
WithPodSpec(podSpec)
return ds
}

// GetMaxUnavailableValue checks the framework options for the flag maxUnavailableRollout
// Default is 100%
func GetMaxUnavailableValue(op framework.Options) string {
value, _ := utils.GetOption(op, framework.MaxUnavailableOption, "100%")
if !observability.IsPercentOrWholeNumber(value) {
value = "100%"
}
return value
}
58 changes: 48 additions & 10 deletions internal/factory/daemonset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package factory
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/runtime"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
Expand All @@ -11,20 +12,57 @@ import (
var _ = Describe("#NewDaemonSet", func() {

var (
daemonSet *apps.DaemonSet
expSelectors = runtime.Selectors("theinstancename", "thecomponent", "thecomponent")
daemonSet *apps.DaemonSet
expSelectors = runtime.Selectors("theinstancename", "thecomponent", "thecomponent")
op = framework.Options{}
maxUnavailable string
)

BeforeEach(func() {
daemonSet = NewDaemonSet("thenamespace", "thenname", "theinstancename", "thecomponent", "thecomponent", core.PodSpec{})
})
Context("with common properties and maxUnavailable empty", func() {
BeforeEach(func() {
daemonSet = NewDaemonSet(
"thenamespace",
"thenname",
"theinstancename",
"thecomponent",
"thecomponent",
maxUnavailable,
core.PodSpec{},
)
})

It("should leave the MinReadySeconds as the default", func() {
Expect(daemonSet.Spec.MinReadySeconds).ToNot(Equal(0), "Exp. the MinReadySeconds to be the default")
})
It("should leave the MinReadySeconds as the default", func() {
Expect(daemonSet.Spec.MinReadySeconds).ToNot(Equal(0), "Exp. the MinReadySeconds to be the default")
})

It("should only include the kubernetes common labels in the selector", func() {
Expect(daemonSet.Spec.Selector.MatchLabels).To(Equal(expSelectors), "Exp. the selector to contain kubernetes common labels")
It("should only include the kubernetes common labels in the selector", func() {
Expect(daemonSet.Spec.Selector.MatchLabels).To(Equal(expSelectors), "Exp. the selector to contain kubernetes common labels")
})
})

// This option should never be set as invalid since there is validation on the setter. This unit test
// ensures the ds method handles it properly regardless
DescribeTable("with maxUnavailable option", func(op framework.Options, exp string) {
maxUnavailable = GetMaxUnavailableValue(op)
daemonSet = NewDaemonSet(
"thenamespace",
"thenname",
"theinstancename",
"thecomponent",
"thecomponent",
maxUnavailable,
core.PodSpec{},
)
Expect(daemonSet.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable.String()).To(Equal(exp), "Exp. the maxUnavailable value to match")
},
Entry("missing", op, "100%"),
Entry("set to empty string", framework.Options{framework.MaxUnavailableOption: ""}, "100%"),
Entry("set to invalid value 'blue'", framework.Options{framework.MaxUnavailableOption: "blue"}, "100%"),
Entry("set to invalid zero", framework.Options{framework.MaxUnavailableOption: "0"}, "100%"),
Entry("set to invalid decimal", framework.Options{framework.MaxUnavailableOption: "2.5"}, "100%"),
Entry("set to invalid percentage", framework.Options{framework.MaxUnavailableOption: "200%"}, "100%"),
Entry("set to whole number", framework.Options{framework.MaxUnavailableOption: "50"}, "50"),
Entry("set to percentage", framework.Options{framework.MaxUnavailableOption: "99%"}, "99%"),
Entry("set to '100%'", framework.Options{framework.MaxUnavailableOption: "100%"}, "100%"),
)
})
13 changes: 13 additions & 0 deletions internal/factory/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package factory

import (
log "github.com/ViaQ/logerr/v2/log/static"
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/utils"
)

func IncludesKubeCacheOption(op framework.Options) bool {
_, found := utils.GetOption(op, framework.UseKubeCacheOption, "")
log.V(3).Info("Kube caching option enabled")
return found
}
10 changes: 5 additions & 5 deletions internal/generator/framework/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
)

const (
ClusterTLSProfileSpec = "tlsProfileSpec"

MinTLSVersion = "minTLSVersion"
Ciphers = "ciphers"

ClusterTLSProfileSpec = "tlsProfileSpec"
MinTLSVersion = "minTLSVersion"
Ciphers = "ciphers"
URL = "url"
OptionServiceAccountTokenSecretName = "serviceAccountTokenSecretName"
OptionForwarderName = "forwarderName"
UseKubeCacheOption = "useKubeCache"
MaxUnavailableOption = "maxUnavailableRollout"
)

// Options is a map of Options used to customize the config generation. E.g. Debugging, legacy config generation
Expand Down
Loading