Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 259 additions & 4 deletions internal/generator/vector/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
var _ = Describe("Testing Complete Config Generation", func() {
var f = func(testcase generator.ConfGenerateTest) {
g := generator.MakeGenerator()
e := generator.MergeSections(Conf(&testcase.CLSpec, testcase.Secrets, &testcase.CLFSpec, generator.NoOptions))
if testcase.Options == nil {
testcase.Options = generator.Options{}
}
e := generator.MergeSections(Conf(&testcase.CLSpec, testcase.Secrets, &testcase.CLFSpec, testcase.Options))
conf, err := g.GenerateConf(e...)
Expect(err).To(BeNil())
diff := cmp.Diff(
Expand Down Expand Up @@ -389,7 +392,7 @@ source = """


# Adding _id field
[transforms.elasticsearch_preprocess]
[transforms.elasticsearch_preprocess_1]
type = "remap"
inputs = ["pipeline"]
source = """
Expand All @@ -409,7 +412,7 @@ if (.log_type == "audit"){

[sinks.es_1]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess"]
inputs = ["elasticsearch_preprocess_1"]
endpoint = "https://es-1.svc.messaging.cluster.local:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
Expand All @@ -422,7 +425,259 @@ crt_file = "/var/run/ocp-collector/secrets/es-1/tls.crt"
ca_file = "/var/run/ocp-collector/secrets/es-1/ca-bundle.crt"
[sinks.es_2]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess"]
inputs = ["elasticsearch_preprocess_1"]
endpoint = "https://es-2.svc.messaging.cluster.local:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
bulk_action = "create"
id_key = "_id"
# TLS Config
[sinks.es_2.tls]
key_file = "/var/run/ocp-collector/secrets/es-2/tls.key"
crt_file = "/var/run/ocp-collector/secrets/es-2/tls.crt"
ca_file = "/var/run/ocp-collector/secrets/es-2/ca-bundle.crt"
`,
}),
Entry("with multiple pipelines for elastic-search", generator.ConfGenerateTest{
CLSpec: logging.ClusterLoggingSpec{
Forwarder: &logging.ForwarderSpec{},
},
CLFSpec: logging.ClusterLogForwarderSpec{
Pipelines: []logging.PipelineSpec{
{
InputRefs: []string{
logging.InputNameApplication,
logging.InputNameInfrastructure,
},
OutputRefs: []string{"es-1"},
Name: "pipeline1",
},
{
InputRefs: []string{
logging.InputNameAudit},
OutputRefs: []string{"es-1", "es-2"},
Name: "pipeline2",
},
},
Outputs: []logging.OutputSpec{
{
Type: logging.OutputTypeElasticsearch,
Name: "es-1",
URL: "https://es-1.svc.messaging.cluster.local:9200",
Secret: &logging.OutputSecretSpec{
Name: "es-1",
},
},
{
Type: logging.OutputTypeElasticsearch,
Name: "es-2",
URL: "https://es-2.svc.messaging.cluster.local:9200",
Secret: &logging.OutputSecretSpec{
Name: "es-2",
},
},
},
},
Secrets: map[string]*corev1.Secret{
"es-1": {
Data: map[string][]byte{
"tls.key": []byte("junk"),
"tls.crt": []byte("junk"),
"ca-bundle.crt": []byte("junk"),
},
},
"es-2": {
Data: map[string][]byte{
"tls.key": []byte("junk"),
"tls.crt": []byte("junk"),
"ca-bundle.crt": []byte("junk"),
},
},
},
ExpectedConf: `
# Logs from containers (including openshift containers)
[sources.raw_container_logs]
type = "kubernetes_logs"
auto_partial_merge = true
exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]

[sources.raw_journal_logs]
type = "journald"

# Logs from host audit
[sources.host_audit_logs]
type = "file"
ignore_older_secs = 600
include = ["/var/log/audit/audit.log"]

# Logs from kubernetes audit
[sources.k8s_audit_logs]
type = "file"
ignore_older_secs = 600
include = ["/var/log/kube-apiserver/audit.log"]

# Logs from openshift audit
[sources.openshift_audit_logs]
type = "file"
ignore_older_secs = 600
include = ["/var/log/oauth-apiserver.audit.log"]

[transforms.container_logs]
type = "remap"
inputs = ["raw_container_logs"]
source = '''
level = "unknown"
if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
level = "warn"
} else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
level = "info"
} else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
level = "error"
} else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
level = "debug"
}
.level = level

.pipeline_metadata.collector.name = "vector"
.pipeline_metadata.collector.version = "0.14.1"
ip4, err = get_env_var("NODE_IPV4")
.pipeline_metadata.collector.ipaddr4 = ip4
received, err = format_timestamp(now(),"%+")
.pipeline_metadata.collector.received_at = received
.pipeline_metadata.collector.error = err
'''

[transforms.journal_logs]
type = "remap"
inputs = ["raw_journal_logs"]
source = '''
level = "unknown"
if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
level = "warn"
} else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
level = "info"
} else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
level = "error"
} else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
level = "debug"
}
.level = level

.pipeline_metadata.collector.name = "vector"
.pipeline_metadata.collector.version = "0.14.1"
ip4, err = get_env_var("NODE_IPV4")
.pipeline_metadata.collector.ipaddr4 = ip4
received, err = format_timestamp(now(),"%+")
.pipeline_metadata.collector.received_at = received
.pipeline_metadata.collector.error = err
'''


[transforms.route_container_logs]
type = "route"
inputs = ["container_logs"]
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'
route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'


# Rename log stream to "application"
[transforms.application]
type = "remap"
inputs = ["route_container_logs.app"]
source = """
.log_type = "application"
"""


# Rename log stream to "infrastructure"
[transforms.infrastructure]
type = "remap"
inputs = ["route_container_logs.infra","journal_logs"]
source = """
.log_type = "infrastructure"
"""


# Rename log stream to "audit"
[transforms.audit]
type = "remap"
inputs = ["host_audit_logs","k8s_audit_logs","openshift_audit_logs"]
source = """
.log_type = "audit"
"""


[transforms.pipeline1]
type = "remap"
inputs = ["application","infrastructure"]
source = """
.
"""


[transforms.pipeline2]
type = "remap"
inputs = ["audit"]
source = """
.
"""


# Adding _id field
[transforms.elasticsearch_preprocess_1]
type = "remap"
inputs = ["pipeline1","pipeline2"]
source = """
index = "default"
if (.log_type == "application"){
index = "app"
}
if (.log_type == "infrastructure"){
index = "infra"
}
if (.log_type == "audit"){
index = "audit"
}
."write-index"=index+"-write"
._id = encode_base64(uuid_v4())
"""

[sinks.es_1]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess_1"]
endpoint = "https://es-1.svc.messaging.cluster.local:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
bulk_action = "create"
id_key = "_id"
# TLS Config
[sinks.es_1.tls]
key_file = "/var/run/ocp-collector/secrets/es-1/tls.key"
crt_file = "/var/run/ocp-collector/secrets/es-1/tls.crt"
ca_file = "/var/run/ocp-collector/secrets/es-1/ca-bundle.crt"

# Adding _id field
[transforms.elasticsearch_preprocess_2]
type = "remap"
inputs = ["pipeline2"]
source = """
index = "default"
if (.log_type == "application"){
index = "app"
}
if (.log_type == "infrastructure"){
index = "infra"
}
if (.log_type == "audit"){
index = "audit"
}
."write-index"=index+"-write"
._id = encode_base64(uuid_v4())
"""

[sinks.es_2]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess_2"]
endpoint = "https://es-2.svc.messaging.cluster.local:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
Expand Down
12 changes: 8 additions & 4 deletions internal/generator/vector/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package elasticsearch

import (
"fmt"
"strings"

logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
Expand Down Expand Up @@ -44,13 +45,15 @@ id_key = "_id"
func Conf(o logging.OutputSpec, inputs []string, secret *corev1.Secret, op Options) []Element {

outputs := []Element{}
id_key := "elasticsearch_preprocess"
id_key := strings.Join(inputs, "_")
id := fmt.Sprint("elasticsearch_preprocess_", len(op)+1)

if _, exists := op[id_key]; !exists {
outputs = MergeElements(outputs,
[]Element{
Remap{
Desc: "Adding _id field",
ComponentID: id_key,
ComponentID: id,
Inputs: helpers.MakeInputs(inputs...),
VRL: strings.TrimSpace(`
index = "default"
Expand All @@ -68,12 +71,13 @@ if (.log_type == "audit"){
`),
},
})
op[id_key] = true
op[id_key] = id
}

id = fmt.Sprintf("%v", op[id_key])
outputs = MergeElements(outputs,
[]Element{
Output(o, []string{id_key}, secret, op),
Output(o, []string{id}, secret, op),
},
TLSConf(o, secret),
BasicAuth(o, secret),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var _ = Describe("Generate Vector config", func() {
},
ExpectedConf: `
# Adding _id field
[transforms.elasticsearch_preprocess]
[transforms.elasticsearch_preprocess_1]
type = "remap"
inputs = ["application"]
source = """
Expand All @@ -62,7 +62,7 @@ if (.log_type == "audit"){

[sinks.es_1]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess"]
inputs = ["elasticsearch_preprocess_1"]
endpoint = "https://es.svc.infra.cluster:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
Expand Down Expand Up @@ -99,7 +99,7 @@ password = "testpass"
},
ExpectedConf: `
# Adding _id field
[transforms.elasticsearch_preprocess]
[transforms.elasticsearch_preprocess_1]
type = "remap"
inputs = ["application"]
source = """
Expand All @@ -119,7 +119,7 @@ if (.log_type == "audit"){

[sinks.es_1]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess"]
inputs = ["elasticsearch_preprocess_1"]
endpoint = "https://es.svc.infra.cluster:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
Expand All @@ -146,7 +146,7 @@ ca_file = "/var/run/ocp-collector/secrets/es-1/ca-bundle.crt"
Secrets: security.NoSecrets,
ExpectedConf: `
# Adding _id field
[transforms.elasticsearch_preprocess]
[transforms.elasticsearch_preprocess_1]
type = "remap"
inputs = ["application"]
source = """
Expand All @@ -166,7 +166,7 @@ if (.log_type == "audit"){

[sinks.es_1]
type = "elasticsearch"
inputs = ["elasticsearch_preprocess"]
inputs = ["elasticsearch_preprocess_1"]
endpoint = "http://es.svc.infra.cluster:9200"
index = "{{ write-index }}"
request.timeout_secs = 2147483648
Expand Down