Skip to content

Commit

Permalink
New message handler
Browse files Browse the repository at this point in the history
- Report type now is only for presentation. No logic at all.
- New message handler to deal with retries and reporting
- More interfaces. Less implementation details.
- Reduced complexity (less gorutines, less channels, less is more)
  • Loading branch information
Bigomby committed Jul 27, 2016
1 parent d1919e8 commit 620aaf4
Show file tree
Hide file tree
Showing 19 changed files with 392 additions and 416 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ vet:

test:
@printf "$(MKL_YELLOW)Runing tests$(MKL_CLR_RESET)\n"
go test -v -cover ./...
go test -cover ./...
@printf "$(MKL_GREEN)Test passed$(MKL_CLR_RESET)\n"

coverage:
Expand Down
43 changes: 10 additions & 33 deletions backend.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
package rbforwarder

import (
"bytes"

"github.com/redBorder/rbforwarder/pipeline"
)
import "github.com/redBorder/rbforwarder/pipeline"

// Backend orchestrates the pipeline
type Backend struct {
sender pipeline.Sender

senderPool chan chan *pipeline.Message

currentProducedID uint64

input chan *pipeline.Message
messages chan *pipeline.Message
reports chan *pipeline.Message
messagePool chan *pipeline.Message

workers int
queueSize int
sender pipeline.Sender
senderPool chan chan *message
input chan *message
workers int
queueSize int
}

// NewBackend creates a new Backend
Expand All @@ -30,19 +18,8 @@ func NewBackend(workers, queueSize, maxMessages, maxBytes int) *Backend {
queueSize: queueSize,
}

b.senderPool = make(chan chan *pipeline.Message, b.workers)

b.messages = make(chan *pipeline.Message)
b.input = make(chan *pipeline.Message)
b.reports = make(chan *pipeline.Message)
b.messagePool = make(chan *pipeline.Message, b.queueSize)

for i := 0; i < b.queueSize; i++ {
b.messagePool <- &pipeline.Message{
InputBuffer: new(bytes.Buffer),
OutputBuffer: new(bytes.Buffer),
}
}
b.senderPool = make(chan chan *message, b.workers)
b.input = make(chan *message)

return b
}
Expand Down Expand Up @@ -70,9 +47,9 @@ func (b *Backend) Init() {
// Worker that sends the message
func (b *Backend) startSender(i int) {
sender := b.sender
sender.Init(i, b.reports)
sender.Init(i)

workerChannel := make(chan *pipeline.Message)
workerChannel := make(chan *message)

go func() {
for {
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ type Config struct {
MaxBytes int
ShowCounter int
}

// messageHandlerConfig is used to store the configuration for the reportHandler
type messageHandlerConfig struct {
maxRetries int
backoff int
queueSize int
}
59 changes: 59 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package rbforwarder

import "errors"

// message is used to send data through the pipeline
type message struct {
bufferStack [][]byte

seq uint64 // Unique ID for the report, used to maintain sequence
status string // Result of the sending
code int // Result of the sending
retries int
opts map[string]interface{}
channel chan *message
}

// PushData store data on an LIFO queue so the nexts handlers can use it
func (m *message) PushData(v []byte) {
m.bufferStack = append(m.bufferStack, v)
}

// PopData get the data stored by the previous handler
func (m *message) PopData() (ret []byte, err error) {
if len(m.bufferStack) < 1 {
err = errors.New("No data on the stack")
return
}

ret = m.bufferStack[len(m.bufferStack)-1]
m.bufferStack = m.bufferStack[0 : len(m.bufferStack)-1]

return
}

// GetOpt returns an option
func (m message) GetOpt(name string) (opt interface{}, err error) {
if opt = m.opts[name]; opt == nil {
err = errors.New("No option available: " + name)
}

return
}

func (m message) GetReport() Report {
return Report{
code: m.code,
status: m.status,
retries: m.retries,
opts: m.opts,
}
}

// Done send the report to the report handler so it can be delivered to the
// user
func (m *message) Done(code int, status string) {
m.code = code
m.status = status
m.channel <- m
}
134 changes: 134 additions & 0 deletions messageHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package rbforwarder

import "time"

// reportHandler is used to handle the reports produced by the last element
// of the pipeline. The first element of the pipeline can know the status
// of the produced message using GetReports() or GetOrderedReports()
type messageHandler struct {
input chan *message // Receive messages
freedMessages chan *message // Messages after its report has been delivered
retry chan *message // Send messages to retry
unordered chan *message // Send reports out of order
out chan *message // Send reports in order
close chan struct{} // Stop sending reports
queued map[uint64]Report // Store pending reports
currentReport uint64 // Last delivered report

config messageHandlerConfig
}

// newReportHandler creates a new instance of reportHandler
func newMessageHandler(maxRetries, backoff, queueSize int,
retry chan *message) *messageHandler {

return &messageHandler{
input: make(chan *message, queueSize),
freedMessages: make(chan *message, queueSize),
unordered: make(chan *message, queueSize),
close: make(chan struct{}),
queued: make(map[uint64]Report),
retry: retry,
config: messageHandlerConfig{
maxRetries: maxRetries,
backoff: backoff,
},
}
}

// Init initializes the processing of reports
func (r *messageHandler) Init() {
go func() {
// Get reports from the input channel
forOuterLoop:
for {
select {
case <-r.close:
break forOuterLoop
case message := <-r.input:
// Report when:
// - Message has been received successfully
// - Retrying has been disabled
// - The max number of retries has been reached
if message.code == 0 ||
r.config.maxRetries == 0 ||
(r.config.maxRetries > 0 &&
message.retries >= r.config.maxRetries) {

// Send the report to the client
r.unordered <- message
} else {
// Retry in other case
go func() {
message.retries++
Logger.
WithField("Seq", message.seq).
WithField("Retry", message.retries).
WithField("Status", message.status).
WithField("Code", message.code).
Warnf("Retrying message")

<-time.After(time.Duration(r.config.backoff) * time.Second)
r.retry <- message
}()
}
}
}
close(r.unordered)
}()

Logger.Debug("Report Handler ready")
}

func (r *messageHandler) GetReports() chan Report {
done := make(chan struct{})
reports := make(chan Report)

go func() {
done <- struct{}{}

for message := range r.unordered {
reports <- message.GetReport()
}
close(reports)
}()

<-done
return reports
}

func (r *messageHandler) GetOrderedReports() chan Report {
done := make(chan struct{})
reports := make(chan Report)

go func() {
done <- struct{}{}
for message := range r.unordered {
report := message.GetReport()

if message.seq == r.currentReport {
// The message is the expected. Send it.
reports <- report
r.currentReport++
} else {
// This message is not the expected. Store it.
r.queued[message.seq] = report
}

// Check if there are stored messages and send them.
for {
if currentReport, ok := r.queued[r.currentReport]; ok {
reports <- currentReport
delete(r.queued, r.currentReport)
r.currentReport++
} else {
break
}
}
}
close(reports)
}()

<-done
return reports
}
2 changes: 1 addition & 1 deletion pipeline/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package pipeline
// Decoder is the component that parses a raw buffer to a structure
type Decoder interface {
Init(int) error
Decode(*Message) error
Decode(Messenger) error
}
2 changes: 1 addition & 1 deletion pipeline/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package pipeline
// Encoder serializes a data structure to a output buffer
type Encoder interface {
Init(int) error
Encode(*Message) error
Encode(Messenger) error
}
21 changes: 0 additions & 21 deletions pipeline/message.go

This file was deleted.

9 changes: 9 additions & 0 deletions pipeline/messenger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pipeline

// Messenger is used by modules to handle messages
type Messenger interface {
PopData() ([]byte, error)
PushData(data []byte)
Done(code int, status string)
GetOpt(name string) (interface{}, error)
}
2 changes: 1 addition & 1 deletion pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package pipeline
// Processor performs operations on a data structure
type Processor interface {
Init(int) error
Process(message *Message) (bool, error)
Process(Messenger) (bool, error)
}
4 changes: 2 additions & 2 deletions pipeline/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package pipeline

// Sender takes a raw buffer and sent it using a network protocol
type Sender interface {
Init(int, chan *Message) error
OnMessage(*Message) error
Init(id int) error
OnMessage(Messenger)
}
Loading

0 comments on commit 620aaf4

Please sign in to comment.