diff --git a/config/file_send.conf b/config/file_send.conf index 80c05950..2093bd0b 100644 --- a/config/file_send.conf +++ b/config/file_send.conf @@ -4,12 +4,10 @@ "StdIn": Type: "consumer.Console" Streams: "console" - Fuse: "file" "FileOut": Type: "producer.File" Streams: "console" - Fuse: "file" File: /tmp/gollum_test.log Batch: TimeoutSec: 1 diff --git a/config/http_send.conf b/config/http_send.conf index 0de6b040..d20078f0 100644 --- a/config/http_send.conf +++ b/config/http_send.conf @@ -1,11 +1,9 @@ "StdIn": Type: "consumer.Console" Streams: "console" - Fuse: "http" "HttpOut": Type: "producer.HTTPRequest" Streams: "console" - Fuse: "http" Address: "http://localhost:9090" RawData: false diff --git a/config/socket_forward.conf b/config/socket_forward.conf index 075e21e7..5c89f5c6 100644 --- a/config/socket_forward.conf +++ b/config/socket_forward.conf @@ -1,7 +1,6 @@ "SocketIn": Type: "consumer.Socket" Streams: "forward" - Fuse: "socket" Address: "127.0.0.1:5880" Acknowledge: "OK" Partitioner: "ascii" @@ -10,7 +9,6 @@ "SocketOut": Type: "producer.Socket" Streams: "forward" - Fuse: "socket" Address: "unix://test/test.socket" Formatters: - "format.Runlength" diff --git a/config/socket_send.conf b/config/socket_send.conf index 36ebc4e4..a34ab2b3 100644 --- a/config/socket_send.conf +++ b/config/socket_send.conf @@ -1,7 +1,6 @@ "ReadStdIn": Type: "consumer.Console" Streams: "console" - Fuse: "socket" "AddRunLength": Type: "router.Broadcast" @@ -18,7 +17,6 @@ "ToSocket": Type: "producer.Socket" Streams: "console" - Fuse: "socket" Address: "unix://test/test.socket" ConnectionBufferSizeKB: 128 BatchTimeoutSec: 1 diff --git a/consumer/console.go b/consumer/console.go index 56638586..4f7becf7 100644 --- a/consumer/console.go +++ b/consumer/console.go @@ -32,9 +32,10 @@ const ( ) // Console consumer plugin +// // This consumer reads from stdin. A message is generated after each newline -// character. When attached to a fuse, this consumer will stop accepting -// messages in case that fuse is burned. +// character. +// // Configuration example // // - "consumer.Console": @@ -102,7 +103,6 @@ func (cons *Console) readPipe() { buffer := tio.NewBufferedReader(consoleBufferGrowSize, 0, 0, "\n") for cons.IsActive() { err := buffer.ReadAll(cons.pipe, cons.Enqueue) - cons.WaitOnFuse() switch err { case io.EOF: if cons.autoexit { diff --git a/consumer/file.go b/consumer/file.go index a8fc56df..8c81165f 100644 --- a/consumer/file.go +++ b/consumer/file.go @@ -45,13 +45,13 @@ const ( ) // File consumer plugin +// // The file consumer allows to read from files while looking for a delimiter // that marks the end of a message. If the file is part of e.g. a log rotation // the file consumer can be set to a symbolic link of the latest file and // (optionally) be told to reopen the file by sending a SIGHUP. A symlink to // a file will automatically be reopened if the underlying file is changed. -// When attached to a fuse, this consumer will stop accepting messages in case -// that fuse is burned. +// // Configuration example // // - "consumer.File": @@ -227,7 +227,6 @@ func (cons *File) read() { // Try to read from the file if cons.state == fileStateRead && cons.file != nil { err := buffer.ReadAll(cons.file, sendFunction) - cons.WaitOnFuse() switch { case err == nil: // ok diff --git a/consumer/http.go b/consumer/http.go index 8c2249f9..b1a15540 100644 --- a/consumer/http.go +++ b/consumer/http.go @@ -28,10 +28,10 @@ import ( ) // Http consumer plugin +// // This consumer opens up an HTTP 1.1 server and processes the contents of any // incoming HTTP request. -// When attached to a fuse, this consumer will return error 503 in case that -// fuse is burned. +// // Configuration example // // - "consumer.Http": @@ -136,10 +136,6 @@ func (cons *Http) requestHandler(resp http.ResponseWriter, req *http.Request) { return } } - if cons.IsFuseBurned() { - resp.WriteHeader(http.StatusServiceUnavailable) - return // ### return, service is down ### - } if cons.withHeaders { // Read the whole package diff --git a/consumer/kafka.go b/consumer/kafka.go index 5b386b20..22175030 100644 --- a/consumer/kafka.go +++ b/consumer/kafka.go @@ -40,10 +40,10 @@ const ( ) // Kafka consumer plugin +// // Thes consumer reads data from a given kafka topic. It is based on the sarama // library so most settings are mapped to the settings from this library. -// When attached to a fuse, this consumer will stop processing messages in case -// that fuse is burned. +// // Configuration example // // - "consumer.Kafka": @@ -404,7 +404,6 @@ func (cons *Kafka) readFromGroup() { spin := tsync.NewSpinner(tsync.SpinPriorityLow) for !cons.groupClient.Closed() { - cons.WaitOnFuse() select { case event := <-consumer.Messages(): if cons.prependKey { @@ -458,7 +457,6 @@ func (cons *Kafka) readFromPartition(partitionID int32) { spin := tsync.NewSpinner(tsync.SpinPriorityLow) for !cons.client.Closed() { - cons.WaitOnFuse() select { case event := <-partCons.Messages(): @@ -512,7 +510,6 @@ func (cons *Kafka) readPartitions(partitions []int32) { spin := tsync.NewSpinner(tsync.SpinPriorityLow) for !cons.client.Closed() { for idx, consumer := range consumers { - cons.WaitOnFuse() partition := partitions[idx] select { diff --git a/consumer/kinesis.go b/consumer/kinesis.go index e0ff46ff..9b0e5f37 100644 --- a/consumer/kinesis.go +++ b/consumer/kinesis.go @@ -42,9 +42,9 @@ const ( ) // Kinesis consumer plugin +// // This consumer reads message from an AWS Kinesis router. -// When attached to a fuse, this consumer will stop processing messages in case -// that fuse is burned. +// // Configuration example // // - "consumer.Kinesis": @@ -256,7 +256,6 @@ func (cons *Kinesis) processShard(shardID string) { recordConfig := (*kinesis.GetRecordsInput)(nil) for cons.running { - cons.WaitOnFuse() if recordConfig == nil { recordConfig = cons.createShardIteratorConfig(shardID) } diff --git a/consumer/profiler.go b/consumer/profiler.go index 82a5b349..f3a37a07 100644 --- a/consumer/profiler.go +++ b/consumer/profiler.go @@ -27,11 +27,11 @@ import ( ) // Profiler consumer plugin +// // The profiler plugin generates Runs x Batches messages and send them to the // configured streams as fast as possible. This consumer can be used to profile // producers and/or configurations. -// When attached to a fuse, this consumer will stop processing messages in case -// that fuse is burned. +// // Configuration example // // - "consumer.Profile": @@ -183,7 +183,6 @@ func (cons *Profiler) profile() { start := time.Now() for i := 0; i < cons.profileRuns && cons.IsActive(); i++ { - cons.WaitOnFuse() template := cons.templates[rand.Intn(len(cons.templates))] messageCount++ diff --git a/consumer/proxy.go b/consumer/proxy.go index 3fd8ead8..d4c52a54 100644 --- a/consumer/proxy.go +++ b/consumer/proxy.go @@ -35,12 +35,12 @@ const ( ) // Proxy consumer plugin. +// // The proxy consumer reads messages directly as-is from a given socket. // Messages are extracted by standard message size algorithms (see Partitioner). // This consumer can be used with any compatible proxy producer to establish // a two-way communication. -// When attached to a fuse, this consumer will stop accepting new connections -// and close all existing connections in case that fuse is burned. +// // Configuration example // // - "consumer.Proxy": @@ -145,7 +145,6 @@ func (cons *Proxy) accept() { listener := cons.listen.(net.Listener) for cons.IsActive() { - cons.WaitOnFuse() client, err := listener.Accept() if err != nil { diff --git a/consumer/proxyclient.go b/consumer/proxyclient.go index 7064ae18..3928cb28 100644 --- a/consumer/proxyclient.go +++ b/consumer/proxyclient.go @@ -88,7 +88,7 @@ func (client *proxyClient) sendMessage(data []byte) { func (client *proxyClient) read() { buffer := tio.NewBufferedReader(proxyClientBufferGrowSize, client.proxy.flags, client.proxy.offset, client.proxy.delimiter) - for client.proxy.IsActive() && client.connected && !client.proxy.IsFuseBurned() { + for client.proxy.IsActive() && client.connected { err := buffer.ReadAll(client.conn, client.sendMessage) // Handle read errors diff --git a/consumer/socket.go b/consumer/socket.go index 2dc16634..ea55b96c 100644 --- a/consumer/socket.go +++ b/consumer/socket.go @@ -36,11 +36,10 @@ const ( ) // Socket consumer plugin +// // The socket consumer reads messages directly as-is from a given socket. // Messages are separated from the stream by using a specific partitioner method. -// When attached to a fuse, this consumer will stop accepting new connections -// (closing the socket) and close all existing connections in case that fuse is -// burned. +// // Configuration example // // - "consumer.Socket": @@ -211,7 +210,7 @@ func (cons *Socket) processConnection(conn net.Conn) { buffer := tio.NewBufferedReader(socketBufferGrowSize, cons.flags, cons.offset, cons.delimiter) - for cons.IsActive() && !cons.IsFuseBurned() { + for cons.IsActive() { conn.SetReadDeadline(time.Now().Add(cons.readTimeout)) err := buffer.ReadAll(conn, cons.Enqueue) if err == nil { @@ -257,9 +256,6 @@ func (cons *Socket) udpAccept() { addr, _ := net.ResolveUDPAddr(cons.protocol, cons.address) for cons.IsActive() { - // Prevent reconnection until fuse is active again - cons.WaitOnFuse() - // (re)open a tcp connection for cons.listen == nil { if listener, err := net.ListenUDP(cons.protocol, addr); err == nil { @@ -281,9 +277,6 @@ func (cons *Socket) tcpAccept() { defer cons.closeTCPConnection() for cons.IsActive() { - // Prevent reconnection until fuse is active again - cons.WaitOnFuse() - // (re)open a tcp connection for cons.listen == nil { listener, err := net.Listen(cons.protocol, cons.address) @@ -368,11 +361,9 @@ func (cons *Socket) Consume(workers *sync.WaitGroup) { if cons.protocol == "udp" { go tgo.WithRecoverShutdown(cons.udpAccept) - cons.SetFuseBurnedCallback(cons.closeConnection) defer cons.closeConnection() } else { go tgo.WithRecoverShutdown(cons.tcpAccept) - cons.SetFuseBurnedCallback(cons.closeTCPConnection) defer cons.closeTCPConnection() } diff --git a/consumer/syslogd.go b/consumer/syslogd.go index a23f36c2..9f5540e8 100644 --- a/consumer/syslogd.go +++ b/consumer/syslogd.go @@ -23,9 +23,9 @@ import ( ) // Syslogd consumer plugin +// // The syslogd consumer accepts messages from a syslogd compatible socket. -// When attached to a fuse, this consumer will stop the syslogd service in case -// that fuse is burned. +// // Configuration example // // - "consumer.Syslogd": @@ -143,9 +143,6 @@ func (cons *Syslogd) Consume(workers *sync.WaitGroup) { server.Boot() defer server.Kill() - cons.SetFuseBurnedCallback(func() { server.Kill() }) - cons.SetFuseActiveCallback(func() { server.Boot() }) cons.ControlLoop() - server.Wait() } diff --git a/contrib/native/kafka/kafkaproducer.go b/contrib/native/kafka/kafkaproducer.go index e3cb774f..697690ec 100644 --- a/contrib/native/kafka/kafkaproducer.go +++ b/contrib/native/kafka/kafkaproducer.go @@ -27,11 +27,13 @@ import ( ) // KafkaProducer librdkafka producer plugin +// // NOTICE: This producer is not included in standard builds. To enable it // you need to trigger a custom build with native plugins enabled. // The kafka producer writes messages to a kafka cluster. This producer is // backed by the native librdkafka (0.8.6) library so most settings relate -// to that library. This producer does not implement a fuse breaker. +// to that library. +// // Configuration example // // - "native.KafkaProducer": diff --git a/contrib/native/systemd/systemdconsumer.go b/contrib/native/systemd/systemdconsumer.go index 4a79fe3e..06c78633 100644 --- a/contrib/native/systemd/systemdconsumer.go +++ b/contrib/native/systemd/systemdconsumer.go @@ -28,11 +28,11 @@ import ( ) // SystemdConsumer consumer plugin +// // NOTICE: This producer is not included in standard builds. To enable it // you need to trigger a custom build with native plugins enabled. // The systemd consumer allows to read from the systemd journal. -// When attached to a fuse, this consumer will stop reading messages in case -// that fuse is burned. +// // Configuration example // // - "native.Systemd": @@ -152,7 +152,6 @@ func (cons *SystemdConsumer) close() { func (cons *SystemdConsumer) read() { for cons.IsActive() { - cons.WaitOnFuse() c, err := cons.journal.Next() if err != nil { diff --git a/core/bufferedproducer.go b/core/bufferedproducer.go index 93254b80..966343d5 100644 --- a/core/bufferedproducer.go +++ b/core/bufferedproducer.go @@ -33,8 +33,6 @@ import ( // Formatter: "format.Forward" // Filter: "filter.All" // DropToStream: "_DROPPED_" -// Fuse: "" -// FuseTimeoutSec: 5 // Router: // - "foo" // - "bar" @@ -76,14 +74,6 @@ import ( // formatting it has to define a separate filter as the producer decides if // and where to format. // -// Fuse defines the name of a fuse to burn if e.g. the producer encounteres a -// lost connection. Each producer defines its own fuse breaking logic if -// necessary / applyable. Disable fuse behavior for a producer by setting an -// empty name or a FuseTimeoutSec <= 0. By default this is set to "". -// -// FuseTimeoutSec defines the interval in seconds used to check if the fuse can -// be recovered. Note that automatic fuse recovery logic depends on each -// producer's implementation. By default this setting is set to 10. type BufferedProducer struct { SimpleProducer messages MessageQueue diff --git a/core/consumer_test.go b/core/consumer_test.go index a460280c..a2f60def 100644 --- a/core/consumer_test.go +++ b/core/consumer_test.go @@ -131,62 +131,3 @@ func TestConsumerControlLoop(t *testing.T) { time.Sleep(50 * time.Millisecond) expect.Equal(atomic.LoadInt32(roll), int32(1)) } - -func TestConsumerFuse(t *testing.T) { - expect := ttesting.NewExpect(t) - mockC := getMockConsumer() - - conf := NewPluginConfig("mockConsumer", "mockConsumer") - conf.Override("Fuse", "test") - - mockC.Configure(NewPluginConfigReader(&conf)) - expect.NotNil(mockC.fuse) - - burnedCallback := new(int32) - activeCallback := new(int32) - - mockC.SetFuseBurnedCallback(func() { atomic.StoreInt32(burnedCallback, 1) }) - mockC.SetFuseActiveCallback(func() { atomic.StoreInt32(activeCallback, 1) }) - - go mockC.ControlLoop() - - expect.False(mockC.fuse.IsBurned()) - expect.Equal(atomic.LoadInt32(burnedCallback), int32(0)) - expect.Equal(atomic.LoadInt32(activeCallback), int32(0)) - - // Check manual fuse trigger - - atomic.StoreInt32(burnedCallback, 0) - atomic.StoreInt32(activeCallback, 0) - - mockC.Control() <- PluginControlFuseBurn - time.Sleep(10 * time.Millisecond) - expect.Equal(atomic.LoadInt32(burnedCallback), int32(1)) - expect.Equal(atomic.LoadInt32(activeCallback), int32(0)) - - atomic.StoreInt32(burnedCallback, 0) - mockC.Control() <- PluginControlFuseActive - time.Sleep(10 * time.Millisecond) - expect.Equal(atomic.LoadInt32(burnedCallback), int32(0)) - expect.Equal(atomic.LoadInt32(activeCallback), int32(1)) - - // Check automatic burn callback - - atomic.StoreInt32(burnedCallback, 0) - atomic.StoreInt32(activeCallback, 0) - - mockC.fuse.Burn() - //time.Sleep(tsync.SpinTimeSuspend + 100*time.Millisecond) - time.Sleep(time.Second + 100*time.Millisecond) - - expect.Equal(atomic.LoadInt32(burnedCallback), int32(1)) - expect.Equal(atomic.LoadInt32(activeCallback), int32(0)) - - // Check automatic activate callback - - atomic.StoreInt32(burnedCallback, 0) - mockC.fuse.Activate() - time.Sleep(10 * time.Millisecond) - expect.Equal(atomic.LoadInt32(burnedCallback), int32(0)) - expect.Equal(atomic.LoadInt32(activeCallback), int32(1)) -} diff --git a/core/fuseregistry.go b/core/fuseregistry.go deleted file mode 100644 index 3104bb8f..00000000 --- a/core/fuseregistry.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2015-2016 trivago GmbH -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "github.com/trivago/tgo/tsync" - "sync" -) - -// fuseRegistry holds all fuses registered to the system. -type fuseRegistry struct { - fuses map[string]*tsync.Fuse - fuseGuard *sync.Mutex -} - -// FuseRegistry is the global instance of fuseRegistry used to store the -// all registered fuses. -var FuseRegistry = fuseRegistry{ - fuses: make(map[string]*tsync.Fuse), - fuseGuard: new(sync.Mutex), -} - -// GetFuse returns a fuse object by name. This function will always return a -// valid fuse (creates fuses if they have not yet been created). -// This function is threadsafe. -func (registry *fuseRegistry) GetFuse(name string) *tsync.Fuse { - registry.fuseGuard.Lock() - defer registry.fuseGuard.Unlock() - - fuse, exists := registry.fuses[name] - if !exists { - fuse = tsync.NewFuse() - registry.fuses[name] = fuse - } - return fuse -} - -// ActivateAllFuses calls Activate on all registered fuses. -func (registry *fuseRegistry) ActivateAllFuses() { - registry.fuseGuard.Lock() - defer registry.fuseGuard.Unlock() - - for _, fuse := range registry.fuses { - fuse.Activate() - } -} diff --git a/core/plugin.go b/core/plugin.go index d4c8e15a..20e4352e 100644 --- a/core/plugin.go +++ b/core/plugin.go @@ -39,10 +39,6 @@ const ( PluginControlStopConsumer = PluginControl(iota) // PluginControlRoll notifies the consumer/producer about a reconnect or reopen request PluginControlRoll = PluginControl(iota) - // PluginControlFuseBurn notfies a producer to burn a fuse or a consumer that a fuse has been burned - PluginControlFuseBurn = PluginControl(iota) - // PluginControlFuseActive notfies a producer to activate a fuse or a consumer that a fuse has been activated - PluginControlFuseActive = PluginControl(iota) ) const ( diff --git a/core/producer_test.go b/core/producer_test.go index 8d48fe6c..534b9132 100644 --- a/core/producer_test.go +++ b/core/producer_test.go @@ -43,9 +43,6 @@ func getMockProducer() mockProducer { runState: new(PluginRunState), modulators: ModulatorArray{}, shutdownTimeout: 10 * time.Millisecond, - fuseName: "test", - fuseTimeout: 100 * time.Millisecond, - fuseControlGuard: new(sync.Mutex), Log: tlog.NewLogScope("test"), }, messages: NewMessageQueue(2), @@ -272,45 +269,3 @@ func TestProducerControlLoop(t *testing.T) { expect.Equal(atomic.LoadInt32(roll), int32(1)) } - -func TestProducerFuse(t *testing.T) { - expect := ttesting.NewExpect(t) - activateFuse := new(int32) - checkCounter := new(int32) - - mockP := getMockProducer() - mockP.SetCheckFuseCallback(func() bool { - atomic.AddInt32(checkCounter, 1) - return atomic.LoadInt32(activateFuse) == 1 - }) - - fuse := FuseRegistry.GetFuse(mockP.fuseName) - expect.False(fuse.IsBurned()) - - go mockP.ControlLoop() - - // Check basic functionality - - expect.NonBlocking(time.Second, func() { mockP.Control() <- PluginControlFuseBurn }) - time.Sleep(mockP.fuseTimeout) - expect.True(fuse.IsBurned()) - - time.Sleep(mockP.fuseTimeout * 2) - expect.True(fuse.IsBurned()) - expect.Greater(atomic.LoadInt32(checkCounter), int32(0)) - - atomic.StoreInt32(activateFuse, 1) - time.Sleep(mockP.fuseTimeout * 2) - expect.False(fuse.IsBurned()) - - // Check double calls - - atomic.StoreInt32(activateFuse, 0) - expect.NonBlocking(time.Second, func() { mockP.Control() <- PluginControlFuseBurn }) - expect.NonBlocking(time.Second, func() { mockP.Control() <- PluginControlFuseBurn }) - expect.True(fuse.IsBurned()) - - expect.NonBlocking(time.Second, func() { mockP.Control() <- PluginControlFuseActive }) - expect.NonBlocking(time.Second, func() { mockP.Control() <- PluginControlFuseActive }) - expect.False(fuse.IsBurned()) -} diff --git a/core/simpleconsumer.go b/core/simpleconsumer.go index 58cb3777..8b479cf9 100644 --- a/core/simpleconsumer.go +++ b/core/simpleconsumer.go @@ -17,7 +17,6 @@ package core import ( "github.com/trivago/tgo" "github.com/trivago/tgo/tlog" - "github.com/trivago/tgo/tsync" "sync" "sync/atomic" "time" @@ -32,7 +31,6 @@ import ( // - "consumer.Foobar": // Enable: true // ID: "" -// Fuse: "" // ShutdownTimeoutMs: 1000 // Router: // - "foo" @@ -48,13 +46,6 @@ import ( // which means only producers set to consume "all routers" will get these // messages. // -// Fuse defines the name of a fuse to observe for this consumer. Producer may -// "burn" the fuse when they encounter errors. Consumers may react on this by -// e.g. closing connections to notify any writing services of the problem. -// Set to "" by default which disables the fuse feature for this consumer. -// It is up to the consumer implementation to react on a broken fuse in an -// appropriate manner. -// // ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect // various timeouts during shutdown. By default this is set to 1 second. type SimpleConsumer struct { @@ -62,15 +53,12 @@ type SimpleConsumer struct { control chan PluginControl routers []Router runState *PluginRunState - fuse *tsync.Fuse shutdownTimeout time.Duration modulators ModulatorArray sequence *uint64 onRoll func() onPrepareStop func() onStop func() - onFuseBurned func() - onFuseActive func() Log tlog.LogScope } @@ -91,11 +79,6 @@ func (cons *SimpleConsumer) Configure(conf PluginConfigReader) error { cons.routers = append(cons.routers, stream) } - fuseName, err := conf.WithError.GetString("Fuse", "") - if !conf.Errors.Push(err) && fuseName != "" { - cons.fuse = FuseRegistry.GetFuse(fuseName) - } - cons.shutdownTimeout = time.Duration(conf.GetInt("ShutdownTimeoutMs", 1000)) * time.Millisecond return conf.Errors.OrNil() @@ -169,16 +152,6 @@ func (cons *SimpleConsumer) SetStopCallback(onStop func()) { cons.onStop = onStop } -// SetFuseBurnedCallback sets the function to be called upon PluginControlFuseBurned -func (cons *SimpleConsumer) SetFuseBurnedCallback(onFuseBurned func()) { - cons.onFuseBurned = onFuseBurned -} - -// SetFuseActiveCallback sets the function to be called upon PluginControlFuseActive -func (cons *SimpleConsumer) SetFuseActiveCallback(onFuseActive func()) { - cons.onFuseActive = onFuseActive -} - // SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's // internal plugin state. This method is also called by AddMainWorker. func (cons *SimpleConsumer) SetWorkerWaitGroup(workers *sync.WaitGroup) { @@ -202,23 +175,6 @@ func (cons *SimpleConsumer) WorkerDone() { cons.runState.WorkerDone() } -// WaitOnFuse blocks if the fuse linked to this consumer has been burned. -// If no fuse is bound this function does nothing. -func (cons *SimpleConsumer) WaitOnFuse() { - if cons.fuse != nil { - cons.fuse.Wait() - } -} - -// IsFuseBurned returns true if the fuse linked to this consumer has been -// burned. If no fuse is attached, false is returned. -func (cons *SimpleConsumer) IsFuseBurned() bool { - if cons.fuse == nil { - return false - } - return cons.fuse.IsBurned() -} - // Enqueue creates a new message from a given byte slice and passes it to // EnqueueMessage. Data is copied to the message. func (cons *SimpleConsumer) Enqueue(data []byte) { @@ -273,8 +229,6 @@ func (cons *SimpleConsumer) ControlLoop() { defer cons.setState(PluginStateDead) defer cons.Log.Debug.Print("Stopped") - go cons.fuseControlLoop() - for { command := <-cons.control switch command { @@ -307,18 +261,6 @@ func (cons *SimpleConsumer) ControlLoop() { if cons.onRoll != nil { cons.onRoll() } - - case PluginControlFuseBurn: - cons.Log.Debug.Print("Recieved fuse burned command") - if cons.onFuseBurned != nil { - cons.onFuseBurned() - } - - case PluginControlFuseActive: - cons.Log.Debug.Print("Recieved fuse active command") - if cons.onFuseActive != nil { - cons.onFuseActive() - } } } } @@ -353,20 +295,3 @@ func (cons *SimpleConsumer) tickerLoop(interval time.Duration, onTimeOut func()) } } } - -func (cons *SimpleConsumer) fuseControlLoop() { - if cons.fuse == nil { - return // ### return, no fuse attached ### - } - spin := tsync.NewSpinner(tsync.SpinPrioritySuspend) - for cons.IsActive() { - // If the fuse is burned: callback, wait, callback - if cons.IsFuseBurned() { - cons.Control() <- PluginControlFuseBurn - cons.WaitOnFuse() - cons.Control() <- PluginControlFuseActive - } else { - spin.Yield() - } - } -} diff --git a/core/simpleproducer.go b/core/simpleproducer.go index ee1cc344..2e1519bd 100644 --- a/core/simpleproducer.go +++ b/core/simpleproducer.go @@ -17,7 +17,6 @@ package core import ( "github.com/trivago/tgo" "github.com/trivago/tgo/tlog" - "github.com/trivago/tgo/tsync" "sync" "time" "fmt" @@ -39,8 +38,6 @@ import ( // - "filter.All" // Formatter: "format.Forward" // DropToStream: "_DROPPED_" -// Fuse: "" -// FuseTimeoutSec: 5 // Router: // - "foo" // - "bar" @@ -82,14 +79,6 @@ import ( // formatting it has to define a separate filter as the producer decides if // and where to format. // -// Fuse defines the name of a fuse to burn if e.g. the producer encounteres a -// lost connection. Each producer defines its own fuse breaking logic if -// necessary / applyable. Disable fuse behavior for a producer by setting an -// empty name or a FuseTimeoutSec <= 0. By default this is set to "". -// -// FuseTimeoutSec defines the interval in seconds used to check if the fuse can -// be recovered. Note that automatic fuse recovery logic depends on each -// producer's implementation. By default this setting is set to 10. type SimpleProducer struct { id string control chan PluginControl @@ -98,14 +87,9 @@ type SimpleProducer struct { dropStream Router runState *PluginRunState shutdownTimeout time.Duration - fuseTimeout time.Duration - fuseControlGuard *sync.Mutex - fuseControl *time.Timer - fuseName string onRoll func() onPrepareStop func() onStop func() - onCheckFuse func() bool Log tlog.LogScope } @@ -117,9 +101,6 @@ func (prod *SimpleProducer) Configure(conf PluginConfigReader) error { prod.control = make(chan PluginControl, 1) prod.streams = conf.GetStreamArray("Streams", []MessageStreamID{WildcardStreamID}) prod.shutdownTimeout = time.Duration(conf.GetInt("ShutdownTimeoutMs", 1000)) * time.Millisecond - prod.fuseTimeout = time.Duration(conf.GetInt("FuseTimeoutSec", 10)) * time.Second - prod.fuseName = conf.GetString("Fuse", "") - prod.fuseControlGuard = new(sync.Mutex) prod.modulators = conf.GetModulatorArray("Modulators", prod.Log, ModulatorArray{}) dropStreamID := StreamRegistry.GetStreamID(conf.GetString("DropToStream", DroppedStream)) @@ -163,15 +144,6 @@ func (prod *SimpleProducer) Control() chan<- PluginControl { return prod.control } -// GetFuse returns the fuse bound to this producer or nil if no fuse name has -// been set. -func (prod *SimpleProducer) GetFuse() *tsync.Fuse { - if prod.fuseName == "" || prod.fuseTimeout <= 0 { - return nil - } - return FuseRegistry.GetFuse(prod.fuseName) -} - // GetState returns the state this plugin is currently in func (prod *SimpleProducer) GetState() PluginState { return prod.runState.GetState() @@ -218,14 +190,6 @@ func (prod *SimpleProducer) SetStopCallback(onStop func()) { prod.onStop = onStop } -// SetCheckFuseCallback sets the function to be called upon PluginControlCheckFuse. -// The callback has to return true to trigger a fuse reactivation. -// If nil is passed as a callback PluginControlCheckFuse will reactivate the -// fuse immediately. -func (prod *SimpleProducer) SetCheckFuseCallback(onCheckFuse func() bool) { - prod.onCheckFuse = onCheckFuse -} - // SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's // internal plugin state. This method is also called by AddMainWorker. func (prod *SimpleProducer) SetWorkerWaitGroup(workers *sync.WaitGroup) { @@ -308,20 +272,6 @@ func (prod *SimpleProducer) ControlLoop() { if prod.onRoll != nil { prod.onRoll() } - - case PluginControlFuseBurn: - if fuse := prod.GetFuse(); fuse != nil && !fuse.IsBurned() { - fuse.Burn() - go prod.triggerCheckFuse() - prod.Log.Note.Print("Fuse burned") - } - - case PluginControlFuseActive: - if fuse := prod.GetFuse(); fuse != nil && fuse.IsBurned() { - prod.setFuseControl(nil) - fuse.Activate() - prod.Log.Note.Print("Fuse reactivated") - } } } } @@ -357,28 +307,3 @@ func (prod *SimpleProducer) tickerLoop(interval time.Duration, onTimeOut func()) } } } - -func (prod *SimpleProducer) setFuseControl(callback func()) { - prod.fuseControlGuard.Lock() - defer prod.fuseControlGuard.Unlock() - - if prod.fuseControl != nil { - prod.fuseControl.Stop() - } - - if callback == nil { - prod.fuseControl = nil - } else { - prod.fuseControl = time.AfterFunc(prod.fuseTimeout, callback) - } -} - -func (prod *SimpleProducer) triggerCheckFuse() { - if fuse := prod.GetFuse(); prod.onCheckFuse != nil && fuse.IsBurned() { - if prod.onCheckFuse() { - prod.Control() <- PluginControlFuseActive - } else { - prod.setFuseControl(prod.triggerCheckFuse) - } - } -} diff --git a/docs/consumers/console.rst b/docs/consumers/console.rst index e04e1a2f..1c410782 100644 --- a/docs/consumers/console.rst +++ b/docs/consumers/console.rst @@ -3,7 +3,6 @@ Console This consumer reads from stdin. A message is generated after each newline character. -When attached to a fuse, this consumer will stop accepting messages in case that fuse is burned. Parameters @@ -21,13 +20,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Console** Console defines the pipe to read from. This can be "stdin" or the name of a named pipe that is created if not existing. @@ -49,7 +41,6 @@ Example - "consumer.Console": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/file.rst b/docs/consumers/file.rst index 30a32b1e..66ce16fd 100644 --- a/docs/consumers/file.rst +++ b/docs/consumers/file.rst @@ -4,7 +4,6 @@ File The file consumer allows to read from files while looking for a delimiter that marks the end of a message. If the file is part of e.g. a log rotation the file consumer can be set to a symbolic link of the latest file and (optionally) be told to reopen the file by sending a SIGHUP. A symlink to a file will automatically be reopened if the underlying file is changed. -When attached to a fuse, this consumer will stop accepting messages in case that fuse is burned. Parameters @@ -22,13 +21,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **File** File is a mandatory setting and contains the file to read. The file will be read from beginning to end and the reader will stay attached until the consumer is stopped. @@ -57,7 +49,6 @@ Example - "consumer.File": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/http.rst b/docs/consumers/http.rst index 8f9205fc..6aa417b9 100644 --- a/docs/consumers/http.rst +++ b/docs/consumers/http.rst @@ -2,8 +2,6 @@ Http ==== This consumer opens up an HTTP 1.1 server and processes the contents of any incoming HTTP request. -When attached to a fuse, this consumer will return error 503 in case that fuse is burned. - Parameters ---------- @@ -20,13 +18,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Address** Address stores the host and port to bind to. This is allowed be any ip address/dns and port like "localhost:5880". @@ -64,7 +55,6 @@ Example - "consumer.Http": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/kafka.rst b/docs/consumers/kafka.rst index ea9cc4c0..aee0ad15 100644 --- a/docs/consumers/kafka.rst +++ b/docs/consumers/kafka.rst @@ -3,7 +3,6 @@ Kafka Thes consumer reads data from a given kafka topic. It is based on the sarama library so most settings are mapped to the settings from this library. -When attached to a fuse, this consumer will stop processing messages in case that fuse is burned. Parameters @@ -21,13 +20,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Topic** Topic defines the kafka topic to read from. By default this is set to "default". @@ -175,7 +167,6 @@ Example - "consumer.Kafka": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/kinesis.rst b/docs/consumers/kinesis.rst index 488db94a..fb2ec060 100644 --- a/docs/consumers/kinesis.rst +++ b/docs/consumers/kinesis.rst @@ -2,8 +2,6 @@ Kinesis ======= This consumer reads message from an AWS Kinesis stream. -When attached to a fuse, this consumer will stop processing messages in case that fuse is burned. - Parameters ---------- @@ -20,13 +18,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **KinesisStream** KinesisStream defines the stream to read from. By default this is set to "default". @@ -80,7 +71,6 @@ Example - "consumer.Kinesis": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/profiler.rst b/docs/consumers/profiler.rst index f0783aff..043f4ff9 100644 --- a/docs/consumers/profiler.rst +++ b/docs/consumers/profiler.rst @@ -3,7 +3,6 @@ Profiler The profiler plugin generates Runs x Batches messages and send them to the configured streams as fast as possible. This consumer can be used to profile producers and/or configurations. -When attached to a fuse, this consumer will stop processing messages in case that fuse is burned. Parameters @@ -21,13 +20,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Runs** Runs defines the number of messages per batch. By default this is set to 10000. @@ -69,7 +61,6 @@ Example - "consumer.Profile": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/proxy.rst b/docs/consumers/proxy.rst index 17ba7053..d79efb2b 100644 --- a/docs/consumers/proxy.rst +++ b/docs/consumers/proxy.rst @@ -4,7 +4,6 @@ Proxy The proxy consumer reads messages directly as-is from a given socket. Messages are extracted by standard message size algorithms (see Partitioner). This consumer can be used with any compatible proxy producer to establish a two-way communication. -When attached to a fuse, this consumer will stop accepting new connections and close all existing connections in case that fuse is burned. Parameters @@ -22,13 +21,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Address** Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". @@ -70,7 +62,6 @@ Example - "consumer.Proxy": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/socket.rst b/docs/consumers/socket.rst index 16aad020..c98a0389 100644 --- a/docs/consumers/socket.rst +++ b/docs/consumers/socket.rst @@ -3,7 +3,6 @@ Socket The socket consumer reads messages directly as-is from a given socket. Messages are separated from the stream by using a specific partitioner method. -When attached to a fuse, this consumer will stop accepting new connections (closing the socket) and close all existing connections in case that fuse is burned. Parameters @@ -21,13 +20,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Address** Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". @@ -95,7 +87,6 @@ Example - "consumer.Socket": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/syslogd.rst b/docs/consumers/syslogd.rst index 7d230068..7c1a920a 100644 --- a/docs/consumers/syslogd.rst +++ b/docs/consumers/syslogd.rst @@ -2,7 +2,6 @@ Syslogd ======= The syslogd consumer accepts messages from a syslogd compatible socket. -When attached to a fuse, this consumer will stop the syslogd service in case that fuse is burned. Parameters @@ -20,13 +19,6 @@ Parameters Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages. -**Fuse** - Fuse defines the name of a fuse to observe for this consumer. - Producer may "burn" the fuse when they encounter errors. - Consumers may react on this by e.g. closing connections to notify any writing services of the problem. - Set to "" by default which disables the fuse feature for this consumer. - It is up to the consumer implementation to react on a broken fuse in an appropriate manner. - **Address** Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". @@ -48,7 +40,6 @@ Example - "consumer.Syslogd": Enable: true ID: "" - Fuse: "" Stream: - "foo" - "bar" diff --git a/docs/consumers/systemdconsumer.rst b/docs/consumers/systemdconsumer.rst index ab470987..b5b5c4eb 100644 --- a/docs/consumers/systemdconsumer.rst +++ b/docs/consumers/systemdconsumer.rst @@ -4,7 +4,6 @@ SystemdConsumer NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled. The systemd consumer allows to read from the systemd journal. -When attached to a fuse, this consumer will stop reading messages in case that fuse is burned. Parameters @@ -53,17 +52,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **SystemdUnit** SystemdUnit defines what journal will be followed. This uses journal.add_match with _SYSTEMD_UNIT. @@ -94,8 +82,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/examples/config.rst b/docs/examples/config.rst index eca24763..9a1f92cd 100644 --- a/docs/examples/config.rst +++ b/docs/examples/config.rst @@ -173,31 +173,6 @@ Note that the standard proxy consumer and producer cannot react on details impli While this does work for simple protocols it will have problems with more complex protocols like http. In that case it is advisable to use or write a proxy plugin for this specific protocol. -Fuse ----- - -This configuration introduces a fuse to close the consumer connection if something goes wrong on the producer side. -Fuses work in a broadcasting manner, i.e. one producer "breaks" a fuse and multiple consumers may react on this. -The reasons for breaking a fuse may either be a dropped connection or a blocking consumer. -Different consumers may react differently on "broken" fuses, depending on their context. -In our case the incoming socket will be closed. - -:: - - - "consumer.Socket": - Stream: "forward" - Fuse: "socket" - Address: "127.0.0.1:5880" - Acknowledge: "OK" - - - "producer.Socket": - Stream: "forward" - Fuse: "socket" - Address: "unix://test/test.socket" - BatchTimeoutSec: 1 - Acknowledge: "OK" - - Profiling --------- diff --git a/docs/producers/benchmark.rst b/docs/producers/benchmark.rst index 587020fc..36ff1cf8 100644 --- a/docs/producers/benchmark.rst +++ b/docs/producers/benchmark.rst @@ -52,17 +52,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - Example ------- @@ -77,8 +66,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/console.rst b/docs/producers/console.rst index dd049501..c8eb1398 100644 --- a/docs/producers/console.rst +++ b/docs/producers/console.rst @@ -2,8 +2,6 @@ Console ======= The console producer writes messages to the standard output streams. -This producer does not implement a fuse breaker. - Parameters ---------- @@ -51,17 +49,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Console** Console may either be "stdout" or "stderr". By default it is set to "stdout". @@ -80,8 +67,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/elasticsearch.rst b/docs/producers/elasticsearch.rst index a70a9dbc..034120fc 100644 --- a/docs/producers/elasticsearch.rst +++ b/docs/producers/elasticsearch.rst @@ -2,8 +2,6 @@ ElasticSearch ============= The ElasticSearch producer sends messages to elastic search using the bulk http API. -This producer uses a fuse breaker when cluster health reports a "red" status or the connection is down. - Parameters ---------- @@ -51,17 +49,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **RetrySec** RetrySec denotes the time in seconds after which a failed dataset will be transmitted again. By default this is set to 5. @@ -136,8 +123,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/file.rst b/docs/producers/file.rst index f6673caf..8c7e0d8d 100644 --- a/docs/producers/file.rst +++ b/docs/producers/file.rst @@ -4,7 +4,6 @@ File The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary. -This producer does not implement a fuse breaker. Parameters @@ -53,17 +52,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **File** File contains the path to the log file to write. The wildcard character "*" can be used as a placeholder for the stream name. @@ -160,8 +148,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/firehose.rst b/docs/producers/firehose.rst index a16a9267..a6f35587 100644 --- a/docs/producers/firehose.rst +++ b/docs/producers/firehose.rst @@ -50,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Firehose** Firehose defines the stream to read from. By default this is set to "default". @@ -118,8 +107,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/httprequest.rst b/docs/producers/httprequest.rst index 661bb90e..99cbb82a 100644 --- a/docs/producers/httprequest.rst +++ b/docs/producers/httprequest.rst @@ -2,7 +2,6 @@ HTTPRequest =========== The HTTPRequest producers sends messages as HTTP packet to a given webserver. -This producer uses a fuse breaker when a request fails with an error code > 400 or the connection is down. Parameters @@ -51,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Address** Address defines the webserver to send http requests to. Set to "localhost:80" by default. @@ -88,8 +76,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/influxdb.rst b/docs/producers/influxdb.rst index 13d540d2..cdb3c43c 100644 --- a/docs/producers/influxdb.rst +++ b/docs/producers/influxdb.rst @@ -5,7 +5,6 @@ This producer writes data to an influxDB cluster. The data is expected to be of a valid influxDB format. As the data format changed between influxDB versions it is advisable to use a formatter for the specific influxDB version you want to write to. There are collectd to influxDB formatters available that can be used (as an example). -This producer uses a fuse breaker if the connection to the influxDB cluster is lost. Parameters @@ -54,17 +53,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Host** Host defines the host (and port) of the InfluxDB server. Defaults to "localhost:8086". @@ -130,8 +118,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/kafka.rst b/docs/producers/kafka.rst index d1152598..03a4d400 100644 --- a/docs/producers/kafka.rst +++ b/docs/producers/kafka.rst @@ -3,7 +3,6 @@ Kafka The kafka producer writes messages to a kafka cluster. This producer is backed by the sarama library so most settings relate to that library. -This producer uses a fuse breaker if any connection reports an error. Parameters @@ -52,17 +51,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **ClientId** ClientId sets the client id of this producer. By default this is "gollum". @@ -229,8 +217,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/kafkaproducer.rst b/docs/producers/kafkaproducer.rst index 5cbad1af..9dcbbd6c 100644 --- a/docs/producers/kafkaproducer.rst +++ b/docs/producers/kafkaproducer.rst @@ -5,7 +5,6 @@ NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled. The kafka producer writes messages to a kafka cluster. This producer is backed by the native librdkafka (0.8.6) library so most settings relate to that library. -This producer does not implement a fuse breaker. Parameters @@ -54,17 +53,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **SendRetries** SendRetries is mapped to message.send.max.retries. This defines the number of times librdkafka will try to re-send a message if it did not succeed. @@ -207,8 +195,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/kinesis.rst b/docs/producers/kinesis.rst index 689e14e9..047fa59c 100644 --- a/docs/producers/kinesis.rst +++ b/docs/producers/kinesis.rst @@ -50,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **KinesisStream** KinesisStream defines the stream to read from. By default this is set to "default". @@ -118,8 +107,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/null.rst b/docs/producers/null.rst index 57f85394..e7d71be2 100644 --- a/docs/producers/null.rst +++ b/docs/producers/null.rst @@ -3,7 +3,6 @@ Null This producer does nothing and provides only bare-bone configuration (i.e. enabled and streams). Use this producer to test consumer performance. -This producer does not implement a fuse breaker. Parameters @@ -52,16 +51,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. Example ------- @@ -77,8 +66,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/pcaphttp.rst b/docs/producers/pcaphttp.rst index 00364366..2707e9fc 100644 --- a/docs/producers/pcaphttp.rst +++ b/docs/producers/pcaphttp.rst @@ -53,17 +53,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Interface** Interface defines the network interface to listen on. By default this is set to eth0, get your specific value from ifconfig. @@ -99,8 +88,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/proxy.rst b/docs/producers/proxy.rst index 56982ff8..f39ee286 100644 --- a/docs/producers/proxy.rst +++ b/docs/producers/proxy.rst @@ -4,7 +4,6 @@ Proxy This producer is compatible to consumer.proxy. Responses to messages sent to the given address are sent back to the original consumer of it is a compatible message source. As with consumer.proxy the returned messages are partitioned by common message length algorithms. -This producer does not implement a fuse breaker. Parameters @@ -53,17 +52,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Address** Address stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.Proxy". @@ -117,8 +105,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/redis.rst b/docs/producers/redis.rst index 460aa3d2..fede1f15 100644 --- a/docs/producers/redis.rst +++ b/docs/producers/redis.rst @@ -52,22 +52,10 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Address** Address stores the identifier to connect to. This can either be any ip address and port like "localhost:6379" or a file like "unix:///var/redis.socket". By default this is set to ":6379". - This producer does not implement a fuse breaker. **Database** Database defines the redis database to connect to. @@ -117,8 +105,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/s3.rst b/docs/producers/s3.rst index beb916bb..2b32bd71 100644 --- a/docs/producers/s3.rst +++ b/docs/producers/s3.rst @@ -50,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Region** Region defines the amazon region of your s3 bucket. By default this is set to "eu-west-1". @@ -155,8 +144,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/scribe.rst b/docs/producers/scribe.rst index 2b49b748..6971bc08 100644 --- a/docs/producers/scribe.rst +++ b/docs/producers/scribe.rst @@ -2,7 +2,6 @@ Scribe ====== The scribe producer allows sending messages to Facebook's scribe. -This producer uses a fuse breaker if the connection to the scribe server is lost. Parameters @@ -51,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Address** Address defines the host and port to connect to. By default this is set to "localhost:1463". @@ -109,8 +97,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/socket.rst b/docs/producers/socket.rst index 739df853..de0f769c 100644 --- a/docs/producers/socket.rst +++ b/docs/producers/socket.rst @@ -2,7 +2,6 @@ Socket ====== The socket producer connects to a service over a TCP, UDP or unix domain socket based connection. -This producer uses a fuse breaker when the service to connect to goes down. Parameters @@ -51,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Address** Address stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". @@ -109,8 +97,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/spooling.rst b/docs/producers/spooling.rst index e49a0f26..0efb2fe1 100644 --- a/docs/producers/spooling.rst +++ b/docs/producers/spooling.rst @@ -5,7 +5,6 @@ The Spooling producer buffers messages and sends them again to the previous stre This means the message must have been routed at least once before reaching the spooling producer. If the previous and current stream is identical the message is dropped. The Formatter configuration value is forced to "format.Serialize" and cannot be changed. -This producer does not implement a fuse breaker. Parameters @@ -54,17 +53,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Path** Path sets the output directory for spooling files. Spooling files will Files will be stored as "//.spl". @@ -123,8 +111,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/statsd.rst b/docs/producers/statsd.rst index 2e0d605b..5f645fec 100644 --- a/docs/producers/statsd.rst +++ b/docs/producers/statsd.rst @@ -50,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **BatchMaxMessages** BatchMaxMessages defines the maximum number of messages to send per batch. By default this is set to 500. @@ -101,8 +90,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/docs/producers/websocket.rst b/docs/producers/websocket.rst index 8faa90ee..97cb9b5c 100644 --- a/docs/producers/websocket.rst +++ b/docs/producers/websocket.rst @@ -2,7 +2,6 @@ Websocket ========= The websocket producer opens up a websocket. -This producer does not implement a fuse breaker. Parameters @@ -51,17 +50,6 @@ Parameters Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format. -**Fuse** - Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. - Each producer defines its own fuse breaking logic if necessary / applyable. - Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. - By default this is set to "". - -**FuseTimeoutSec** - FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. - Note that automatic fuse recovery logic depends on each producer's implementation. - By default this setting is set to 10. - **Address** Address defines the host and port to bind to. This is allowed be any ip address/dns and port like "localhost:5880". @@ -89,8 +77,6 @@ Example Formatter: "format.Forward" Filter: "filter.All" DropToStream: "_DROPPED_" - Fuse: "" - FuseTimeoutSec: 5 Stream: - "foo" - "bar" diff --git a/producer/InfluxDB.go b/producer/InfluxDB.go index 2decd009..4f19d216 100644 --- a/producer/InfluxDB.go +++ b/producer/InfluxDB.go @@ -23,13 +23,13 @@ import ( ) // InfluxDB producer plugin +// // This producer writes data to an influxDB cluster. The data is expected to be // of a valid influxDB format. As the data format changed between influxDB // versions it is advisable to use a formatter for the specific influxDB version // you want to write to. There are collectd to influxDB formatters available // that can be used (as an example). -// This producer uses a fuse breaker if the connection to the influxDB cluster -// is lost. +// // Configuration example // // - "producer.InfluxDB": diff --git a/producer/InfluxDBWriter08.go b/producer/InfluxDBWriter08.go index 1990209b..daf73dee 100644 --- a/producer/InfluxDBWriter08.go +++ b/producer/InfluxDBWriter08.go @@ -64,7 +64,6 @@ func (writer *influxDBWriter08) configure(conf core.PluginConfigReader, prod *In writer.testURL += "?" + credentials } - prod.SetCheckFuseCallback(writer.isConnectionUp) return conf.Errors.OrNil() } @@ -82,7 +81,6 @@ func (writer *influxDBWriter08) isConnectionUp() bool { } } - writer.Control() <- core.PluginControlFuseActive return writer.connectionUp } @@ -117,7 +115,7 @@ func (writer *influxDBWriter08) post() (int, error) { response, err := writer.client.Post(writeURL, "application/json", &writer.buffer) if err != nil { writer.connectionUp = false - writer.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) return 0, err // ### return, failed to connect ### } diff --git a/producer/InfluxDBWriter09.go b/producer/InfluxDBWriter09.go index 00814f66..113a3b7c 100644 --- a/producer/InfluxDBWriter09.go +++ b/producer/InfluxDBWriter09.go @@ -81,7 +81,6 @@ func (writer *influxDBWriter09) configure(conf core.PluginConfigReader, prod *In writer.messageHeader = "{\"database\":\"%s\",\"points\":[" } - prod.SetCheckFuseCallback(writer.isConnectionUp) return conf.Errors.OrNil() } @@ -100,7 +99,7 @@ func (writer *influxDBWriter09) isConnectionUp() bool { } } } - writer.Control() <- core.PluginControlFuseActive + return writer.connectionUp } @@ -128,7 +127,7 @@ func (writer *influxDBWriter09) post(databaseName string) (int, error) { response, err := writer.client.Post(writer.writeURL, "application/json", &writer.buffer) if err != nil { writer.connectionUp = false - writer.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) return 0, err // ### return, failed to connect ### } diff --git a/producer/InfluxDBWriter10.go b/producer/InfluxDBWriter10.go index e36a8a79..9ff1a86d 100644 --- a/producer/InfluxDBWriter10.go +++ b/producer/InfluxDBWriter10.go @@ -70,7 +70,6 @@ func (writer *influxDBWriter10) configure(conf core.PluginConfigReader, prod *In } writer.writeURL = fmt.Sprintf("%s%cprecision=ms", writer.writeURL, writer.separator) - prod.SetCheckFuseCallback(writer.isConnectionUp) return conf.Errors.OrNil() } @@ -90,7 +89,6 @@ func (writer *influxDBWriter10) isConnectionUp() bool { } } - writer.Control() <- core.PluginControlFuseActive return writer.connectionUp } @@ -124,7 +122,7 @@ func (writer *influxDBWriter10) post() (int, error) { response, err := writer.client.Post(writeURL, "text/plain; charset=utf-8", &writer.buffer) if err != nil { writer.connectionUp = false - writer.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) return 0, err // ### return, failed to connect ### } diff --git a/producer/console.go b/producer/console.go index 198b3641..bac6e2a0 100644 --- a/producer/console.go +++ b/producer/console.go @@ -23,8 +23,9 @@ import ( ) // Console producer plugin +// // The console producer writes messages to the standard output streams. -// This producer does not implement a fuse breaker. +// // Configuration example // // - "producer.Console": diff --git a/producer/elasticsearch.go b/producer/elasticsearch.go index 5685f703..8ec01de8 100644 --- a/producer/elasticsearch.go +++ b/producer/elasticsearch.go @@ -26,9 +26,10 @@ import ( ) // ElasticSearch producer plugin +// // The ElasticSearch producer sends messages to elastic search using the bulk -// http API. This producer uses a fuse breaker when cluster health reports a -// "red" status or the connection is down. +// http API. +// // Configuration example // // - "producer.ElasticSearch": @@ -157,7 +158,6 @@ func makeElasticMapping() elasticMapping { func (prod *ElasticSearch) Configure(conf core.PluginConfigReader) error { prod.BufferedProducer.Configure(conf) prod.SetStopCallback(prod.close) - prod.SetCheckFuseCallback(prod.isClusterUp) defaultServer := []string{"localhost"} numConnections := conf.GetInt("NumConnections", 6) @@ -223,7 +223,6 @@ func (prod *ElasticSearch) Configure(conf core.PluginConfigReader) error { } } - prod.SetCheckFuseCallback(prod.isClusterUp) return conf.Errors.OrNil() } @@ -291,12 +290,12 @@ func (prod *ElasticSearch) sendMessage(msg *core.Message) { if err != nil { prod.Log.Error.Print("Index error - ", err) if !prod.isClusterUp() { - prod.Control() <- core.PluginControlFuseBurn + // TBD: health check? (prev. fuse breaker) } prod.Drop(originalMsg) } else { tgo.Metric.Inc(elasticMetricMessages + index) - prod.Control() <- core.PluginControlFuseActive + // TBD: health check? (prev. fuse breaker) } } diff --git a/producer/file.go b/producer/file.go index 28fa6963..696a1947 100644 --- a/producer/file.go +++ b/producer/file.go @@ -29,10 +29,11 @@ import ( ) // File producer plugin +// // The file producer writes messages to a file. This producer also allows log // rotation and compression of the rotated logs. Folders in the file path will // be created if necessary. -// This producer does not implement a fuse breaker. +// // Configuration example // // - "producer.File": diff --git a/producer/httprequest.go b/producer/httprequest.go index 156a299f..d4282ccc 100644 --- a/producer/httprequest.go +++ b/producer/httprequest.go @@ -88,7 +88,6 @@ func (prod *HTTPRequest) Configure(conf core.PluginConfigReader) error { var err error prod.BufferedProducer.Configure(conf) prod.SetStopCallback(prod.close) - prod.SetCheckFuseCallback(prod.isHostUp) address := conf.GetString("Address", "http://localhost:80") if strings.Index(address, "://") == -1 { @@ -198,13 +197,13 @@ func (prod *HTTPRequest) sendReq(msg *core.Message) { // Fail prod.Log.Error.Print("Send failed: ", err) if !prod.isHostUp() { - prod.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) } prod.Drop(originalMsg) return } // Success - prod.Control() <- core.PluginControlFuseActive + // TBD: health check? (ex-fuse breaker) }() } diff --git a/producer/kafka.go b/producer/kafka.go index 82b2ea28..f1adfb1d 100644 --- a/producer/kafka.go +++ b/producer/kafka.go @@ -40,9 +40,10 @@ const ( ) // Kafka producer plugin +// // The kafka producer writes messages to a kafka cluster. This producer is // backed by the sarama library so most settings relate to that library. -// This producer uses a fuse breaker if any connection reports an error. +// // Configuration example // // - "producer.Kafka": @@ -232,7 +233,6 @@ func init() { func (prod *Kafka) Configure(conf core.PluginConfigReader) error { prod.BufferedProducer.Configure(conf) prod.SetStopCallback(prod.close) - prod.SetCheckFuseCallback(prod.checkAllTopics) kafka.Logger = prod.Log.Note prod.keyModulators = conf.GetModulatorArray("KeyModulators", prod.Log, core.ModulatorArray{}) @@ -500,7 +500,7 @@ func (prod *Kafka) produceMessage(msg *core.Message) { if err != nil { prod.Log.Error.Printf("%s is not connected: %s", topic.name, err.Error()) } - prod.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) return // ### return, not connected ### } diff --git a/producer/null.go b/producer/null.go index 92a8f70f..066cc044 100644 --- a/producer/null.go +++ b/producer/null.go @@ -23,7 +23,6 @@ import ( // Null producer plugin // This producer does nothing and provides only bare-bone configuration (i.e. // enabled and streams). Use this producer to test consumer performance. -// This producer does not implement a fuse breaker. type Null struct { core.SimpleProducer control chan core.PluginControl diff --git a/producer/proxy.go b/producer/proxy.go index 52fe9606..e2fbb8ad 100644 --- a/producer/proxy.go +++ b/producer/proxy.go @@ -30,7 +30,6 @@ import ( // Responses to messages sent to the given address are sent back to the original // consumer of it is a compatible message source. As with consumer.proxy the // returned messages are partitioned by common message length algorithms. -// This producer does not implement a fuse breaker. // Configuration example // // - "producer.Proxy": diff --git a/producer/redis.go b/producer/redis.go index 863df7f9..7f9bfa4a 100644 --- a/producer/redis.go +++ b/producer/redis.go @@ -43,7 +43,6 @@ import ( // Address stores the identifier to connect to. // This can either be any ip address and port like "localhost:6379" or a file // like "unix:///var/redis.socket". By default this is set to ":6379". -// This producer does not implement a fuse breaker. // // Database defines the redis database to connect to. // By default this is set to 0. diff --git a/producer/scribe.go b/producer/scribe.go index 1e9f5b81..66a5be8b 100644 --- a/producer/scribe.go +++ b/producer/scribe.go @@ -26,9 +26,9 @@ import ( ) // Scribe producer plugin +/// // The scribe producer allows sending messages to Facebook's scribe. -// This producer uses a fuse breaker if the connection to the scribe server is -// lost. +// // Configuration example // // - "producer.Scribe": @@ -136,7 +136,6 @@ func (prod *Scribe) Configure(conf core.PluginConfigReader) error { tgo.Metric.NewRate(metricName, scribeMetricMessagesSec+category, time.Second, 10, 3, true) } - prod.SetCheckFuseCallback(prod.tryOpenConnection) return conf.Errors.OrNil() } @@ -159,7 +158,6 @@ func (prod *Scribe) tryOpenConnection() bool { } prod.socket.Conn().(bufferedConn).SetWriteBuffer(prod.bufferSizeByte) - prod.Control() <- core.PluginControlFuseActive prod.lastHeartBeat = time.Now().Add(prod.heartBeatInterval) prod.Log.Note.Print("Connection opened") } @@ -187,7 +185,7 @@ func (prod *Scribe) tryOpenConnection() bool { } } - prod.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) prod.transport.Close() return false } @@ -257,7 +255,7 @@ func (prod *Scribe) transformMessages(messages []*core.Message) { if err != nil || resultCode != scribe.ResultCode_TRY_LATER { prod.Log.Error.Printf("Log error %d: %s", resultCode, err.Error()) - prod.Control() <- core.PluginControlFuseBurn + // TBD: health check? (ex-fuse breaker) prod.transport.Close() // reconnect prod.dropMessages(messages[idxStart:]) diff --git a/producer/socket.go b/producer/socket.go index 7752b1d9..1d3cf7de 100644 --- a/producer/socket.go +++ b/producer/socket.go @@ -26,9 +26,10 @@ import ( ) // Socket producer plugin +// // The socket producer connects to a service over a TCP, UDP or unix domain // socket based connection. -// This producer uses a fuse breaker when the service to connect to goes down. +// // Configuration example // // - "producer.Socket": @@ -95,7 +96,6 @@ func init() { func (prod *Socket) Configure(conf core.PluginConfigReader) error { prod.BufferedProducer.Configure(conf) prod.SetStopCallback(prod.close) - prod.SetCheckFuseCallback(prod.tryConnect) prod.batchMaxCount = conf.GetInt("Batch/MaxCount", 8192) prod.batchFlushCount = conf.GetInt("Batch/FlushCount", prod.batchMaxCount/2) @@ -142,7 +142,6 @@ func (prod *Socket) tryConnect() bool { conn.(bufferedConn).SetWriteBuffer(prod.bufferSizeByte) prod.assembly.SetWriter(conn) prod.connection = conn - prod.Control() <- core.PluginControlFuseActive return true } @@ -153,7 +152,7 @@ func (prod *Socket) closeConnection() error { prod.connection = nil if !prod.IsStopping() { - prod.Control() <- core.PluginControlFuseBurn + // TBD: action needed? (ex-fuse breaker) } } return nil diff --git a/producer/spooling.go b/producer/spooling.go index 83bf372a..62815f7c 100644 --- a/producer/spooling.go +++ b/producer/spooling.go @@ -26,13 +26,14 @@ import ( ) // Spooling producer plugin +// // The Spooling producer buffers messages and sends them again to the previous // stream stored in the message. This means the message must have been routed // at least once before reaching the spooling producer. If the previous and // current stream is identical the message is dropped. // The Formatter configuration value is forced to "format.Serialize" and // cannot be changed. -// This producer does not implement a fuse breaker. +// // Configuration example // // - "producer.Spooling": diff --git a/producer/websocket.go b/producer/websocket.go index 6e6f126a..c0e0ad4c 100644 --- a/producer/websocket.go +++ b/producer/websocket.go @@ -26,8 +26,9 @@ import ( ) // Websocket producer plugin +// // The websocket producer opens up a websocket. -// This producer does not implement a fuse breaker. +// // Configuration example // // - "producer.Websocket":