Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions pkg/api/conn_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import "time"

type ConnTrack struct {
// TODO: should by a pointer instead?
KeyDefinition KeyDefinition `yaml:"keyDefinition" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields" doc:"list of output fields"`
EndConnectionTimeout time.Duration `yaml:"endConnectionTimeout" doc:"duration of time to wait from the last flow log to end a connection"`
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
EndConnectionTimeout time.Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
}

type ConnTrackOutputRecordTypeEnum struct {
Expand All @@ -38,12 +38,12 @@ func ConnTrackOutputRecordTypeName(operation string) string {
}

type KeyDefinition struct {
FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"`
FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash,omitempty" doc:"how to build the connection hash"`
}

type FieldGroup struct {
Name string `yaml:"name" doc:"field group name"`
Name string `yaml:"name,omitempty" doc:"field group name"`
Fields []string `yaml:"fields" doc:"list of fields in the group"`
}

Expand All @@ -54,16 +54,16 @@ type FieldGroup struct {
// When they are not set, a different hash will be computed for A->B and B->A,
// and they are tracked as different connections.
type ConnTrackHash struct {
FieldGroupRefs []string `yaml:"fieldGroupRefs" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef" doc:"field group name of endpoint B"`
FieldGroupRefs []string `yaml:"fieldGroupRefs,omitempty" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef,omitempty" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef,omitempty" doc:"field group name of endpoint B"`
}

type OutputField struct {
Name string `yaml:"name" doc:"output field name"`
Operation string `yaml:"operation" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input" doc:"The input field to base the operation on. When omitted, 'name' is used"`
Name string `yaml:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
}

type ConnTrackOperationEnum struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/decode_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
package api

type DecodeAws struct {
Fields []string `yaml:"fields" json:"fields" doc:"list of aws flow log fields"`
Fields []string `yaml:"fields,omitempty" json:"fields" doc:"list of aws flow log fields"`
}
10 changes: 5 additions & 5 deletions pkg/api/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package api
type EncodeKafka struct {
Address string `yaml:"address" json:"address" doc:"address of kafka server"`
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
Balancer string `yaml:"balancer" json:"balancer" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
WriteTimeout int64 `yaml:"writeTimeout" json:"writeTimeout" doc:"timeout (in seconds) for write operation performed by the Writer"`
ReadTimeout int64 `yaml:"readTimeout" json:"readTimeout" doc:"timeout (in seconds) for read operation performed by the Writer"`
BatchBytes int64 `yaml:"batchBytes" json:"batchBytes" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
BatchSize int `yaml:"batchSize" json:"batchSize" doc:"limit on how many messages will be buffered before being sent to a partition"`
Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
}

type KafkaEncodeBalancerEnum struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package api

type PromEncode struct {
Metrics PromMetricsItems `yaml:"metrics" json:"metrics" doc:"list of prometheus metric definitions, each includes:"`
Port int `yaml:"port" json:"port" doc:"port number to expose \"/metrics\" endpoint"`
Prefix string `yaml:"prefix" json:"prefix" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime" json:"expiryTime" doc:"seconds of no-flow to wait before deleting prometheus data item"`
Metrics PromMetricsItems `yaml:"metrics" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Port int `yaml:"port" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"`
Prefix string `yaml:"prefix" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
}

type PromEncodeOperationEnum struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/extract_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ type AggregateBy []string
type AggregateOperation string

type AggregateDefinition struct {
Name string `yaml:"name" json:"name" doc:"description of aggregation result"`
By AggregateBy `yaml:"by" json:"by" doc:"list of fields on which to aggregate"`
Operation AggregateOperation `yaml:"operation" json:"operation" doc:"sum, min, max, avg or raw_values"`
RecordKey string `yaml:"recordKey" json:"recordKey" doc:"internal field on which to perform the operation"`
TopK int `yaml:"topK" json:"topK" doc:"number of highest incidence to report (default - report all)"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of aggregation result"`
By AggregateBy `yaml:"by,omitempty" json:"by,omitempty" doc:"list of fields on which to aggregate"`
Operation AggregateOperation `yaml:"operation,omitempty" json:"operation,omitempty" doc:"sum, min, max, avg or raw_values"`
RecordKey string `yaml:"recordKey,omitempty" json:"recordKey,omitempty" doc:"internal field on which to perform the operation"`
TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"`
}
8 changes: 4 additions & 4 deletions pkg/api/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package api

type IngestCollector struct {
HostName string `yaml:"hostName" json:"hostName" doc:"the hostname to listen on"`
Port int `yaml:"port" json:"port" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"`
PortLegacy int `yaml:"portLegacy" json:"portLegacy" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
BatchMaxLen int `yaml:"batchMaxLen" json:"batchMaxLen" doc:"the number of accumulated flows before being forwarded for processing"`
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"`
PortLegacy int `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
}
4 changes: 2 additions & 2 deletions pkg/api/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package api

type IngestGRPCProto struct {
Port int `yaml:"port" json:"port" doc:"the port number to listen on"`
BufferLen int `yaml:"bufferLength" json:"bufferLength" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
}
12 changes: 6 additions & 6 deletions pkg/api/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package api

type IngestKafka struct {
Brokers []string `yaml:"brokers" json:"brokers" doc:"list of kafka broker addresses"`
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to listen on"`
GroupId string `yaml:"groupid" json:"groupid" doc:"separate groupid for each consumer on specified topic"`
GroupBalancers []string `yaml:"groupBalancers" json:"groupBalancers" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
StartOffset string `yaml:"startOffset" json:"startOffset" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
BatchReadTimeout int64 `yaml:"batchReadTimeout" json:"batchReadTimeout" doc:"how often (in milliseconds) to process input"`
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
Topic string `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
GroupId string `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
}
6 changes: 3 additions & 3 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package api

type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules" json:"rules" doc:"list of filter rules, each includes:"`
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
}

type TransformFilterOperationEnum struct {
Expand All @@ -32,6 +32,6 @@ func TransformFilterOperationName(operation string) string {
}

type TransformFilterRule struct {
Input string `yaml:"input" json:"input" doc:"entry input field"`
Type string `yaml:"type" json:"type" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
}
8 changes: 4 additions & 4 deletions pkg/api/transform_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package api

type TransformGeneric struct {
Policy string `yaml:"policy" json:"policy" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"`
Rules []GenericTransformRule `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
Policy string `yaml:"policy,omitempty" json:"policy,omitempty" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"`
Rules []GenericTransformRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of transform rules, each includes:"`
}

type TransformGenericOperationEnum struct {
Expand All @@ -32,8 +32,8 @@ func TransformGenericOperationName(operation string) string {
}

type GenericTransformRule struct {
Input string `yaml:"input" json:"input" doc:"entry input field"`
Output string `yaml:"output" json:"output" doc:"entry output field"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
}

type GenericTransform []GenericTransformRule
6 changes: 3 additions & 3 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func TransformNetworkOperationName(operation string) string {
}

type NetworkTransformRule struct {
Input string `yaml:"input" json:"input" doc:"entry input field"`
Output string `yaml:"output" json:"output" doc:"entry output field"`
Type string `yaml:"type" json:"type" enum:"TransformNetworkOperationEnum" doc:"one of the following:"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"`
Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/write_stdout.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package api

type WriteStdout struct {
Format string `yaml:"format" json:"format" doc:"the format of each line: printf (default) or json"`
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (default) or json"`
}
12 changes: 6 additions & 6 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestLokiPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999,"portLegacy":0,"batchMaxLen":0}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
Expand Down Expand Up @@ -136,19 +136,19 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","groupBalancers":null,"startOffset":"","batchReadTimeout":0}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group"}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.Equal(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"input":"doesnt_exist","type":"remove_entry_if_doesnt_exist"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.Equal(t, `{"name":"aggregate","extract":{"type":"aggregates","aggregates":[{"name":"src_as_connection_count","by":["srcAS"],"operation":"count","recordKey":"","topK":0}]}}`, string(b))
require.Equal(t, `{"name":"aggregate","extract":{"type":"aggregates","aggregates":[{"name":"src_as_connection_count","by":["srcAS"],"operation":"count"}]}}`, string(b))

b, err = json.Marshal(params[3])
require.NoError(t, err)
require.Equal(t, `{"name":"prom","encode":{"type":"prom","prom":{"metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"port":9090,"prefix":"flp_","expiryTime":0}}}`, string(b))
require.Equal(t, `{"name":"prom","encode":{"type":"prom","prom":{"metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"port":9090,"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand All @@ -167,13 +167,13 @@ func TestForkPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999,"portLegacy":0,"batchMaxLen":0}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.Equal(t, `{"name":"loki","write":{"type":"loki","loki":{"url":"http://loki:3100/","batchWait":"1s","batchSize":102400,"timeout":"10s","minBackoff":"1s","maxBackoff":"5m","maxRetries":10,"timestampLabel":"TimeReceived","timestampScale":"1s"}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.Equal(t, `{"name":"stdout","write":{"type":"stdout","stdout":{"format":""}}}`, string(b))
require.Equal(t, `{"name":"stdout","write":{"type":"stdout","stdout":{}}}`, string(b))
}