Skip to content

Commit

Permalink
Merge branch 'release/0.4-beta3'
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Oct 18, 2016
2 parents 133f064 + 54aba33 commit 1aa457e
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 64 deletions.
7 changes: 4 additions & 3 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ func (b *backend) Init() {
select {
case messageChannel <- m:
b.currentMessages++
b.currentBytes += uint64(m.OutputBuffer.Len())
b.currentBytes += uint64(m.InputBuffer.Len())
case <-time.After(1 * time.Second):
logger.Warn("Error on produce: Full queue")
Logger.Warn("Error on produce: Full queue")
}
case <-time.After(1 * time.Second):
m.Report(-1, "Error on produce: No workers available")
Expand All @@ -137,6 +137,7 @@ func (b *backend) Init() {
<-done

b.active = true
Logger.Debug("Backend ready")
}

// Worker that decodes the received message
Expand Down Expand Up @@ -232,7 +233,7 @@ func (b *backend) startEncoder(i int) {
// Worker that sends the message
func (b *backend) startSender(i int) {
if b.senderHelper == nil {
logger.Fatal("No sender provided")
Logger.Fatal("No sender provided")
}

sender := b.senderHelper.CreateSender()
Expand Down
26 changes: 0 additions & 26 deletions logger.go

This file was deleted.

22 changes: 9 additions & 13 deletions rbforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
)

// Version is the current tag
var Version = "0.4-beta2"
var Version = "0.4-beta3"

var log = logrus.New()

// Logger for the package
var logger *logrus.Entry
var Logger = logrus.NewEntry(log)

//------------------------------------------------------------------------------
// RBForwarder
Expand All @@ -27,7 +29,6 @@ type Config struct {
MaxMessages int
MaxBytes int
ShowCounter int
Debug bool
}

// RBForwarder is the main objecto of the package. It has the main methods for
Expand All @@ -44,12 +45,6 @@ type RBForwarder struct {

// NewRBForwarder creates a new Forwarder object
func NewRBForwarder(config Config) *RBForwarder {
if config.Debug {
LogLevel(logrus.DebugLevel)
}

logger = NewLogger("backend")

backend := &backend{
workers: config.Workers,
queue: config.QueueSize,
Expand All @@ -67,10 +62,13 @@ func NewRBForwarder(config Config) *RBForwarder {
fields := logrus.Fields{
"workers": config.Workers,
"retries": config.Retries,
"backoff_time": config.Backoff,
"queue_size": config.QueueSize,
"max_messages": config.MaxMessages,
"max_bytes": config.MaxBytes,
}
logger.WithFields(fields).Info("Initialized rB Forwarder")

Logger.WithFields(fields).Debug("Initialized rB Forwarder")

return forwarder
}
Expand All @@ -80,11 +78,9 @@ func (f *RBForwarder) Start() {

// Start the backend
f.backend.Init()
logger.Info("Backend ready")

// Start the report handler
f.reportHandler.Init()
logger.Info("Reporter ready")

if f.config.ShowCounter > 0 {
go func() {
Expand All @@ -94,7 +90,7 @@ func (f *RBForwarder) Start() {
)
<-timer.C
if f.counter > 0 {
logger.Infof(
Logger.Infof(
"Messages per second %d",
f.counter/uint64(f.config.ShowCounter),
)
Expand Down
19 changes: 9 additions & 10 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rbforwarder

import (
"fmt"
"log"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -56,7 +55,7 @@ func TestBackend(t *testing.T) {
select {
case reportChannel <- report:
default:
log.Fatal("Can't send report to report channel")
Printf("Can't send report to report channel")
}
}
}()
Expand All @@ -68,11 +67,11 @@ func TestBackend(t *testing.T) {

message, err := rbforwarder.TakeMessage()
if err != nil {
log.Fatal(err)
Printf(err.Error())
}
message.InputBuffer.WriteString("Hola mundo")
if err := message.Produce(); err != nil {
log.Fatal(err)
Printf(err.Error())
}

Convey("A \"Hello World\" message should be sent", func() {
Expand Down Expand Up @@ -106,18 +105,18 @@ func TestBackend2(t *testing.T) {
select {
case reportChannel <- report:
default:
log.Fatal("Can't send report to report channel")
Printf("Can't send report to report channel")
}
}
}()

message, err := rbforwarder.TakeMessage()
if err != nil {
log.Fatal(err)
Printf(err.Error())
}
message.InputBuffer.WriteString("Hola mundo")
if err := message.Produce(); err != nil {
log.Fatal(err)
Printf(err.Error())
}

Convey("A report of the sent message should be received", func() {
Expand Down Expand Up @@ -155,19 +154,19 @@ func TestBackend3(t *testing.T) {
select {
case reportChannel <- report:
default:
log.Fatal("Can't send report to report channel")
Printf("Can't send report to report channel")
}
}
}()

for i := 0; i < 100; i++ {
message, err := rbforwarder.TakeMessage()
if err != nil {
log.Fatal(err)
Printf(err.Error())
}
message.InputBuffer.WriteString("Message")
if err := message.Produce(); err != nil {
log.Fatal(err)
Printf(err.Error())
}

go func() {
Expand Down
6 changes: 3 additions & 3 deletions reporthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type reportHandlerConfig struct {
// of the produced message using GetReports() or GetOrderedReports()
type reportHandler struct {
in chan *Message // Used to receive messages
retries chan *Message // Used to send messages if status code is not 0
freedMessages chan *Message // Used to send messages messages after its report has been delivered
unordered chan Report // Used to send reports out of order
out chan Report // Used to send reports in order
Expand All @@ -38,7 +37,6 @@ type reportHandler struct {
func newReportHandler(maxRetries, backoff, queue int) *reportHandler {
return &reportHandler{
in: make(chan *Message, queue),
retries: make(chan *Message, queue),
freedMessages: make(chan *Message, queue),
unordered: make(chan Report, queue),
out: make(chan Report, queue),
Expand Down Expand Up @@ -91,7 +89,7 @@ func (r *reportHandler) Init() {
} else {
go func() {
message.report.Retries++
logger.
Logger.
WithField("ID", message.report.ID).
WithField("Retry", message.report.Retries).
WithField("Status", message.report.Status).
Expand All @@ -106,6 +104,8 @@ func (r *reportHandler) Init() {
}
close(r.unordered)
}()

Logger.Debug("Report Handler ready")
}

func (r *reportHandler) GetReports() chan Report {
Expand Down
13 changes: 9 additions & 4 deletions senders/httpsender/helper.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package httpsender

import (
"log"
"time"

"github.com/Sirupsen/logrus"
"github.com/redBorder/rbforwarder"
)

var log = logrus.New()

// Logger for the package
var Logger = logrus.NewEntry(log)

// Helper is used to create instances of HTTP senders
type Helper struct {
config config
}

// NewHelper creates a new sender helper
func NewHelper(rawConfig map[string]interface{}) Helper {
logger = rbforwarder.NewLogger("sender")
parsedConfig, _ := parseConfig(rawConfig)

return Helper{
Expand All @@ -26,6 +30,7 @@ func NewHelper(rawConfig map[string]interface{}) Helper {
func (s Helper) CreateSender() rbforwarder.Sender {
httpSender := new(Sender)
httpSender.config = s.config
httpSender.logger = Logger.WithField("prefix", "sender")

return httpSender
}
Expand All @@ -35,7 +40,7 @@ func parseConfig(raw map[string]interface{}) (parsed config, err error) {
if raw["url"] != nil {
parsed.URL = raw["url"].(string)
} else {
log.Fatal("No url provided")
Logger.Fatal("No url provided")
}

if raw["endpoint"] != nil {
Expand All @@ -45,7 +50,7 @@ func parseConfig(raw map[string]interface{}) (parsed config, err error) {
if raw["insecure"] != nil {
parsed.IgnoreCert = raw["insecure"].(bool)
if parsed.IgnoreCert {
logger.Warn("Ignoring SSL certificates")
Logger.Warn("Ignoring SSL certificates")
}
}

Expand Down
9 changes: 4 additions & 5 deletions senders/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ const (
errHTTP = 103
)

var logger *logrus.Entry

// Sender receives data from pipe and send it via HTTP to an endpoint
type Sender struct {
id int
Expand All @@ -33,6 +31,7 @@ type Sender struct {
timer *time.Timer

// Configuration
logger *logrus.Entry
config config
}

Expand Down Expand Up @@ -75,7 +74,7 @@ func (s *Sender) Init(id int) error {
for {
timer := time.NewTimer(time.Duration(s.config.ShowCounter) * time.Second)
<-timer.C
logger.WithField("worker", s.id).Infof("Messages per second %d", s.counter/int64(s.config.ShowCounter))
s.logger.WithField("worker", s.id).Infof("Messages per second %d", s.counter/int64(s.config.ShowCounter))
s.counter = 0
}
}()
Expand Down Expand Up @@ -137,7 +136,7 @@ func (s *Sender) Send(message *rbforwarder.Message) error {
// Write the new message to the buffer and increase the number of messages in
// the buffer
if _, err := batchBuffer.writer.Write(message.OutputBuffer.Bytes()); err != nil {
logger.Error(err)
s.logger.Error(err)
}
batchBuffer.messages = append(batchBuffer.messages, message)
batchBuffer.messageCount++
Expand Down Expand Up @@ -188,7 +187,7 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
// Create the HTTP POST request
req, err := http.NewRequest("POST", s.config.URL+"/"+path, batchBuffer.buff)
if err != nil {
logger.Errorf("Error creating request: %s", err.Error())
s.logger.Errorf("Error creating request: %s", err.Error())
for _, message := range batchBuffer.messages {
message.Report(errRequest, err.Error())
}
Expand Down

0 comments on commit 1aa457e

Please sign in to comment.