Skip to content

Commit

Permalink
Merge a9aa6f8 into a6a903f
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Jul 26, 2016
2 parents a6a903f + a9aa6f8 commit c53d102
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 525 deletions.
177 changes: 22 additions & 155 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,22 @@ package rbforwarder
import (
"bytes"
"time"
)

// Decoder is the component that parses a raw buffer to a structure
type Decoder interface {
Init(int) error
Decode(*Message) error
}

// Processor performs operations on a data structure
type Processor interface {
Init(int) error
Process(message *Message) (bool, error)
}

// Encoder serializes a data structure to a output buffer
type Encoder interface {
Init(int) error
Encode(*Message) error
}

// Sender takes a raw buffer and sent it using a network protocol
type Sender interface {
Init(int) error
Send(*Message) error
}

// SenderHelper is used to create Senders instances
type SenderHelper interface {
CreateSender() Sender
}
"github.com/redBorder/rbforwarder/pipeline"
)

type backend struct {
decoder Decoder
processor Processor
encoder Encoder
senderHelper SenderHelper
sender pipeline.Sender

// Pool of workers
decoderPool chan chan *Message
processorPool chan chan *Message
encoderPool chan chan *Message
senderPool chan chan *Message
senderPool chan chan *pipeline.Message

currentProducedID uint64

input chan *Message
messages chan *Message
reports chan *Message
messagePool chan *Message
input chan *pipeline.Message
messages chan *pipeline.Message
reports chan *pipeline.Message
messagePool chan *pipeline.Message

workers int
queue int
Expand All @@ -66,32 +33,24 @@ type backend struct {
}

func (b *backend) Init() {
b.decoderPool = make(chan chan *Message, b.workers)
b.processorPool = make(chan chan *Message, b.workers)
b.encoderPool = make(chan chan *Message, b.workers)
b.senderPool = make(chan chan *Message, b.workers)
b.senderPool = make(chan chan *pipeline.Message, b.workers)

b.messages = make(chan *Message)
b.input = make(chan *Message)
b.reports = make(chan *Message)
b.messagePool = make(chan *Message, b.queue)
b.messages = make(chan *pipeline.Message)
b.input = make(chan *pipeline.Message)
b.reports = make(chan *pipeline.Message)
b.messagePool = make(chan *pipeline.Message, b.queue)

b.keepSending = make(chan struct{})

for i := 0; i < b.queue; i++ {
b.messagePool <- &Message{
b.messagePool <- &pipeline.Message{
Metadata: make(map[string]interface{}),
InputBuffer: new(bytes.Buffer),
OutputBuffer: new(bytes.Buffer),

backend: b,
}
}

for i := 0; i < b.workers; i++ {
b.startDecoder(i)
b.startProcessor(i)
b.startEncoder(i)
b.startSender(i)
}

Expand Down Expand Up @@ -121,7 +80,7 @@ func (b *backend) Init() {

// Send to workers
select {
case messageChannel := <-b.decoderPool:
case messageChannel := <-b.senderPool:
select {
case messageChannel <- m:
b.currentMessages++
Expand All @@ -130,7 +89,9 @@ func (b *backend) Init() {
Logger.Warn("Error on produce: Full queue")
}
case <-time.After(1 * time.Second):
m.Report(-1, "Error on produce: No workers available")
m.Report.StatusCode = -1
m.Report.Status = "Error on produce: No workers available"
b.reports <- m
}
}
}()
Expand All @@ -140,112 +101,18 @@ func (b *backend) Init() {
Logger.Debug("Backend ready")
}

// Worker that decodes the received message
func (b *backend) startDecoder(i int) {
if b.decoder != nil {
b.decoder.Init(i)
}
workerChannel := make(chan *Message)

go func() {
for {
// The worker is ready, put himself on the worker pool
b.decoderPool <- workerChannel

// Wait for a new message
message := <-workerChannel

// Perform work on the message
if b.decoder != nil {
b.decoder.Decode(message)
}

// Get a worker for the next element on the pipe
messageChannel := <-b.processorPool

// Send the message to the next worker
messageChannel <- message
}
}()
}

// Worker that performs modifications on a decoded message
func (b *backend) startProcessor(i int) {
if b.processor != nil {
b.processor.Init(i)
}
workerChannel := make(chan *Message)

// The worker is ready, put himself on the worker pool
go func() {
for {
// The worker is ready, put himself on the worker pool
b.processorPool <- workerChannel

// Wait for a new message
message := <-workerChannel

// Perform work on the message
if b.processor != nil {
b.processor.Process(message)
}

// Get a worker for the next element on the pipe
messageChannel := <-b.encoderPool

// Send the message to the next worker
messageChannel <- message
}
}()
}

// Worker that encodes a modified message
func (b *backend) startEncoder(i int) {
if b.encoder != nil {
b.encoder.Init(i)
}
workerChannel := make(chan *Message)

go func() {
for {
// The worker is ready, put himself on the worker pool
b.encoderPool <- workerChannel

// Wait for a new message
message := <-workerChannel

// Perform work on the message
if b.encoder != nil {
b.encoder.Encode(message)
} else {
message.OutputBuffer = message.InputBuffer
}

// Get a worker for the next element on the pipe
messageChannel := <-b.senderPool

// Send the message to the next worker
messageChannel <- message
}
}()
}

// Worker that sends the message
func (b *backend) startSender(i int) {
if b.senderHelper == nil {
Logger.Fatal("No sender provided")
}

sender := b.senderHelper.CreateSender()
sender.Init(i)
sender := b.sender
sender.Init(i, b.reports)

workerChannel := make(chan *Message)
workerChannel := make(chan *pipeline.Message)

go func() {
for {
b.senderPool <- workerChannel
message := <-workerChannel
sender.Send(message)
sender.OnMessage(message)
}
}()
}
12 changes: 12 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package rbforwarder

// Config stores the configuration for a forwarder
type Config struct {
Retries int
Backoff int
Workers int
QueueSize int
MaxMessages int
MaxBytes int
ShowCounter int
}
47 changes: 0 additions & 47 deletions messages.go

This file was deleted.

7 changes: 7 additions & 0 deletions pipeline/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pipeline

// Decoder is the component that parses a raw buffer to a structure
type Decoder interface {
Init(int) error
Decode(*Message) error
}
7 changes: 7 additions & 0 deletions pipeline/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pipeline

// Encoder serializes a data structure to a output buffer
type Encoder interface {
Init(int) error
Encode(*Message) error
}
22 changes: 22 additions & 0 deletions pipeline/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pipeline

import "bytes"

// Message is used to send data through the pipeline
type Message struct {
InputBuffer *bytes.Buffer // The original data from the source
Data interface{} // Can be used to store the data once it has been parsed
OutputBuffer *bytes.Buffer // The data that will be sent by the sender
Metadata map[string]interface{} // Opaque

Report Report
}

// Report is used by the source to obtain the status of a sent message
type Report struct {
ID uint64 // Unique ID for the report, used to maintain sequence
Status string // Result of the sending
StatusCode int // Result of the sending
Retries int
Metadata map[string]interface{}
}
7 changes: 7 additions & 0 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pipeline

// Processor performs operations on a data structure
type Processor interface {
Init(int) error
Process(message *Message) (bool, error)
}
7 changes: 7 additions & 0 deletions pipeline/sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pipeline

// Sender takes a raw buffer and sent it using a network protocol
type Sender interface {
Init(int, chan *Message) error
OnMessage(*Message) error
}
Loading

0 comments on commit c53d102

Please sign in to comment.