Skip to content

Commit

Permalink
Merge branch 'release/0.4-beta2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Oct 18, 2016
2 parents 24f86e2 + 4020208 commit 133f064
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 64 deletions.
8 changes: 3 additions & 5 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (b *backend) Init() {
b.encoderPool = make(chan chan *Message, b.workers)
b.senderPool = make(chan chan *Message, b.workers)

b.messages = make(chan *Message, b.queue)
b.messages = make(chan *Message)
b.input = make(chan *Message)
b.reports = make(chan *Message, b.queue)
b.reports = make(chan *Message)
b.messagePool = make(chan *Message, b.queue)

b.keepSending = make(chan struct{})
Expand Down Expand Up @@ -130,9 +130,7 @@ func (b *backend) Init() {
logger.Warn("Error on produce: Full queue")
}
case <-time.After(1 * time.Second):
if err := m.Report(-1, "Error on produce: No workers available"); err != nil {
logger.Warn(err)
}
m.Report(-1, "Error on produce: No workers available")
}
}
}()
Expand Down
30 changes: 9 additions & 21 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"sync/atomic"
"time"
)

const (
Expand All @@ -24,36 +23,25 @@ type Message struct {

// Produce is used by the source to send messages to the backend
func (m *Message) Produce() error {

backend := m.backend

// This is no a retry
if m.report.Retries == 0 {
m.report = Report{
ID: atomic.AddUint64(&m.backend.currentProducedID, 1) - 1,
Metadata: m.Metadata,
}
if !backend.active {
return errors.New("Backend closed")
}

// Send the message to the backend
if backend.active {
backend.input <- m
} else {
return errors.New("Backend closed")
m.report = Report{
ID: atomic.AddUint64(&m.backend.currentProducedID, 1) - 1,
Metadata: m.Metadata,
}

backend.input <- m

return nil
}

// Report is used by the sender to inform that a message has not been sent
func (m *Message) Report(statusCode int, status string) error {
func (m *Message) Report(statusCode int, status string) {
m.report.StatusCode = statusCode
m.report.Status = status
select {
case m.backend.reports <- m:
case <-time.After(1 * time.Second):
return errors.New("Error on report: Full queue")
}

return nil
m.backend.reports <- m
}
3 changes: 2 additions & 1 deletion rbforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// Version is the current tag
var Version = "0.4-beta"
var Version = "0.4-beta2"

// Logger for the package
var logger *logrus.Entry
Expand Down Expand Up @@ -156,5 +156,6 @@ func (f *RBForwarder) TakeMessage() (message *Message, err error) {
if !ok {
err = errors.New("Pool closed")
}

return
}
4 changes: 1 addition & 3 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func (tsender *TestSender) Send(m *Message) error {

select {
case tsender.channel <- string(m.OutputBuffer.Bytes()):
if err := m.Report(0, "OK"); err != nil {
log.Fatal(err)
}
m.Report(0, "OK")
}
return nil
}
Expand Down
25 changes: 3 additions & 22 deletions reporthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,33 +78,16 @@ func (r *reportHandler) Init() {
report.Metadata = message.Metadata

// Reset message data
message.InputBuffer.Reset()
message.OutputBuffer.Reset()
message.Data = nil
message.Metadata = make(map[string]interface{})
message.report = Report{}

// Send back the message to the pool
returnReportLoop:
for {
select {
case r.freedMessages <- message:
break returnReportLoop
case <-time.After(1 * time.Second):
logger.Warn("Can't put back the message on the pool")
}
}
r.freedMessages <- message

// Send the report to the client
sendReportLoop:
for {
select {
case r.unordered <- report:
break sendReportLoop
case <-time.After(500 * time.Millisecond):
logger.Warn("Delivering report: Full queue")
}
}
r.unordered <- report
} else {
go func() {
message.report.Retries++
Expand All @@ -116,9 +99,7 @@ func (r *reportHandler) Init() {
Warnf("Retrying message")

<-time.After(time.Duration(r.config.backoff) * time.Second)
if err := message.Produce(); err != nil {
logger.Error(err)
}
message.backend.input <- message
}()
}
}
Expand Down
16 changes: 4 additions & 12 deletions senders/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
if err != nil {
logger.Errorf("Error creating request: %s", err.Error())
for _, message := range batchBuffer.messages {
if err := message.Report(errRequest, err.Error()); err != nil {
logger.Error(err)
}
message.Report(errRequest, err.Error())
}
return
}
Expand All @@ -206,9 +204,7 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
res, err := s.client.Do(req)
if err != nil {
for _, message := range batchBuffer.messages {
if err := message.Report(errHTTP, err.Error()); err != nil {
logger.Error(err)
}
message.Report(errHTTP, err.Error())
}
return
}
Expand All @@ -217,15 +213,11 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
// Send the reports
if res.StatusCode >= 400 {
for _, message := range batchBuffer.messages {
if err := message.Report(errStatus, res.Status); err != nil {
logger.Error(err)
}
message.Report(errStatus, res.Status)
}
} else {
for _, message := range batchBuffer.messages {
if err := message.Report(0, res.Status); err != nil {
logger.Error(err)
}
message.Report(0, res.Status)
}
}

Expand Down

0 comments on commit 133f064

Please sign in to comment.