Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,45 @@ Apply the provided example resource for telemetry-controller: [telemetry-control
kubectl apply -f telemetry-controller.yaml
```

## Testing and Debugging

### Dry Run Mode

The Telemetry Controller supports a `dryRunMode` flag in the Collector CRD that enables a simplified pipeline configuration for testing and debugging purposes.

To enable dry run mode, set both `debug` and `dryRunMode` in your Collector resource:

```yaml
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Collector
metadata:
name: example-collector
spec:
debug: true
dryRunMode: true
tenantSelector:
matchLabels:
example: "true"
```

When `dryRunMode` is enabled, the generated OpenTelemetry Collector pipeline is simplified:

- Only data-modifying components are included (e.g., transform processors, k8sattributes processor)
- All exporters except the debug exporter are disabled
- Persistence options are disabled

This feature is particularly useful for:

- **Testing new processor configurations in isolation** - Verify that your data transformations work correctly without sending data to production backends
- **Validating data transformations before production deployment** - See exactly how your telemetry data is being modified
- **Inspecting telemetry pipelines** - Examine the data flow without affecting production exporters or generating unnecessary traffic

To view the debug output, check the collector logs:

```sh
kubectl logs -n telemetry-controller-system -l app.kubernetes.io/name=example-collector -f
```

## Under the hood

Telemetry Controller uses a [custom OpenTelemetry Collector distribution](https://github.com/axoflow/axoflow-otel-collector-releases) as its agent. This distribution is and will be compatible with the upstream OpenTelemetry Collector distribution regarding core features, but:
Expand Down
8 changes: 7 additions & 1 deletion api/telemetry/v1alpha1/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type MemoryLimiter struct {
MemorySpikePercentage uint32 `json:"spike_limit_percentage"`
}

// +kubebuilder:validation:XValidation:rule="!has(self.dryRunMode) || !self.dryRunMode || (has(self.debug) && self.debug)",message="dryRunMode can only be set to true when debug is explicitly set to true"

// CollectorSpec defines the desired state of Collector
type CollectorSpec struct {
// +kubebuilder:validation:Required
Expand All @@ -64,7 +66,11 @@ type CollectorSpec struct {
ControlNamespace string `json:"controlNamespace"`

// Enables debug logging for the collector.
Debug bool `json:"debug,omitempty"`
Debug *bool `json:"debug,omitempty"`

// DryRunMode disables all exporters except for the debug exporter, as well as persistence options configured for the collector.
// This can be useful for testing and debugging purposes.
DryRunMode *bool `json:"dryRunMode,omitempty"`

// Setting memory limits for the Collector using the memory limiter processor.
MemoryLimiter *MemoryLimiter `json:"memoryLimiter,omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions api/telemetry/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ spec:
debug:
description: Enables debug logging for the collector.
type: boolean
dryRunMode:
description: |-
DryRunMode disables all exporters except for the debug exporter, as well as persistence options configured for the collector.
This can be useful for testing and debugging purposes.
type: boolean
memoryLimiter:
description: Setting memory limits for the Collector using the memory
limiter processor.
Expand Down Expand Up @@ -7501,6 +7506,11 @@ spec:
- controlNamespace
- tenantSelector
type: object
x-kubernetes-validations:
- message: dryRunMode can only be set to true when debug is explicitly
set to true
rule: '!has(self.dryRunMode) || !self.dryRunMode || (has(self.debug)
&& self.debug)'
status:
description: CollectorStatus defines the observed state of Collector
properties:
Expand Down
10 changes: 10 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ spec:
debug:
description: Enables debug logging for the collector.
type: boolean
dryRunMode:
description: |-
DryRunMode disables all exporters except for the debug exporter, as well as persistence options configured for the collector.
This can be useful for testing and debugging purposes.
type: boolean
memoryLimiter:
description: Setting memory limits for the Collector using the memory
limiter processor.
Expand Down Expand Up @@ -7501,6 +7506,11 @@ spec:
- controlNamespace
- tenantSelector
type: object
x-kubernetes-validations:
- message: dryRunMode can only be set to true when debug is explicitly
set to true
rule: '!has(self.dryRunMode) || !self.dryRunMode || (has(self.debug)
&& self.debug)'
status:
description: CollectorStatus defines the observed state of Collector
properties:
Expand Down
7 changes: 5 additions & 2 deletions pkg/resources/manager/collector_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (c *CollectorManager) BuildConfigInputForCollector(ctx context.Context, col
TenantSubscriptionMap: tenantSubscriptionMap,
SubscriptionOutputMap: subscriptionOutputMap,
},
Debug: collector.Spec.Debug,
Debug: utils.DerefOrZero(collector.Spec.Debug),
DryRunMode: utils.DerefOrZero(collector.Spec.DryRunMode),
MemoryLimiter: *collector.Spec.MemoryLimiter,
}, nil
}
Expand Down Expand Up @@ -181,7 +182,9 @@ func (c *CollectorManager) OtelCollector(collector *v1alpha1.Collector, otelConf
OpenTelemetryCommonFields: *collector.Spec.OtelCommonFields,
},
}
handleVolumes(&otelCollector.Spec.OpenTelemetryCommonFields, tenants, outputs)
if !utils.DerefOrZero(collector.Spec.DryRunMode) {
handleVolumes(&otelCollector.Spec.OpenTelemetryCommonFields, tenants, outputs)
}
setOtelCommonFieldsDefaults(&otelCollector.Spec.OpenTelemetryCommonFields, additionalArgs, saName)

if memoryLimit := collector.Spec.GetMemoryLimit(); memoryLimit != nil {
Expand Down
72 changes: 43 additions & 29 deletions pkg/resources/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type OtelColConfigInput struct {
components.ResourceRelations
MemoryLimiter v1alpha1.MemoryLimiter
Debug bool
DryRunMode bool
}

func (cfgInput *OtelColConfigInput) IsEmpty() bool {
Expand All @@ -61,15 +62,22 @@ func (cfgInput *OtelColConfigInput) IsEmpty() bool {
}

// generateExporters builds the exporter section of the collector config.
// In dry-run mode only the debug exporter is emitted, so no telemetry can
// leave the collector; otherwise all configured exporters are generated,
// with the debug exporter added on top when Debug is set.
//
// NOTE(review): this span of the diff paste contained both the pre-change and
// post-change lines (a duplicate `exporters` declaration and two debug-exporter
// blocks); this is the reconstructed post-change version.
func (cfgInput *OtelColConfigInput) generateExporters(ctx context.Context) map[string]any {
	// If in dry-run mode, only generate debug exporters
	if cfgInput.DryRunMode {
		return exporter.GenerateDebugExporters()
	}

	exporters := make(map[string]any)

	if cfgInput.Debug {
		maps.Copy(exporters, exporter.GenerateDebugExporters())
	}

	maps.Copy(exporters, exporter.GenerateMetricsExporters())
	maps.Copy(exporters, exporter.GenerateOTLPGRPCExporters(ctx, cfgInput.ResourceRelations))
	maps.Copy(exporters, exporter.GenerateOTLPHTTPExporters(ctx, cfgInput.ResourceRelations))
	maps.Copy(exporters, exporter.GenerateFluentforwardExporters(ctx, cfgInput.ResourceRelations))
	maps.Copy(exporters, exporter.GenerateFileExporter(ctx, cfgInput.ResourceRelations))

	return exporters
}
Expand Down Expand Up @@ -122,7 +130,7 @@ func (cfgInput *OtelColConfigInput) generateExtensions() (map[string]any, []stri
}

for _, tenant := range cfgInput.Tenants {
if tenant.Spec.PersistenceConfig.EnableFileStorage {
if !cfgInput.DryRunMode && tenant.Spec.PersistenceConfig.EnableFileStorage {
extensions[fmt.Sprintf("file_storage/%s", tenant.Name)] = storage.GenerateFileStorageExtensionForTenant(tenant.Spec.PersistenceConfig.Directory, tenant.Name)
}
}
Expand All @@ -149,7 +157,7 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any {
}); tenantIdx != -1 {
namespaces := cfgInput.Tenants[tenantIdx].Status.LogSourceNamespaces
if len(namespaces) > 0 || cfgInput.Tenants[tenantIdx].Spec.SelectFromAllNamespaces {
receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces, cfgInput.Tenants[tenantIdx])
receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces, cfgInput.DryRunMode, cfgInput.Tenants[tenantIdx])
}
}
}
Expand All @@ -159,8 +167,11 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any {

func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any {
connectors := make(map[string]any)
maps.Copy(connectors, connector.GenerateCountConnectors())
maps.Copy(connectors, connector.GenerateBytesConnectors())

if !cfgInput.DryRunMode {
maps.Copy(connectors, connector.GenerateCountConnectors())
maps.Copy(connectors, connector.GenerateBytesConnectors())
}

for _, tenant := range cfgInput.Tenants {
// Generate routing connector for the tenant's subscription if it has any
Expand Down Expand Up @@ -193,16 +204,18 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
namedPipelines := make(map[string]*otelv1beta1.Pipeline)
tenants := []string{}
for tenant := range cfgInput.TenantSubscriptionMap {
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant, cfgInput.DryRunMode)
tenants = append(tenants, tenant)
}

maps.Copy(namedPipelines, pipeline.GenerateMetricsPipelines())
if !cfgInput.DryRunMode {
maps.Copy(namedPipelines, pipeline.GenerateMetricsPipelines())
}

for _, tenant := range tenants {
// Generate a pipeline for the tenant
tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant)
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant, cfgInput.DryRunMode)

connector.GenerateRoutingConnectorForBridgesTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges)
processor.GenerateTransformProcessorForTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Tenants)
Expand Down Expand Up @@ -234,24 +247,25 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b

var exporters []string

if output.Output.Spec.OTLPGRPC != nil {
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
}

if output.Output.Spec.OTLPHTTP != nil {
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
}

if output.Output.Spec.Fluentforward != nil {
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
}

if output.Output.Spec.File != nil {
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
}

if cfgInput.Debug {
exporters = append(exporters, "debug")
// If in dry-run mode, only generate debug exporters
if cfgInput.DryRunMode {
exporters = []string{exporter.DebugExporterID}
} else {
if cfgInput.Debug {
exporters = append(exporters, exporter.DebugExporterID)
}
if output.Output.Spec.OTLPGRPC != nil {
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
}
if output.Output.Spec.OTLPHTTP != nil {
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
}
if output.Output.Spec.Fluentforward != nil {
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
}
if output.Output.Spec.File != nil {
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
}
}

namedPipelines[outputPipelineName] = pipeline.GeneratePipeline(receivers, processors, exporters)
Expand Down
3 changes: 2 additions & 1 deletion pkg/resources/otel_conf_gen/otel_conf_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) {
{
name: "Single tenant with no subscriptions",
cfgInput: OtelColConfigInput{
DryRunMode: false,
ResourceRelations: components.ResourceRelations{
Bridges: nil,
OutputsWithSecretData: nil,
Expand All @@ -486,7 +487,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) {
},
},
expectedPipelines: map[string]*otelv1beta1.Pipeline{
"logs/tenant_tenant1": pipeline.GenerateRootPipeline([]v1alpha1.Tenant{}, "tenant1"),
"logs/tenant_tenant1": pipeline.GenerateRootPipeline([]v1alpha1.Tenant{}, "tenant1", false),
"logs/tenant_tenant1_subscription_ns1_sub1": pipeline.GeneratePipeline(
[]string{"routing/tenant_tenant1_subscriptions"},
[]string{"attributes/subscription_sub1"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

package exporter

const DebugExporterID = "debug"

func GenerateDebugExporters() map[string]any {
result := make(map[string]any)
result["debug"] = map[string]any{
result[DebugExporterID] = map[string]any{
"verbosity": "detailed",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
)

const (
DefaultPrometheusExporterID = "prometheus/message_metrics_exporter"
)

type TLSServerConfig struct {
// squash ensures fields are correctly decoded in embedded struct.
v1alpha1.TLSSetting `json:",inline"`
Expand Down Expand Up @@ -125,7 +129,7 @@ func GenerateMetricsExporters() map[string]any {
}

metricsExporters := make(map[string]any)
metricsExporters["prometheus/message_metrics_exporter"] = defaultPrometheusExporterConfig
metricsExporters[DefaultPrometheusExporterID] = defaultPrometheusExporterConfig

return metricsExporters
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func GenerateOutputExporterNameProcessor(outputName string) AttributesProcessor

func GenerateMetricsProcessors() map[string]any {
metricsProcessors := make(map[string]any)
metricsProcessors["deltatocumulative"] = DeltaToCumulativeConfig{}
metricsProcessors[DefaultDeltaToCumulativeProcessorID] = DeltaToCumulativeConfig{}
metricsProcessors["attributes/metricattributes"] = AttributesProcessor{
Actions: []AttributesProcessorAction{
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package processor

import "time"

const DefaultDeltaToCumulativeProcessorID = "deltatocumulative"

type DeltaToCumulativeConfig struct {
MaxStale time.Duration `json:"max_stale,omitempty"`
MaxStreams int `json:"max_streams,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
)

func GenerateDefaultKubernetesReceiver(namespaces []string, tenant v1alpha1.Tenant) map[string]any {
func GenerateDefaultKubernetesReceiver(namespaces []string, dryRunMode bool, tenant v1alpha1.Tenant) map[string]any {
// TODO: fix parser-crio
operators := []map[string]any{
{
Expand Down Expand Up @@ -114,7 +114,7 @@ func GenerateDefaultKubernetesReceiver(namespaces []string, tenant v1alpha1.Tena
"max_elapsed_time": 0,
},
}
if tenant.Spec.PersistenceConfig.EnableFileStorage {
if !dryRunMode && tenant.Spec.PersistenceConfig.EnableFileStorage {
k8sReceiver["storage"] = fmt.Sprintf("file_storage/%s", tenant.Name)
}

Expand Down
Loading
Loading