Skip to content

Commit

Permalink
New backend and fixes
Browse files Browse the repository at this point in the history
- New backend allows to create a custom pipeline
- Safe way to close channels
  • Loading branch information
Bigomby committed Jul 28, 2016
1 parent ae7c54f commit 154bacc
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 247 deletions.
114 changes: 74 additions & 40 deletions backend.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,95 @@
package rbforwarder

import "github.com/redBorder/rbforwarder/pipeline"
import (
"time"

"github.com/redBorder/rbforwarder/pipeline"
)

// Backend orchestrates the pipeline
type Backend struct {
sender pipeline.Sender
senderPool chan chan *message
input chan *message
workers int
queueSize int
componentPools []chan chan *message
input chan *message
output chan *message

working int
}

// NewBackend creates a new Backend
func NewBackend(workers, queueSize, maxMessages, maxBytes int) *Backend {
func NewBackend(input, output chan *message) *Backend {
b := &Backend{
workers: workers,
queueSize: queueSize,
}

b.senderPool = make(chan chan *message, b.workers)
b.input = make(chan *message)

return b
}

// Init initializes a backend
func (b *Backend) Init() {
for i := 0; i < b.workers; i++ {
b.startSender(i)
input: input,
output: output,
working: 1,
}

// Get messages from produces and send them to workers
done := make(chan struct{})
go func() {
done <- struct{}{}
// Start receiving messages
for m := range b.input {
messageChannel := <-b.senderPool
messageChannel <- m
worker := <-b.componentPools[0]
worker <- m
}

// When a close signal is received clean the workers. Wait for workers to
// terminate
b.working = 0
for _, componentPool := range b.componentPools {
loop:
for {
select {
case worker := <-componentPool:
close(worker)
case <-time.After(10 * time.Millisecond):
break loop
}
}
}

// Messages coming too late will be ignored. If the channel is not set to
// nil, late messages will panic
b.output = nil

// Send a close signal to message handler
close(output)
}()
<-done

Logger.Debug("Backend ready")
return b
}

// Worker that sends the message
func (b *Backend) startSender(i int) {
sender := b.sender
sender.Init(i)
// PushComponent adds a new component to the pipeline
func (b *Backend) PushComponent(c pipeline.Composer, w int) {
index := len(b.componentPools)
componentPool := make(chan chan *message, w)
b.componentPools = append(b.componentPools, componentPool)

workerChannel := make(chan *message)
for i := 0; i < w; i++ {
c.Init(i)

go func() {
for {
b.senderPool <- workerChannel
message := <-workerChannel
sender.OnMessage(message)
}
}()
worker := make(chan *message)
b.componentPools[index] <- worker

go func() {
for m := range worker {
c.OnMessage(
m,
func(m pipeline.Messenger) {
if len(b.componentPools) >= index {
nextWorker := <-b.componentPools[index+1]
nextWorker <- m.(*message)
}
},
func(m pipeline.Messenger, code int, status string) {
rbmessage := m.(*message)
rbmessage.code = code
rbmessage.status = status
b.output <- rbmessage
},
)

if b.working == 1 {
b.componentPools[index] <- worker
}
}
}()
}
}
17 changes: 3 additions & 14 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,7 @@ package rbforwarder

// Config stores the configuration for a forwarder
type Config struct {
Retries int
Backoff int
Workers int
QueueSize int
MaxMessages int
MaxBytes int
ShowCounter int
}

// messageHandlerConfig is used to store the configuration for the reportHandler
type messageHandlerConfig struct {
maxRetries int
backoff int
queueSize int
Retries int
Backoff int
QueueSize int
}
8 changes: 0 additions & 8 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,3 @@ func (m message) GetReport() Report {
opts: m.opts,
}
}

// Done send the report to the report handler so it can be delivered to the
// user
func (m *message) Done(code int, status string) {
m.code = code
m.status = status
m.channel <- m
}
140 changes: 64 additions & 76 deletions messageHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,129 +6,117 @@ import "time"
// of the pipeline. The first element of the pipeline can know the status
// of the produced message using GetReports() or GetOrderedReports()
type messageHandler struct {
input chan *message // Receive messages
freedMessages chan *message // Messages after its report has been delivered
retry chan *message // Send messages to retry
unordered chan *message // Send reports out of order
out chan *message // Send reports in order
close chan struct{} // Stop sending reports
handler chan *message // Receive messages from pipeline
pipeline chan *message // Send messages back to the pipeline
out chan *message // Send reports to the user

queued map[uint64]Report // Store pending reports
currentReport uint64 // Last delivered report

config messageHandlerConfig
maxRetries int
backoff int
}

// newReportHandler creates a new instance of reportHandler
func newMessageHandler(maxRetries, backoff, queueSize int,
retry chan *message) *messageHandler {

return &messageHandler{
input: make(chan *message, queueSize),
freedMessages: make(chan *message, queueSize),
unordered: make(chan *message, queueSize),
close: make(chan struct{}),
queued: make(map[uint64]Report),
retry: retry,
config: messageHandlerConfig{
maxRetries: maxRetries,
backoff: backoff,
},
func newMessageHandler(
maxRetries, backoff int,
handler, pipeline chan *message,
) *messageHandler {

mh := &messageHandler{
handler: handler,
pipeline: pipeline,
out: make(chan *message, 100),

queued: make(map[uint64]Report),

maxRetries: maxRetries,
backoff: backoff,
}
}

// Init initializes the processing of reports
func (r *messageHandler) Init() {
go func() {
// Get reports from the input channel
forOuterLoop:
for {
select {
case <-r.close:
break forOuterLoop
case message := <-r.input:
// Report when:
// - Message has been received successfully
// - Retrying has been disabled
// - The max number of retries has been reached
if message.code == 0 ||
r.config.maxRetries == 0 ||
(r.config.maxRetries > 0 &&
message.retries >= r.config.maxRetries) {

// Send the report to the client
r.unordered <- message
} else {
// Retry in other case
go func() {
message.retries++
Logger.
WithField("Seq", message.seq).
WithField("Retry", message.retries).
WithField("Status", message.status).
WithField("Code", message.code).
Warnf("Retrying message")

<-time.After(time.Duration(r.config.backoff) * time.Second)
r.retry <- message
}()
}
// Get reports from the handler channel
for m := range mh.handler {
// If the message has status code 0 (success) send the report to the user
if m.code == 0 || mh.maxRetries == 0 {
mh.out <- m
continue
}

// If the message has status code != 0 (fail) but has been retried the
// maximum number or retries also send it to the user
if mh.maxRetries > 0 && m.retries >= mh.maxRetries {
mh.out <- m
continue
}

// In othe case retry the message sending it again to the pipeline
go func(m *message) {
m.retries++
Logger.
WithField("Seq", m.seq).
WithField("Retry", m.retries).
WithField("Status", m.status).
WithField("Code", m.code).
Warnf("Retrying message")

<-time.After(time.Duration(mh.backoff) * time.Second)
mh.pipeline <- m
}(m)
}
close(r.unordered)

close(mh.out)
}()

Logger.Debug("Report Handler ready")
Logger.Debug("Message Handler ready")

return mh
}

func (r *messageHandler) GetReports() chan Report {
done := make(chan struct{})
func (mh *messageHandler) GetReports() chan Report {
reports := make(chan Report)

go func() {
done <- struct{}{}

for message := range r.unordered {
for message := range mh.out {
reports <- message.GetReport()
}

close(reports)
}()

<-done
return reports
}

func (r *messageHandler) GetOrderedReports() chan Report {
done := make(chan struct{})
func (mh *messageHandler) GetOrderedReports() chan Report {
reports := make(chan Report)

go func() {
done <- struct{}{}
for message := range r.unordered {
for message := range mh.out {
report := message.GetReport()

if message.seq == r.currentReport {
if message.seq == mh.currentReport {
// The message is the expected. Send it.
reports <- report
r.currentReport++
mh.currentReport++
} else {
// This message is not the expected. Store it.
r.queued[message.seq] = report
mh.queued[message.seq] = report
}

// Check if there are stored messages and send them.
for {
if currentReport, ok := r.queued[r.currentReport]; ok {
if currentReport, ok := mh.queued[mh.currentReport]; ok {
reports <- currentReport
delete(r.queued, r.currentReport)
r.currentReport++
delete(mh.queued, mh.currentReport)
mh.currentReport++
} else {
break
}
}
}

close(reports)
}()

<-done
return reports
}
17 changes: 17 additions & 0 deletions pipeline/composer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pipeline

// Next should be called by a component in order to pass the message to the next
// component in the pipeline.
type Next func(Messenger)

// Done should be called by a component in order to return the message to the
// message handler. Can be used by the last component to inform that the
// message processing is done o by a middle component to inform an error.
type Done func(Messenger, int, string)

// Composer represents a component in the pipeline that performs a work on
// a message
type Composer interface {
Init(int) error
OnMessage(Messenger, Next, Done)
}
1 change: 0 additions & 1 deletion pipeline/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ package pipeline
type Messenger interface {
PopData() ([]byte, error)
PushData(data []byte)
Done(code int, status string)
GetOpt(name string) (interface{}, error)
}
7 changes: 0 additions & 7 deletions pipeline/sender.go

This file was deleted.

Loading

0 comments on commit 154bacc

Please sign in to comment.