Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable managed fluent-bit #16668

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
145 changes: 125 additions & 20 deletions components/telemetry-operator/config/rbac/role.yaml
Expand Up @@ -10,36 +10,48 @@ rules:
resources:
- configmaps
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- admissionregistration.k8s.io
Expand All @@ -53,24 +65,16 @@ rules:
resources:
- daemonsets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
Expand All @@ -92,6 +96,30 @@ rules:
- patch
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- clusterrolebindings
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- clusterroles
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- telemetry.kyma-project.io
resources:
Expand Down Expand Up @@ -164,3 +192,80 @@ rules:
- get
- patch
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
creationTimestamp: null
name: manager-role
namespace: kyma-system
rules:
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- delete
- patch
- update
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- services
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
- daemonsets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
Expand Up @@ -34,16 +34,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const checksumAnnotationKey = "checksum/logpipeline-config"

type Config struct {
DaemonSet types.NamespacedName
SectionsConfigMap types.NamespacedName
FilesConfigMap types.NamespacedName
EnvSecret types.NamespacedName
OverrideConfigMap types.NamespacedName
PipelineDefaults configbuilder.PipelineDefaults
ManageFluentBit bool
Overrides overrides.Config
}

Expand All @@ -61,19 +58,17 @@ type Reconciler struct {
client.Client
config Config
prober DaemonSetProber
annotator DaemonSetAnnotator
allLogPipelines prometheus.Gauge
unsupportedLogPipelines prometheus.Gauge
syncer syncer
globalConfig overrides.GlobalConfigHandler
}

func NewReconciler(client client.Client, config Config, prober DaemonSetProber, annotator DaemonSetAnnotator, handler *overrides.Handler) *Reconciler {
func NewReconciler(client client.Client, config Config, prober DaemonSetProber, handler *overrides.Handler) *Reconciler {
var r Reconciler
r.Client = client
r.config = config
r.prober = prober
r.annotator = annotator
r.allLogPipelines = prometheus.NewGauge(prometheus.GaugeOpts{Name: "telemetry_all_logpipelines", Help: "Number of log pipelines."})
r.unsupportedLogPipelines = prometheus.NewGauge(prometheus.GaugeOpts{Name: "telemetry_unsupported_logpipelines", Help: "Number of log pipelines with custom filters or outputs."})
metrics.Registry.MustRegister(r.allLogPipelines, r.unsupportedLogPipelines)
Expand Down Expand Up @@ -129,37 +124,43 @@ func (r *Reconciler) doReconcile(ctx context.Context, pipeline *telemetryv1alpha
return err
}

if r.config.ManageFluentBit {
name := r.config.DaemonSet
if err = r.reconcileFluentBit(ctx, name, pipeline); err != nil {
return err
}
}

if err = r.syncer.syncFluentBitConfig(ctx, pipeline); err != nil {
return err
}

if err = cleanupFinalizersIfNeeded(ctx, r.Client, pipeline); err != nil {
var checksum string
if checksum, err = r.calculateChecksum(ctx); err != nil {
return err
}

var checksum string
if checksum, err = r.calculateChecksum(ctx); err != nil {
name := r.config.DaemonSet
if err = r.reconcileFluentBit(ctx, name, pipeline, checksum); err != nil {
return err
}

if err = r.annotator.SetAnnotation(ctx, r.config.DaemonSet, checksumAnnotationKey, checksum); err != nil {
rakesh-garimella marked this conversation as resolved.
Show resolved Hide resolved
if err = cleanupFinalizersIfNeeded(ctx, r.Client, pipeline); err != nil {
return err
}

return err
}

func (r *Reconciler) reconcileFluentBit(ctx context.Context, name types.NamespacedName, pipeline *telemetryv1alpha1.LogPipeline) error {
func (r *Reconciler) reconcileFluentBit(ctx context.Context, name types.NamespacedName, pipeline *telemetryv1alpha1.LogPipeline, checksum string) error {
if isNotMarkedForDeletion(pipeline) {
ds := resources.MakeDaemonSet(name)
if err := utils.CreateOrUpdateDaemonSet(ctx, r, ds); err != nil {
serviceAccount := resources.MakeServiceAccount(name)
if err := utils.CreateOrUpdateServiceAccount(ctx, r, serviceAccount); err != nil {
return fmt.Errorf("failed to create fluent bit service account: %w", err)
}
clusterRole := resources.MakeClusterRole(name)
if err := utils.CreateOrUpdateClusterRole(ctx, r, clusterRole); err != nil {
return fmt.Errorf("failed to create fluent bit cluster role: %w", err)
}
clusterRoleBinding := resources.MakeClusterRoleBinding(name)
if err := utils.CreateOrUpdateClusterRoleBinding(ctx, r, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to create fluent bit cluster role Binding: %w", err)
}
daemonSet := resources.MakeDaemonSet(name, checksum)
if err := utils.CreateOrUpdateDaemonSet(ctx, r, daemonSet); err != nil {
return fmt.Errorf("failed to reconcile fluent bit daemonset: %w", err)
}
service := resources.MakeService(name)
Expand Down
Expand Up @@ -108,59 +108,6 @@ var _ = Describe("LogPipeline controller", func() {
}
Expect(k8sClient.Create(ctx, secret)).Should(Succeed())

podLabels := map[string]string{
"app.kubernetes.io/instance": "logging",
"app.kubernetes.io/name": "fluent-bit",
}
container := corev1.Container{
Name: "fluent-bit",
Image: "fluent-bit",
}
fluentBitDs := &appsv1.DaemonSet{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "DaemonSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: testConfig.DaemonSet.Name,
Namespace: testConfig.DaemonSet.Namespace,
},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: podLabels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
container,
},
},
},
},
}
Expect(k8sClient.Create(ctx, fluentBitDs)).Should(Succeed())

fluentBitPod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Name: testConfig.DaemonSet.Name + "-123",
Namespace: testConfig.DaemonSet.Namespace,
Labels: podLabels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
container,
},
},
}
Expect(k8sClient.Create(ctx, fluentBitPod)).Should(Succeed())

file := telemetryv1alpha1.FileMount{
Name: "myFile",
Content: "file-content",
Expand Down Expand Up @@ -305,7 +252,7 @@ var _ = Describe("LogPipeline controller", func() {

Expect(k8sClient.Delete(ctx, logPipeline)).Should(Succeed())

// Fluent Bit daemon set should rollout-restarted (generation changes from 1 to 2)
// Fluent Bit daemon set should be created and generation should be 1
Eventually(func() int {
var fluentBitDaemonSet appsv1.DaemonSet
err := k8sClient.Get(ctx, types.NamespacedName{
Expand All @@ -316,7 +263,7 @@ var _ = Describe("LogPipeline controller", func() {
return 0
}
return int(fluentBitDaemonSet.Generation)
}, timeout, interval).Should(Equal(2))
}, timeout, interval).Should(Equal(1))

// Fluent Bit daemon set should have checksum annotation set
Eventually(func() bool {
Expand Down
Expand Up @@ -113,7 +113,7 @@ var _ = BeforeSuite(func() {
client := mgr.GetClient()
overrides := overrides.New(configureLogLevelOnFly, &kubernetes.ConfigmapProber{Client: client})

reconciler := NewReconciler(client, testConfig, &kubernetes.DaemonSetProber{Client: client}, &kubernetes.DaemonSetAnnotator{Client: client}, overrides)
reconciler := NewReconciler(client, testConfig, &kubernetes.DaemonSetProber{Client: client}, overrides)
err = reconciler.SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred())

Expand Down