Skip to content

Commit

Permalink
Merge pull request #118 from kaasops/merge-syncs
Browse files Browse the repository at this point in the history
Merge sinks
  • Loading branch information
dkhachyan committed Jun 21, 2023
2 parents 4d8acab + b1b8a93 commit 03c103e
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 12 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/vector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type VectorSpec struct {
// Merge kubernetes sources and move selectors processing to transforms.
// +optional
MergeKubernetesSources bool `json:"mergeKubernetesSources,omitempty"`
// Merge kubernetes sink with equal options.
// +optional
MergeSinks bool `json:"mergeSinks,omitempty"`

// Vector Aggregator
// Aggregator *VectorAggregator `json:"aggregator,omitempty"`
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/observability.kaasops.io_vectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4144,6 +4144,9 @@ spec:
description: Merge kubernetes sources and move selectors processing
to transforms.
type: boolean
mergeSinks:
description: Merge kubernetes sink with equal options.
type: boolean
type: object
status:
description: VectorStatus defines the observed state of Vector
Expand Down
38 changes: 38 additions & 0 deletions controllers/factory/config/config_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
"sort"
"strings"

vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1"
"github.com/kaasops/vector-operator/controllers/factory/pipeline"
"github.com/kaasops/vector-operator/controllers/factory/utils/hash"
"github.com/kaasops/vector-operator/controllers/factory/utils/k8s"
"github.com/kaasops/vector-operator/controllers/factory/vector/vectoragent"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -131,6 +133,12 @@ func (b *Builder) generateVectorConfig() (*VectorConfig, error) {
}
}

if b.vaCtrl.Vector.Spec.MergeSinks {
if err := b.mergeSyncs(vectorConfig); err != nil {
return nil, err
}
}

return vectorConfig, nil
}

Expand Down Expand Up @@ -269,6 +277,11 @@ func getSinks(pipeline pipeline.Pipeline) ([]*Sink, error) {
for i, inputName := range sink.Inputs {
sink.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName)
}
optbyte, err := json.Marshal(sink.Options)
if err != nil {
return nil, err
}
sink.OptionsHash = fmt.Sprint(hash.Get(optbyte))
sinks = append(sinks, sink)
}
return sinks, nil
Expand Down Expand Up @@ -373,6 +386,31 @@ func (b *Builder) mergeKubernetesSources(config *VectorConfig) error {
return nil
}

func (b *Builder) mergeSyncs(config *VectorConfig) error {
uniqOpts := make(map[string]*Sink)
var mergedSinks []*Sink

for _, sink := range config.Sinks {
v, ok := uniqOpts[sink.OptionsHash]
if ok {
if sink.Type == v.Type {
// If sink spec already exists rename and merge inputs
v.Name = v.OptionsHash
v.Inputs = append(v.Inputs, sink.Inputs...)
sort.Strings(v.Inputs)
continue
}
}
uniqOpts[sink.OptionsHash] = sink
mergedSinks = append(mergedSinks, sink)
}

if len(mergedSinks) > 0 {
config.Sinks = mergedSinks
}
return nil
}

func generateVrlFilter(selector string, selectorType string) string {
buffer := new(bytes.Buffer)

Expand Down
9 changes: 1 addition & 8 deletions controllers/factory/config/configcheck/configcheck_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,7 @@ func (cc *ConfigCheck) createVectorConfigCheckPod() *corev1.Pod {
Args: []string{"validate", "/etc/vector/*.json"},
Env: cc.generateVectorConfigCheckEnvs(),
SecurityContext: cc.ContainerSecurityContext,
Ports: []corev1.ContainerPort{
{
Name: "prom-exporter",
ContainerPort: 9090,
Protocol: "TCP",
},
},
VolumeMounts: cc.generateVectorConfigCheckVolumeMounts(),
VolumeMounts: cc.generateVectorConfigCheckVolumeMounts(),
},
},
RestartPolicy: "Never",
Expand Down
9 changes: 5 additions & 4 deletions controllers/factory/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ type Transform struct {
}

type Sink struct {
Name string
Type string `mapper:"type"`
Inputs []string `mapper:"inputs"`
Options map[string]interface{} `mapstructure:",remain"`
Name string
Type string `mapper:"type"`
Inputs []string `mapper:"inputs"`
Options map[string]interface{} `mapstructure:",remain"`
OptionsHash string
}

type ConfigComponent interface {
Expand Down
1 change: 1 addition & 0 deletions helm/charts/vector-operator/templates/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ metadata:
namespace: {{ .Release.Namespace }}
spec:
mergeKubernetesSources: {{ .Values.vector.mergeKubernetesSources}}
mergeSinks: {{ .Values.vector.mergeSinks}}
{{- with .Values.vector.agent }}
agent:
{{ toYaml . | indent 4 }}
Expand Down
1 change: 1 addition & 0 deletions helm/charts/vector-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ vector:
enable: false
name: "vector"
mergeKubernetesSources: false
mergeSinks: false
# agent:
# image: timberio/vector:0.24.0-distroless-libc
# env:
Expand Down

0 comments on commit 03c103e

Please sign in to comment.