Skip to content

Commit

Permalink
Merge pull request #109 from trivago/ghissue-97-remove-fuse-mechanism
Browse files Browse the repository at this point in the history
Remove fuse mechanism (#97)
  • Loading branch information
arnecls committed May 5, 2017
2 parents b2e47af + 7100bb5 commit f1c1d9c
Show file tree
Hide file tree
Showing 70 changed files with 57 additions and 819 deletions.
2 changes: 0 additions & 2 deletions config/file_send.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
"StdIn":
Type: "consumer.Console"
Streams: "console"
Fuse: "file"

"FileOut":
Type: "producer.File"
Streams: "console"
Fuse: "file"
File: /tmp/gollum_test.log
Batch:
TimeoutSec: 1
2 changes: 0 additions & 2 deletions config/http_send.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
"StdIn":
Type: "consumer.Console"
Streams: "console"
Fuse: "http"

"HttpOut":
Type: "producer.HTTPRequest"
Streams: "console"
Fuse: "http"
Address: "http://localhost:9090"
RawData: false
2 changes: 0 additions & 2 deletions config/socket_forward.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"SocketIn":
Type: "consumer.Socket"
Streams: "forward"
Fuse: "socket"
Address: "127.0.0.1:5880"
Acknowledge: "OK"
Partitioner: "ascii"
Expand All @@ -10,7 +9,6 @@
"SocketOut":
Type: "producer.Socket"
Streams: "forward"
Fuse: "socket"
Address: "unix://test/test.socket"
Formatters:
- "format.Runlength"
Expand Down
2 changes: 0 additions & 2 deletions config/socket_send.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"ReadStdIn":
Type: "consumer.Console"
Streams: "console"
Fuse: "socket"

"AddRunLength":
Type: "router.Broadcast"
Expand All @@ -18,7 +17,6 @@
"ToSocket":
Type: "producer.Socket"
Streams: "console"
Fuse: "socket"
Address: "unix://test/test.socket"
ConnectionBufferSizeKB: 128
BatchTimeoutSec: 1
Expand Down
6 changes: 3 additions & 3 deletions consumer/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ const (
)

// Console consumer plugin
//
// This consumer reads from stdin. A message is generated after each newline
// character. When attached to a fuse, this consumer will stop accepting
// messages in case that fuse is burned.
// character.
//
// Configuration example
//
// - "consumer.Console":
Expand Down Expand Up @@ -102,7 +103,6 @@ func (cons *Console) readPipe() {
buffer := tio.NewBufferedReader(consoleBufferGrowSize, 0, 0, "\n")
for cons.IsActive() {
err := buffer.ReadAll(cons.pipe, cons.Enqueue)
cons.WaitOnFuse()
switch err {
case io.EOF:
if cons.autoexit {
Expand Down
5 changes: 2 additions & 3 deletions consumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ const (
)

// File consumer plugin
//
// The file consumer allows to read from files while looking for a delimiter
// that marks the end of a message. If the file is part of e.g. a log rotation
// the file consumer can be set to a symbolic link of the latest file and
// (optionally) be told to reopen the file by sending a SIGHUP. A symlink to
// a file will automatically be reopened if the underlying file is changed.
// When attached to a fuse, this consumer will stop accepting messages in case
// that fuse is burned.
//
// Configuration example
//
// - "consumer.File":
Expand Down Expand Up @@ -227,7 +227,6 @@ func (cons *File) read() {
// Try to read from the file
if cons.state == fileStateRead && cons.file != nil {
err := buffer.ReadAll(cons.file, sendFunction)
cons.WaitOnFuse()

switch {
case err == nil: // ok
Expand Down
8 changes: 2 additions & 6 deletions consumer/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
)

// Http consumer plugin
//
// This consumer opens up an HTTP 1.1 server and processes the contents of any
// incoming HTTP request.
// When attached to a fuse, this consumer will return error 503 in case that
// fuse is burned.
//
// Configuration example
//
// - "consumer.Http":
Expand Down Expand Up @@ -136,10 +136,6 @@ func (cons *Http) requestHandler(resp http.ResponseWriter, req *http.Request) {
return
}
}
if cons.IsFuseBurned() {
resp.WriteHeader(http.StatusServiceUnavailable)
return // ### return, service is down ###
}

if cons.withHeaders {
// Read the whole package
Expand Down
7 changes: 2 additions & 5 deletions consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ const (
)

// Kafka consumer plugin
//
// Thes consumer reads data from a given kafka topic. It is based on the sarama
// library so most settings are mapped to the settings from this library.
// When attached to a fuse, this consumer will stop processing messages in case
// that fuse is burned.
//
// Configuration example
//
// - "consumer.Kafka":
Expand Down Expand Up @@ -404,7 +404,6 @@ func (cons *Kafka) readFromGroup() {
spin := tsync.NewSpinner(tsync.SpinPriorityLow)

for !cons.groupClient.Closed() {
cons.WaitOnFuse()
select {
case event := <-consumer.Messages():
if cons.prependKey {
Expand Down Expand Up @@ -458,7 +457,6 @@ func (cons *Kafka) readFromPartition(partitionID int32) {
spin := tsync.NewSpinner(tsync.SpinPriorityLow)

for !cons.client.Closed() {
cons.WaitOnFuse()

select {
case event := <-partCons.Messages():
Expand Down Expand Up @@ -512,7 +510,6 @@ func (cons *Kafka) readPartitions(partitions []int32) {
spin := tsync.NewSpinner(tsync.SpinPriorityLow)
for !cons.client.Closed() {
for idx, consumer := range consumers {
cons.WaitOnFuse()
partition := partitions[idx]

select {
Expand Down
5 changes: 2 additions & 3 deletions consumer/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ const (
)

// Kinesis consumer plugin
//
// This consumer reads message from an AWS Kinesis router.
// When attached to a fuse, this consumer will stop processing messages in case
// that fuse is burned.
//
// Configuration example
//
// - "consumer.Kinesis":
Expand Down Expand Up @@ -256,7 +256,6 @@ func (cons *Kinesis) processShard(shardID string) {
recordConfig := (*kinesis.GetRecordsInput)(nil)

for cons.running {
cons.WaitOnFuse()
if recordConfig == nil {
recordConfig = cons.createShardIteratorConfig(shardID)
}
Expand Down
5 changes: 2 additions & 3 deletions consumer/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
)

// Profiler consumer plugin
//
// The profiler plugin generates Runs x Batches messages and send them to the
// configured streams as fast as possible. This consumer can be used to profile
// producers and/or configurations.
// When attached to a fuse, this consumer will stop processing messages in case
// that fuse is burned.
//
// Configuration example
//
// - "consumer.Profile":
Expand Down Expand Up @@ -183,7 +183,6 @@ func (cons *Profiler) profile() {
start := time.Now()

for i := 0; i < cons.profileRuns && cons.IsActive(); i++ {
cons.WaitOnFuse()
template := cons.templates[rand.Intn(len(cons.templates))]
messageCount++

Expand Down
5 changes: 2 additions & 3 deletions consumer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ const (
)

// Proxy consumer plugin.
//
// The proxy consumer reads messages directly as-is from a given socket.
// Messages are extracted by standard message size algorithms (see Partitioner).
// This consumer can be used with any compatible proxy producer to establish
// a two-way communication.
// When attached to a fuse, this consumer will stop accepting new connections
// and close all existing connections in case that fuse is burned.
//
// Configuration example
//
// - "consumer.Proxy":
Expand Down Expand Up @@ -145,7 +145,6 @@ func (cons *Proxy) accept() {

listener := cons.listen.(net.Listener)
for cons.IsActive() {
cons.WaitOnFuse()

client, err := listener.Accept()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consumer/proxyclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (client *proxyClient) sendMessage(data []byte) {
func (client *proxyClient) read() {
buffer := tio.NewBufferedReader(proxyClientBufferGrowSize, client.proxy.flags, client.proxy.offset, client.proxy.delimiter)

for client.proxy.IsActive() && client.connected && !client.proxy.IsFuseBurned() {
for client.proxy.IsActive() && client.connected {
err := buffer.ReadAll(client.conn, client.sendMessage)

// Handle read errors
Expand Down
15 changes: 3 additions & 12 deletions consumer/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ const (
)

// Socket consumer plugin
//
// The socket consumer reads messages directly as-is from a given socket.
// Messages are separated from the stream by using a specific partitioner method.
// When attached to a fuse, this consumer will stop accepting new connections
// (closing the socket) and close all existing connections in case that fuse is
// burned.
//
// Configuration example
//
// - "consumer.Socket":
Expand Down Expand Up @@ -211,7 +210,7 @@ func (cons *Socket) processConnection(conn net.Conn) {

buffer := tio.NewBufferedReader(socketBufferGrowSize, cons.flags, cons.offset, cons.delimiter)

for cons.IsActive() && !cons.IsFuseBurned() {
for cons.IsActive() {
conn.SetReadDeadline(time.Now().Add(cons.readTimeout))
err := buffer.ReadAll(conn, cons.Enqueue)
if err == nil {
Expand Down Expand Up @@ -257,9 +256,6 @@ func (cons *Socket) udpAccept() {
addr, _ := net.ResolveUDPAddr(cons.protocol, cons.address)

for cons.IsActive() {
// Prevent reconnection until fuse is active again
cons.WaitOnFuse()

// (re)open a tcp connection
for cons.listen == nil {
if listener, err := net.ListenUDP(cons.protocol, addr); err == nil {
Expand All @@ -281,9 +277,6 @@ func (cons *Socket) tcpAccept() {
defer cons.closeTCPConnection()

for cons.IsActive() {
// Prevent reconnection until fuse is active again
cons.WaitOnFuse()

// (re)open a tcp connection
for cons.listen == nil {
listener, err := net.Listen(cons.protocol, cons.address)
Expand Down Expand Up @@ -368,11 +361,9 @@ func (cons *Socket) Consume(workers *sync.WaitGroup) {

if cons.protocol == "udp" {
go tgo.WithRecoverShutdown(cons.udpAccept)
cons.SetFuseBurnedCallback(cons.closeConnection)
defer cons.closeConnection()
} else {
go tgo.WithRecoverShutdown(cons.tcpAccept)
cons.SetFuseBurnedCallback(cons.closeTCPConnection)
defer cons.closeTCPConnection()
}

Expand Down
7 changes: 2 additions & 5 deletions consumer/syslogd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
)

// Syslogd consumer plugin
//
// The syslogd consumer accepts messages from a syslogd compatible socket.
// When attached to a fuse, this consumer will stop the syslogd service in case
// that fuse is burned.
//
// Configuration example
//
// - "consumer.Syslogd":
Expand Down Expand Up @@ -143,9 +143,6 @@ func (cons *Syslogd) Consume(workers *sync.WaitGroup) {
server.Boot()
defer server.Kill()

cons.SetFuseBurnedCallback(func() { server.Kill() })
cons.SetFuseActiveCallback(func() { server.Boot() })
cons.ControlLoop()

server.Wait()
}
4 changes: 3 additions & 1 deletion contrib/native/kafka/kafkaproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
)

// KafkaProducer librdkafka producer plugin
//
// NOTICE: This producer is not included in standard builds. To enable it
// you need to trigger a custom build with native plugins enabled.
// The kafka producer writes messages to a kafka cluster. This producer is
// backed by the native librdkafka (0.8.6) library so most settings relate
// to that library. This producer does not implement a fuse breaker.
// to that library.
//
// Configuration example
//
// - "native.KafkaProducer":
Expand Down
5 changes: 2 additions & 3 deletions contrib/native/systemd/systemdconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
)

// SystemdConsumer consumer plugin
//
// NOTICE: This producer is not included in standard builds. To enable it
// you need to trigger a custom build with native plugins enabled.
// The systemd consumer allows to read from the systemd journal.
// When attached to a fuse, this consumer will stop reading messages in case
// that fuse is burned.
//
// Configuration example
//
// - "native.Systemd":
Expand Down Expand Up @@ -152,7 +152,6 @@ func (cons *SystemdConsumer) close() {

func (cons *SystemdConsumer) read() {
for cons.IsActive() {
cons.WaitOnFuse()

c, err := cons.journal.Next()
if err != nil {
Expand Down
10 changes: 0 additions & 10 deletions core/bufferedproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
// Formatter: "format.Forward"
// Filter: "filter.All"
// DropToStream: "_DROPPED_"
// Fuse: ""
// FuseTimeoutSec: 5
// Router:
// - "foo"
// - "bar"
Expand Down Expand Up @@ -76,14 +74,6 @@ import (
// formatting it has to define a separate filter as the producer decides if
// and where to format.
//
// Fuse defines the name of a fuse to burn if e.g. the producer encounteres a
// lost connection. Each producer defines its own fuse breaking logic if
// necessary / applyable. Disable fuse behavior for a producer by setting an
// empty name or a FuseTimeoutSec <= 0. By default this is set to "".
//
// FuseTimeoutSec defines the interval in seconds used to check if the fuse can
// be recovered. Note that automatic fuse recovery logic depends on each
// producer's implementation. By default this setting is set to 10.
type BufferedProducer struct {
SimpleProducer
messages MessageQueue
Expand Down

0 comments on commit f1c1d9c

Please sign in to comment.