Skip to content

Commit

Permalink
Refactor FlushInterval
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin committed Nov 26, 2015
1 parent ee28f21 commit 471bdb2
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 58 deletions.
4 changes: 2 additions & 2 deletions outputs/elasticsearch/output.go
Expand Up @@ -87,8 +87,8 @@ func (out *elasticsearchOutput) init(
}

maxRetries := defaultMaxRetries
if config.Max_retries != nil {
maxRetries = *config.Max_retries
if config.MaxRetries != nil {
maxRetries = *config.MaxRetries
}
maxAttempts := maxRetries + 1 // maximum number of send attempts (-1 = infinite)
if maxRetries < 0 {
Expand Down
20 changes: 10 additions & 10 deletions outputs/elasticsearch/output_test.go
Expand Up @@ -23,16 +23,16 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc

var output elasticsearchOutput
output.init("packetbeat", outputs.MothershipConfig{
Save_topology: true,
Host: GetEsHost(),
Port: esPort,
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
Path: "",
Index: index,
Protocol: "http",
Flush_interval: &flushInterval,
BulkMaxSize: &bulkSize,
Save_topology: true,
Host: GetEsHost(),
Port: esPort,
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
Path: "",
Index: index,
Protocol: "http",
FlushInterval: &flushInterval,
BulkMaxSize: &bulkSize,
}, 10)

return output
Expand Down
10 changes: 7 additions & 3 deletions outputs/fileout/file.go
Expand Up @@ -38,22 +38,26 @@ func (out *fileOutput) init(beat string, config *outputs.MothershipConfig, topol
if out.rotator.Name == "" {
out.rotator.Name = beat
}
logp.Info("Logging Base Filename set to: %v", out.rotator.Name)

// disable bulk support
configDisableInt := -1
config.Flush_interval = &configDisableInt
config.FlushInterval = &configDisableInt
config.BulkMaxSize = &configDisableInt

rotateeverybytes := uint64(config.Rotate_every_kb) * 1024
rotateeverybytes := uint64(config.RotateEveryKb) * 1024
if rotateeverybytes == 0 {
rotateeverybytes = 10 * 1024 * 1024
}
logp.Info("Rotate every bytes set to: %v", rotateeverybytes)
out.rotator.RotateEveryBytes = &rotateeverybytes

keepfiles := config.Number_of_files
keepfiles := config.NumberOfFiles
if keepfiles == 0 {
keepfiles = 7
}
logp.Info("Number of files set to: %v", keepfiles)

out.rotator.KeepFiles = &keepfiles

err := out.rotator.CreateDirectory()
Expand Down
6 changes: 4 additions & 2 deletions outputs/logstash/logstash.go
Expand Up @@ -100,9 +100,11 @@ func (lj *logstash) init(
}

sendRetries := defaultSendRetries
if config.Max_retries != nil {
sendRetries = *config.Max_retries
if config.MaxRetries != nil {
sendRetries = *config.MaxRetries
}
logp.Info("Max Retries set to: %v", sendRetries)

maxAttempts := sendRetries + 1
if sendRetries < 0 {
maxAttempts = 0
Expand Down
12 changes: 6 additions & 6 deletions outputs/logstash/logstash_integration_test.go
Expand Up @@ -157,12 +157,12 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
flushInterval := 0
bulkSize := 0
config := outputs.MothershipConfig{
Hosts: []string{getElasticsearchHost()},
Index: index,
Flush_interval: &flushInterval,
BulkMaxSize: &bulkSize,
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
Hosts: []string{getElasticsearchHost()},
Index: index,
FlushInterval: &flushInterval,
BulkMaxSize: &bulkSize,
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
}

output, err := plugin.NewOutput("test", &config, 10)
Expand Down
12 changes: 6 additions & 6 deletions outputs/logstash/logstash_test.go
Expand Up @@ -344,9 +344,9 @@ func TestLogstashInvalidTLSInsecure(t *testing.T) {
CAs: []string{certName + ".pem"},
Insecure: true,
},
Timeout: 2,
Max_retries: &retries,
Hosts: []string{server.Addr()},
Timeout: 2,
MaxRetries: &retries,
Hosts: []string{server.Addr()},
}

testConnectionType(t, server, testOutputerFactory(t, "", &config))
Expand Down Expand Up @@ -431,9 +431,9 @@ func TestLogstashInvalidTLS(t *testing.T) {
TLS: &outputs.TLSConfig{
CAs: []string{certName + ".pem"},
},
Timeout: 1,
Max_retries: &retries,
Hosts: []string{server.Addr()},
Timeout: 1,
MaxRetries: &retries,
Hosts: []string{server.Addr()},
}

var result struct {
Expand Down
48 changes: 24 additions & 24 deletions outputs/outputs.go
Expand Up @@ -8,30 +8,30 @@ import (
)

type MothershipConfig struct {
Save_topology bool
Host string
Port int
Hosts []string
LoadBalance *bool
Protocol string
Username string
Password string
Index string
Path string
Db int
Db_topology int
Timeout int
Reconnect_interval int
Filename string
Rotate_every_kb int
Number_of_files int
DataType string
Flush_interval *int
BulkMaxSize *int `yaml:"bulk_max_size"`
Max_retries *int
Pretty *bool
TLS *TLSConfig
Worker int
Save_topology bool
Host string
Port int
Hosts []string
LoadBalance *bool
Protocol string
Username string
Password string
Index string
Path string
Db int
Db_topology int
Timeout int
ReconnectInterval int `yaml:"reconnect_interval"`
Filename string `yaml:"filename"`
RotateEveryKb int `yaml:"rotate_every_kb"`
NumberOfFiles int `yaml:"number_of_files"`
DataType string
FlushInterval *int `yaml:"flush_interval"`
BulkMaxSize *int `yaml:"bulk_max_size"`
MaxRetries *int `yaml:"max_retries"`
Pretty *bool `yaml:"pretty"`
TLS *TLSConfig
Worker int
}

type Outputer interface {
Expand Down
5 changes: 3 additions & 2 deletions outputs/redis/redis.go
Expand Up @@ -99,9 +99,10 @@ func (out *redisOutput) Init(beat string, config outputs.MothershipConfig, topol
}

out.ReconnectInterval = time.Duration(1) * time.Second
if config.Reconnect_interval != 0 {
out.ReconnectInterval = time.Duration(config.Reconnect_interval) * time.Second
if config.ReconnectInterval != 0 {
out.ReconnectInterval = time.Duration(config.ReconnectInterval) * time.Second
}
logp.Info("Reconnect Interval set to: %v", out.ReconnectInterval)

expSec := 15
if topology_expire != 0 {
Expand Down
6 changes: 4 additions & 2 deletions publisher/async.go
Expand Up @@ -77,14 +77,16 @@ func asyncOutputer(ws *workerSignal, worker *outputWorker) worker {
config := worker.config

flushInterval := defaultFlushInterval
if config.Flush_interval != nil {
flushInterval = time.Duration(*config.Flush_interval) * time.Millisecond
if config.FlushInterval != nil {
flushInterval = time.Duration(*config.FlushInterval) * time.Millisecond
}
logp.Info("Flush Interval set to: %v", flushInterval)

maxBulkSize := defaultBulkSize
if config.BulkMaxSize != nil {
maxBulkSize = *config.BulkMaxSize
}
logp.Info("Max Bulk Size set to: %v", maxBulkSize)

// batching disabled
if flushInterval <= 0 || maxBulkSize <= 0 {
Expand Down
2 changes: 1 addition & 1 deletion publisher/publish.go
Expand Up @@ -195,7 +195,7 @@ func (publisher *PublisherType) Init(
output := plugin.Output
config := plugin.Config

debug("create output worker: %p, %p", config.Flush_interval, config.BulkMaxSize)
debug("Create output worker")

outputers = append(outputers,
newOutputWorker(config, output, &publisher.wsOutput, 1000))
Expand Down

0 comments on commit 471bdb2

Please sign in to comment.