Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix left over issues with managed fluentbit #16734

Merged
merged 3 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
OverrideConfigMap types.NamespacedName
PipelineDefaults configbuilder.PipelineDefaults
Overrides overrides.Config
DaemonSetConfig resources.DaemonSetConfig
}

//go:generate mockery --name DaemonSetProber --filename daemon_set_prober.go
Expand Down Expand Up @@ -146,48 +147,57 @@ func (r *Reconciler) doReconcile(ctx context.Context, pipeline *telemetryv1alpha
}

func (r *Reconciler) reconcileFluentBit(ctx context.Context, name types.NamespacedName, pipeline *telemetryv1alpha1.LogPipeline, checksum string) error {
shouldDeleteFluentBit, err := r.isLastPipelineMarkedForDeletion(ctx, pipeline)
if err != nil {
return fmt.Errorf("failed to check if LogPipeline is last marked for deletion: %v", err)
}

if shouldDeleteFluentBit {
return utils.DeleteFluentBit(ctx, r, name)
}

serviceAccount := resources.MakeServiceAccount(name)
if err := utils.CreateOrUpdateServiceAccount(ctx, r, serviceAccount); err != nil {
return fmt.Errorf("failed to create fluent bit service account: %w", err)
}
clusterRole := resources.MakeClusterRole(name)
if err := utils.CreateOrUpdateClusterRole(ctx, r, clusterRole); err != nil {
return fmt.Errorf("failed to create fluent bit cluster role: %w", err)
}
clusterRoleBinding := resources.MakeClusterRoleBinding(name)
if err := utils.CreateOrUpdateClusterRoleBinding(ctx, r, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to create fluent bit cluster role Binding: %w", err)
}
daemonSet := resources.MakeDaemonSet(name, checksum, r.config.DaemonSetConfig)
if err := utils.CreateOrUpdateDaemonSet(ctx, r, daemonSet); err != nil {
return fmt.Errorf("failed to reconcile fluent bit daemonset: %w", err)
}
service := resources.MakeService(name)
if err := utils.CreateOrUpdateService(ctx, r, service); err != nil {
return fmt.Errorf("failed to reconcile fluent bit service: %w", err)
}
cm := resources.MakeConfigMap(name)
if err := utils.CreateOrUpdateConfigMap(ctx, r, cm); err != nil {
return fmt.Errorf("failed to reconcile fluent bit configmap: %w", err)
}
luaCm := resources.MakeLuaConfigMap(name)
if err := utils.CreateOrUpdateConfigMap(ctx, r, luaCm); err != nil {
return fmt.Errorf("failed to reconcile fluent bit lua configmap: %w", err)
}
return nil
}

func (r *Reconciler) isLastPipelineMarkedForDeletion(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) (bool, error) {
if isNotMarkedForDeletion(pipeline) {
serviceAccount := resources.MakeServiceAccount(name)
if err := utils.CreateOrUpdateServiceAccount(ctx, r, serviceAccount); err != nil {
return fmt.Errorf("failed to create fluent bit service account: %w", err)
}
clusterRole := resources.MakeClusterRole(name)
if err := utils.CreateOrUpdateClusterRole(ctx, r, clusterRole); err != nil {
return fmt.Errorf("failed to create fluent bit cluster role: %w", err)
}
clusterRoleBinding := resources.MakeClusterRoleBinding(name)
if err := utils.CreateOrUpdateClusterRoleBinding(ctx, r, clusterRoleBinding); err != nil {
return fmt.Errorf("failed to create fluent bit cluster role Binding: %w", err)
}
daemonSet := resources.MakeDaemonSet(name, checksum)
if err := utils.CreateOrUpdateDaemonSet(ctx, r, daemonSet); err != nil {
return fmt.Errorf("failed to reconcile fluent bit daemonset: %w", err)
}
service := resources.MakeService(name)
if err := utils.CreateOrUpdateService(ctx, r, service); err != nil {
return fmt.Errorf("failed to reconcile fluent bit service: %w", err)
}
cm := resources.MakeConfigMap(name)
if err := utils.CreateOrUpdateConfigMap(ctx, r, cm); err != nil {
return fmt.Errorf("failed to reconcile fluent bit configmap: %w", err)
}
luaCm := resources.MakeLuaConfigMap(name)
if err := utils.CreateOrUpdateConfigMap(ctx, r, luaCm); err != nil {
return fmt.Errorf("failed to reconcile fluent bit lua configmap: %w", err)
}
return nil
return false, nil
}

var allPipelines telemetryv1alpha1.LogPipelineList
if err := r.List(ctx, &allPipelines); err != nil {
return fmt.Errorf("failed to determine condition for deleting fluent bit: %w", err)
}

if len(allPipelines.Items) == 1 && allPipelines.Items[0].Name == pipeline.Name {
return utils.DeleteFluentBit(ctx, r, name)
return false, fmt.Errorf("failed to list LogPipelines: %v", err)
}

return nil
return len(allPipelines.Items) == 1 && allPipelines.Items[0].Name == pipeline.Name, nil
}

func (r *Reconciler) updateMetrics(ctx context.Context) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"github.com/kyma-project/kyma/components/telemetry-operator/internal/logger"
"github.com/kyma-project/kyma/components/telemetry-operator/internal/overrides"
"github.com/kyma-project/kyma/components/telemetry-operator/internal/resources/logpipeline"
"k8s.io/apimachinery/pkg/api/resource"
"path/filepath"
"testing"

Expand Down Expand Up @@ -59,6 +61,16 @@ var (
FilesConfigMap: types.NamespacedName{Name: "test-telemetry-fluent-bit-files", Namespace: "default"},
EnvSecret: types.NamespacedName{Name: "test-telemetry-fluent-bit-env", Namespace: "default"},
OverrideConfigMap: types.NamespacedName{Name: "override-config", Namespace: "default"},
DaemonSetConfig: logpipeline.DaemonSetConfig{
FluentBitImage: "my-fluent-bit-image",
FluentBitConfigPrepperImage: "my-fluent-bit-config-image",
ExporterImage: "my-exporter-image",
PriorityClassName: "my-priority-class",
CPULimit: resource.MustParse("1"),
MemoryLimit: resource.MustParse("500Mi"),
CPURequest: resource.MustParse(".1"),
MemoryRequest: resource.MustParse("100Mi"),
},
PipelineDefaults: builder.PipelineDefaults{
InputTag: "kube",
MemoryBufferLimit: "10M",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ import (

const checksumAnnotationKey = "checksum/logpipeline-config"

type DaemonSetConfig struct {
FluentBitImage string
FluentBitConfigPrepperImage string
ExporterImage string
PriorityClassName string
CPULimit resource.Quantity
MemoryLimit resource.Quantity
CPURequest resource.Quantity
MemoryRequest resource.Quantity
}

func MakeServiceAccount(name types.NamespacedName) *corev1.ServiceAccount {
serviceAccount := corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -52,15 +63,15 @@ func MakeClusterRole(name types.NamespacedName) *v1.ClusterRole {
return &clusterRole
}

func MakeDaemonSet(name types.NamespacedName, checksum string) *appsv1.DaemonSet {
func MakeDaemonSet(name types.NamespacedName, checksum string, dsConfig DaemonSetConfig) *appsv1.DaemonSet {
resourcesFluentBit := corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("10m"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceCPU: dsConfig.CPURequest,
corev1.ResourceMemory: dsConfig.MemoryRequest,
},
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("400m"),
corev1.ResourceMemory: resource.MustParse("256Mi"),
corev1.ResourceCPU: dsConfig.CPULimit,
corev1.ResourceMemory: dsConfig.MemoryLimit,
},
}

Expand Down Expand Up @@ -94,15 +105,15 @@ func MakeDaemonSet(name types.NamespacedName, checksum string) *appsv1.DaemonSet
},
Spec: corev1.PodSpec{
ServiceAccountName: name.Name,
PriorityClassName: "kyma-system-priority",
PriorityClassName: dsConfig.PriorityClassName,
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: pointer.Bool(false),
SeccompProfile: &corev1.SeccompProfile{Type: "RuntimeDefault"},
},
InitContainers: []corev1.Container{
{
Name: "prep-fluent-bit-config",
Image: "eu.gcr.io/kyma-project/external/busybox:1.34.1",
Image: dsConfig.FluentBitConfigPrepperImage,
Command: []string{
"sh", "-c",
"cp /main/* /fluent-bit/etc/ && mkdir -p /fluent-bit/etc/dynamic/ && cp /dynamic/* /fluent-bit/etc/dynamic && mkdir -p /fluent-bit/etc/dynamic-parsers/ && cp /dynamic-parsers/* /fluent-bit/etc/dynamic-parsers || touch /fluent-bit/etc/dynamic/empty.conf",
Expand Down Expand Up @@ -135,7 +146,7 @@ func MakeDaemonSet(name types.NamespacedName, checksum string) *appsv1.DaemonSet
Privileged: pointer.Bool(false),
ReadOnlyRootFilesystem: pointer.Bool(true),
},
Image: "eu.gcr.io/kyma-project/tpi/fluent-bit:2.0.8-723b551a",
Image: dsConfig.FluentBitImage,
ImagePullPolicy: "IfNotPresent",
EnvFrom: []corev1.EnvFromSource{
{
Expand All @@ -151,11 +162,6 @@ func MakeDaemonSet(name types.NamespacedName, checksum string) *appsv1.DaemonSet
ContainerPort: 2020,
Protocol: "TCP",
},
{
Name: "http-metrics",
ContainerPort: 2021,
Protocol: "TCP",
},
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand Down Expand Up @@ -188,7 +194,7 @@ func MakeDaemonSet(name types.NamespacedName, checksum string) *appsv1.DaemonSet
},
{
Name: "exporter",
Image: "eu.gcr.io/kyma-project/directory-size-exporter:v20221020-e314a071",
Image: dsConfig.ExporterImage,
Resources: resourcesExporter,
Args: []string{
"--storage-path=/data/flb-storage/",
Expand Down Expand Up @@ -413,7 +419,7 @@ function map_keys(table)
local new_table = {}
local changed_keys = {}
for key, val in pairs(table) do
local mapped_key = string.gsub(key, \"[%/%.]\", \"_\")
local mapped_key = string.gsub(key, "[%/%.]", "_")
if mapped_key ~= key then
new_table[mapped_key] = val
changed_keys[key] = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logpipeline
import (
"github.com/stretchr/testify/require"
v1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"

"testing"
Expand All @@ -11,7 +12,17 @@ import (
func TestMakeDaemonSet(t *testing.T) {
name := types.NamespacedName{Name: "telemetry-fluent-bit", Namespace: "kyma-system"}
checksum := "foo"
daemonSet := MakeDaemonSet(name, checksum)
ds := DaemonSetConfig{
FluentBitImage: "foo-fluenbit",
FluentBitConfigPrepperImage: "foo-configprepper",
ExporterImage: "foo-exporter",
PriorityClassName: "foo-prio-class",
CPULimit: resource.MustParse(".25"),
MemoryLimit: resource.MustParse("400Mi"),
CPURequest: resource.MustParse(".1"),
MemoryRequest: resource.MustParse("100Mi"),
}
daemonSet := MakeDaemonSet(name, checksum, ds)

require.NotNil(t, daemonSet)
require.Equal(t, daemonSet.Name, name.Name)
Expand Down
61 changes: 46 additions & 15 deletions components/telemetry-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"errors"
"flag"
"github.com/kyma-project/kyma/components/telemetry-operator/internal/resources/logpipeline"
"os"
"strings"
"time"
Expand Down Expand Up @@ -93,24 +94,35 @@ var (
traceCollectorCPURequest string
traceCollectorMemoryRequest string

fluentBitEnvSecret string
fluentBitFilesConfigMap string
fluentBitPath string
fluentBitPluginDirectory string
fluentBitInputTag string
fluentBitMemoryBufferLimit string
fluentBitStorageType string
fluentBitFsBufferLimit string
fluentBitConfigMap string
fluentBitSectionsConfigMap string
fluentBitParsersConfigMap string
fluentBitDaemonSet string
maxLogPipelines int
fluentBitEnvSecret string
fluentBitFilesConfigMap string
fluentBitPath string
fluentBitPluginDirectory string
fluentBitInputTag string
fluentBitMemoryBufferLimit string
fluentBitStorageType string
fluentBitFsBufferLimit string
fluentBitConfigMap string
fluentBitSectionsConfigMap string
fluentBitParsersConfigMap string
fluentBitDaemonSet string
fluentBitCPULimit string
fluentBitMemoryLimit string
fluentBitCPURequest string
fluentBitMemoryRequest string
maxLogPipelines int
fluentBitImageVersion string
fluentBitExporterVersion string
fluentBitConfigPrepperImageVersion string
fluentBitPriorityClassName string
)

const (
otelImage = "eu.gcr.io/kyma-project/tpi/otel-collector:0.70.0-723b551a"
overrideConfigMapName = "telemetry-override-config"
otelImage = "eu.gcr.io/kyma-project/tpi/otel-collector:0.70.0-723b551a"
overrideConfigMapName = "telemetry-override-config"
fluentBitImage = "eu.gcr.io/kyma-project/tpi/fluent-bit:2.0.8-723b551a"
fluentBitConfigPrepperImage = "eu.gcr.io/kyma-project/external/busybox:1.34.1"
fluentBitExporterImage = "eu.gcr.io/kyma-project/directory-size-exporter:v20221020-e314a071"
)

//nolint:gochecknoinits
Expand Down Expand Up @@ -200,6 +212,15 @@ func main() {
flag.StringVar(&fluentBitStorageType, "fluent-bit-storage-type", "filesystem", "Fluent Bit buffering mechanism (filesystem or memory)")
flag.StringVar(&fluentBitFsBufferLimit, "fluent-bit-filesystem-buffer-limit", "1G", "Fluent Bit filesystem buffer limit per log pipeline")
flag.StringVar(&deniedFilterPlugins, "fluent-bit-denied-filter-plugins", "", "Comma separated list of denied filter plugins even if allowUnsupportedPlugins is enabled. If empty, all filter plugins are allowed.")
flag.StringVar(&fluentBitCPULimit, "fluent-bit-cpu-limit", "1", "CPU limit for tracing fluent-bit")
flag.StringVar(&fluentBitMemoryLimit, "fluent-bit-memory-limit", "1Gi", "Memory limit for fluent-bit")
flag.StringVar(&fluentBitCPURequest, "fluent-bit-cpu-request", "400m", "CPU request for fluent-bit")
flag.StringVar(&fluentBitMemoryRequest, "fluent-bit-memory-request", "256Mi", "Memory request for fluent-bit")
flag.StringVar(&fluentBitImageVersion, "fluent-bit-image", fluentBitImage, "Image for fluent-bit")
flag.StringVar(&fluentBitConfigPrepperImageVersion, "fluent-bit-config-prepper-image", fluentBitConfigPrepperImage, "Image for fluent-bit config preparation")
flag.StringVar(&fluentBitExporterVersion, "fluent-bit-exporter-image", fluentBitExporterImage, "Image for exporting fluent bit filesystem usage")
flag.StringVar(&fluentBitPriorityClassName, "fluent-bit-priority-class-name", "kyma-system-priority", "Name of the priority class of fluent bit ")

flag.StringVar(&deniedOutputPlugins, "fluent-bit-denied-output-plugins", "", "Comma separated list of denied output plugins even if allowUnsupportedPlugins is enabled. If empty, all output plugins are allowed.")
flag.IntVar(&maxLogPipelines, "fluent-bit-max-pipelines", 5, "Maximum number of LogPipelines to be created. If 0, no limit is applied.")

Expand Down Expand Up @@ -339,6 +360,16 @@ func createLogPipelineReconciler(client client.Client) *logpipelinecontroller.Re
DaemonSet: types.NamespacedName{Namespace: telemetryNamespace, Name: fluentBitDaemonSet},
OverrideConfigMap: types.NamespacedName{Name: overrideConfigMapName, Namespace: telemetryNamespace},
PipelineDefaults: createPipelineDefaults(),
DaemonSetConfig: logpipeline.DaemonSetConfig{
FluentBitImage: fluentBitImageVersion,
FluentBitConfigPrepperImage: fluentBitConfigPrepperImageVersion,
ExporterImage: fluentBitExporterVersion,
PriorityClassName: fluentBitPriorityClassName,
CPULimit: resource.MustParse(fluentBitCPULimit),
MemoryLimit: resource.MustParse(fluentBitMemoryLimit),
CPURequest: resource.MustParse(fluentBitCPURequest),
MemoryRequest: resource.MustParse(fluentBitMemoryRequest),
},
}
overrides := overrides.New(configureLogLevelOnFly, &kubernetes.ConfigmapProber{Client: client})

Expand Down
10 changes: 10 additions & 0 deletions resources/telemetry/charts/operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,19 @@ spec:
- --fluent-bit-env-secret={{ .Release.Name }}-{{ .Values.logProcessor }}-env
- --fluent-bit-files-cm={{ .Release.Name }}-{{ .Values.logProcessor }}-files
- --fluent-bit-filesystem-buffer-limit={{ .Values.filesystemBufferLimit }}
- --fluent-bit-cpu-limit={{ .Values.fluentbit.resources.limits.cpu }}
- --fluent-bit-memory-limit={{ .Values.fluentbit.resources.limits.memory }}
- --fluent-bit-cpu-request={{ .Values.fluentbit.resources.requests.cpu }}
- --fluent-bit-memory-request={{ .Values.fluentbit.resources.requests.memory }}
- --fluent-bit-denied-filter-plugins={{ join "," .Values.deniedPlugins.filter}}
- --fluent-bit-denied-output-plugins={{ join "," .Values.deniedPlugins.output}}
- --fluent-bit-max-pipelines={{.Values.maxLogPipelines}}
- --fluent-bit-image={{ include "imageurl" (dict "reg" .Values.global.containerRegistry "img" .Values.global.images.fluent_bit) }}
- --fluent-bit-config-prepper-image={{ include "imageurl" (dict "reg" $.Values.global.containerRegistry "img" $.Values.global.images.busybox) }}
- --fluent-bit-exporter-image={{ include "imageurl" (dict "reg" .Values.global.containerRegistry "img" .Values.global.images.directory_size_exporter) }}
{{- if or .Values.priorityClassName .Values.global.highPriorityClassName }}
- --fluent-bit-priority-class-name={{ coalesce .Values.priorityClassName .Values.global.highPriorityClassName }}
{{- end }}
{{- if not .Values.controllers.logging.enabled }}
- --enable-logging=false
{{- end }}
Expand Down
7 changes: 7 additions & 0 deletions resources/telemetry/charts/operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ readinessProbe:
periodSeconds: 10

fluentbit:
resources:
limits:
cpu: 400m
memory: 256Mi
requests:
cpu: 10m
memory: 50Mi
fullnameOverride: telemetry-fluent-bit
networkPolicy:
enabled: false
Expand Down
14 changes: 7 additions & 7 deletions resources/telemetry/profile-production.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
operator:
maxLogPipelines: 5
fluent-bit:
resources:
requests:
cpu: 100m
limits:
cpu: "1"
memory: 1Gi
fluent-bit:
rakesh-garimella marked this conversation as resolved.
Show resolved Hide resolved
resources:
requests:
cpu: 100m
limits:
cpu: "1"
memory: 1Gi