Skip to content

Commit

Permalink
Modify report function.
Browse files Browse the repository at this point in the history
The new implementation does not leak reports
  • Loading branch information
Bigomby committed Apr 19, 2016
1 parent a1a7be6 commit 2c9c976
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 25 deletions.
4 changes: 1 addition & 3 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ func (b *backend) Init() {
logger.Warn("Error on produce: Full queue")
}
case <-time.After(1 * time.Second):
if err := m.Report(-1, "Error on produce: No workers available"); err != nil {
logger.Warn(err)
}
m.Report(-1, "Error on produce: No workers available")
}
}
}()
Expand Down
17 changes: 10 additions & 7 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ func (m *Message) Produce() error {
}

// Report is used by the sender to inform that a message has not been sent
func (m *Message) Report(statusCode int, status string) error {
func (m *Message) Report(statusCode int, status string) {
m.report.StatusCode = statusCode
m.report.Status = status
select {
case m.backend.reports <- m:
case <-time.After(1 * time.Second):
return errors.New("Error on report: Full queue")
}

return nil
forLoop:
for {
select {
case m.backend.reports <- m:
break forLoop
case <-time.After(500 * time.Second):
logger.Warn("Retrying report report: Full queue")
}
}
}
4 changes: 1 addition & 3 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func (tsender *TestSender) Send(m *Message) error {

select {
case tsender.channel <- string(m.OutputBuffer.Bytes()):
if err := m.Report(0, "OK"); err != nil {
log.Fatal(err)
}
m.Report(0, "OK")
}
return nil
}
Expand Down
16 changes: 4 additions & 12 deletions senders/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
if err != nil {
logger.Errorf("Error creating request: %s", err.Error())
for _, message := range batchBuffer.messages {
if err := message.Report(errRequest, err.Error()); err != nil {
logger.Error(err)
}
message.Report(errRequest, err.Error())
}
return
}
Expand All @@ -206,9 +204,7 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
res, err := s.client.Do(req)
if err != nil {
for _, message := range batchBuffer.messages {
if err := message.Report(errHTTP, err.Error()); err != nil {
logger.Error(err)
}
message.Report(errHTTP, err.Error())
}
return
}
Expand All @@ -217,15 +213,11 @@ func (s *Sender) batchSend(batchBuffer *batchBuffer, path string) {
// Send the reports
if res.StatusCode >= 400 {
for _, message := range batchBuffer.messages {
if err := message.Report(errStatus, res.Status); err != nil {
logger.Error(err)
}
message.Report(errStatus, res.Status)
}
} else {
for _, message := range batchBuffer.messages {
if err := message.Report(0, res.Status); err != nil {
logger.Error(err)
}
message.Report(0, res.Status)
}
}

Expand Down

0 comments on commit 2c9c976

Please sign in to comment.