From 5f1037b09bf73715c9abb4e2e63b56a5c1d185a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Fern=C3=A1ndez=20Barrera?= Date: Wed, 20 Apr 2016 15:05:07 +0200 Subject: [PATCH 1/4] Fix bug where the max bytes limiter doesn't work --- backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend.go b/backend.go index 1684f6e..9306583 100644 --- a/backend.go +++ b/backend.go @@ -125,7 +125,7 @@ func (b *backend) Init() { select { case messageChannel <- m: b.currentMessages++ - b.currentBytes += uint64(m.OutputBuffer.Len()) + b.currentBytes += uint64(m.InputBuffer.Len()) case <-time.After(1 * time.Second): logger.Warn("Error on produce: Full queue") } From ce10e2983fe50914871411511febcf4ee3790595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Fern=C3=A1ndez=20Barrera?= Date: Wed, 20 Apr 2016 15:07:27 +0200 Subject: [PATCH 2/4] Modified logging system --- backend.go | 5 +++-- logger.go | 26 -------------------------- rbforwarder.go | 20 ++++++++------------ rbforwarder_test.go | 19 +++++++++---------- reporthandler.go | 4 +++- senders/httpsender/helper.go | 13 +++++++++---- senders/httpsender/sender.go | 9 ++++----- 7 files changed, 36 insertions(+), 60 deletions(-) delete mode 100644 logger.go diff --git a/backend.go b/backend.go index 9306583..57d3903 100644 --- a/backend.go +++ b/backend.go @@ -127,7 +127,7 @@ func (b *backend) Init() { b.currentMessages++ b.currentBytes += uint64(m.InputBuffer.Len()) case <-time.After(1 * time.Second): - logger.Warn("Error on produce: Full queue") + Logger.Warn("Error on produce: Full queue") } case <-time.After(1 * time.Second): m.Report(-1, "Error on produce: No workers available") @@ -137,6 +137,7 @@ func (b *backend) Init() { <-done b.active = true + Logger.Debug("Backend ready") } // Worker that decodes the received message @@ -232,7 +233,7 @@ func (b *backend) startEncoder(i int) { // Worker that sends the message func (b *backend) startSender(i int) { if b.senderHelper == nil { - logger.Fatal("No sender provided") + Logger.Fatal("No sender provided") } sender := b.senderHelper.CreateSender() diff --git a/logger.go b/logger.go deleted file mode 100644 index 5d68e79..0000000 --- a/logger.go +++ /dev/null @@ -1,26 +0,0 @@ -package rbforwarder - -import ( - "github.com/Sirupsen/logrus" - prefixed "github.com/x-cray/logrus-prefixed-formatter" -) - -var level = logrus.InfoLevel - -// LogLevel sets logging level -func LogLevel(newLevel logrus.Level) { - level = newLevel -} - -// NewLogger creates a new logger object -func NewLogger(prefix string) *logrus.Entry { - log := logrus.New() - log.Formatter = new(prefixed.TextFormatter) - log.Level = level - - logger := log.WithFields(logrus.Fields{ - "prefix": prefix, - }) - - return logger -} diff --git a/rbforwarder.go b/rbforwarder.go index 9e73459..014ea13 100644 --- a/rbforwarder.go +++ b/rbforwarder.go @@ -11,8 +11,10 @@ import ( // Version is the current tag var Version = "0.4-beta2" +var log = logrus.New() + // Logger for the package -var logger *logrus.Entry +var Logger = logrus.NewEntry(log) //------------------------------------------------------------------------------ // RBForwarder @@ -27,7 +29,6 @@ type Config struct { MaxMessages int MaxBytes int ShowCounter int - Debug bool } // RBForwarder is the main objecto of the package. It has the main methods for @@ -44,12 +45,6 @@ type RBForwarder struct { // NewRBForwarder creates a new Forwarder object func NewRBForwarder(config Config) *RBForwarder { - if config.Debug { - LogLevel(logrus.DebugLevel) - } - - logger = NewLogger("backend") - backend := &backend{ workers: config.Workers, queue: config.QueueSize, @@ -67,10 +62,13 @@ func NewRBForwarder(config Config) *RBForwarder { fields := logrus.Fields{ "workers": config.Workers, "retries": config.Retries, + "backoff_time": config.Backoff, "queue_size": config.QueueSize, "max_messages": config.MaxMessages, + "max_bytes": config.MaxBytes, } - logger.WithFields(fields).Info("Initialized rB Forwarder") + + Logger.WithFields(fields).Debug("Initialized rB Forwarder") return forwarder } @@ -80,11 +78,9 @@ func (f *RBForwarder) Start() { // Start the backend f.backend.Init() - logger.Info("Backend ready") // Start the report handler f.reportHandler.Init() - logger.Info("Reporter ready") if f.config.ShowCounter > 0 { go func() { @@ -94,7 +90,7 @@ func (f *RBForwarder) Start() { ) <-timer.C if f.counter > 0 { - logger.Infof( + Logger.Infof( "Messages per second %d", f.counter/uint64(f.config.ShowCounter), ) diff --git a/rbforwarder_test.go b/rbforwarder_test.go index 46f2bcd..623c060 100644 --- a/rbforwarder_test.go +++ b/rbforwarder_test.go @@ -2,7 +2,6 @@ package rbforwarder import ( "fmt" - "log" "math/rand" "testing" "time" @@ -56,7 +55,7 @@ func TestBackend(t *testing.T) { select { case reportChannel <- report: default: - log.Fatal("Can't send report to report channel") + Printf("Can't send report to report channel") } } }() @@ -68,11 +67,11 @@ func TestBackend(t *testing.T) { message, err := rbforwarder.TakeMessage() if err != nil { - log.Fatal(err) + Printf(err.Error()) } message.InputBuffer.WriteString("Hola mundo") if err := message.Produce(); err != nil { - log.Fatal(err) + Printf(err.Error()) } Convey("A \"Hello World\" message should be sent", func() { @@ -106,18 +105,18 @@ func TestBackend2(t *testing.T) { select { case reportChannel <- report: default: - log.Fatal("Can't send report to report channel") + Printf("Can't send report to report channel") } } }() message, err := rbforwarder.TakeMessage() if err != nil { - log.Fatal(err) + Printf(err.Error()) } message.InputBuffer.WriteString("Hola mundo") if err := message.Produce(); err != nil { - log.Fatal(err) + Printf(err.Error()) } Convey("A report of the sent message should be received", func() { @@ -155,7 +154,7 @@ func TestBackend3(t *testing.T) { select { case reportChannel <- report: default: - log.Fatal("Can't send report to report channel") + Printf("Can't send report to report channel") } } }() @@ -163,11 +162,11 @@ func TestBackend3(t *testing.T) { for i := 0; i < 100; i++ { message, err := rbforwarder.TakeMessage() if err != nil { - log.Fatal(err) + Printf(err.Error()) } message.InputBuffer.WriteString("Message") if err := message.Produce(); err != nil { - log.Fatal(err) + Printf(err.Error()) } go func() { diff --git a/reporthandler.go b/reporthandler.go index 45b5c6e..9bbcc2d 100644 --- a/reporthandler.go +++ b/reporthandler.go @@ -91,7 +91,7 @@ func (r *reportHandler) Init() { } else { go func() { message.report.Retries++ - logger. + Logger. WithField("ID", message.report.ID). WithField("Retry", message.report.Retries). WithField("Status", message.report.Status). @@ -106,6 +106,8 @@ func (r *reportHandler) Init() { } close(r.unordered) }() + + Logger.Debug("Report Handler ready") } func (r *reportHandler) GetReports() chan Report { diff --git a/senders/httpsender/helper.go b/senders/httpsender/helper.go index bebebc9..9d00195 100644 --- a/senders/httpsender/helper.go +++ b/senders/httpsender/helper.go @@ -1,12 +1,17 @@ package httpsender import ( - "log" "time" + "github.com/Sirupsen/logrus" "github.com/redBorder/rbforwarder" ) +var log = logrus.New() + +// Logger for the package +var Logger = logrus.NewEntry(log) + // Helper is used to create instances of HTTP senders type Helper struct { config config @@ -14,7 +19,6 @@ type Helper struct { // NewHelper creates a new sender helper func NewHelper(rawConfig map[string]interface{}) Helper { - logger = rbforwarder.NewLogger("sender") parsedConfig, _ := parseConfig(rawConfig) return Helper{ @@ -26,6 +30,7 @@ func NewHelper(rawConfig map[string]interface{}) Helper { func (s Helper) CreateSender() rbforwarder.Sender { httpSender := new(Sender) httpSender.config = s.config + httpSender.logger = Logger.WithField("prefix", "sender") return httpSender } @@ -35,7 +40,7 @@ func parseConfig(raw map[string]interface{}) (parsed config, err error) { if raw["url"] != nil { parsed.URL = raw["url"].(string) } else { - log.Fatal("No url provided") + Logger.Fatal("No url provided") } if raw["endpoint"] != nil { @@ -45,7 +50,7 @@ func parseConfig(raw map[string]interface{}) (parsed config, err error) { if raw["insecure"] != nil { parsed.IgnoreCert = raw["insecure"].(bool) if parsed.IgnoreCert { - logger.Warn("Ignoring SSL certificates") + Logger.Warn("Ignoring SSL certificates") } } diff --git a/senders/httpsender/sender.go b/senders/httpsender/sender.go index c7fd9ed..3159626 100644 --- a/senders/httpsender/sender.go +++ b/senders/httpsender/sender.go @@ -20,8 +20,6 @@ const ( errHTTP = 103 ) -var logger *logrus.Entry - // Sender receives data from pipe and send it via HTTP to an endpoint type Sender struct { id int @@ -33,6 +31,7 @@ type Sender struct { timer *time.Timer // Configuration + logger *logrus.Entry config config } @@ -75,7 +74,7 @@ func (s *Sender) Init(id int) error { for { timer := time.NewTimer(time.Duration(s.config.ShowCounter) * time.Second) <-timer.C - logger.WithField("worker", s.id).Infof("Messages per second %d", s.counter/int64(s.config.ShowCounter)) + s.logger.WithField("worker", s.id).Infof("Messages per second %d", s.counter/int64(s.config.ShowCounter)) s.counter = 0 } }() @@ -137,7 +136,7 @@ func (s *Sender) Send(message *rbforwarder.Message) error { // Write the new message to the buffer and increase the number of messages in // the buffer if _, err := batchBuffer.writer.Write(message.OutputBuffer.Bytes()); err != nil { - logger.Error(err) + s.logger.Error(err) } batchBuffer.messages = append(batchBuffer.messages, message) batchBuffer.messageCount++ @@ -188,7 +187,7 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) { // Create the HTTP POST request req, err := http.NewRequest("POST", s.config.URL+"/"+path, batchBuffer.buff) if err != nil { - logger.Errorf("Error creating request: %s", err.Error()) + s.logger.Errorf("Error creating request: %s", err.Error()) for _, message := range batchBuffer.messages { message.Report(errRequest, err.Error()) } From 07d0ca95c810f3bfc51244a23229bbf9c2df084f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Fern=C3=A1ndez=20Barrera?= Date: Wed, 20 Apr 2016 15:07:59 +0200 Subject: [PATCH 3/4] Remove unused channel --- reporthandler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/reporthandler.go b/reporthandler.go index 9bbcc2d..f63a4c2 100644 --- a/reporthandler.go +++ b/reporthandler.go @@ -23,7 +23,6 @@ type reportHandlerConfig struct { // of the produced message using GetReports() or GetOrderedReports() type reportHandler struct { in chan *Message // Used to receive messages - retries chan *Message // Used to send messages if status code is not 0 freedMessages chan *Message // Used to send messages messages after its report has been delivered unordered chan Report // Used to send reports out of order out chan Report // Used to send reports in order @@ -38,7 +37,6 @@ type reportHandler struct { func newReportHandler(maxRetries, backoff, queue int) *reportHandler { return &reportHandler{ in: make(chan *Message, queue), - retries: make(chan *Message, queue), freedMessages: make(chan *Message, queue), unordered: make(chan Report, queue), out: make(chan Report, queue), From 54aba338a5d32784980451a2b8e01bdaeff2cc39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Fern=C3=A1ndez=20Barrera?= Date: Wed, 20 Apr 2016 15:12:34 +0200 Subject: [PATCH 4/4] Bumped version --- rbforwarder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rbforwarder.go b/rbforwarder.go index 014ea13..99346d2 100644 --- a/rbforwarder.go +++ b/rbforwarder.go @@ -9,7 +9,7 @@ import ( ) // Version is the current tag -var Version = "0.4-beta2" +var Version = "0.4-beta3" var log = logrus.New()