Skip to content

Commit

Permalink
Remove redundant buffering layer in HTTP log handling
Browse files Browse the repository at this point in the history
now that log parts from all processors are buffered together in the log
part sink.
  • Loading branch information
Dan Buch committed Jul 21, 2017
1 parent bd3472c commit a676e62
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 79 deletions.
2 changes: 1 addition & 1 deletion http_log_part_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type httpLogPartEncodedPayload struct {
Encoding string `json:"encoding"`
Final bool `json:"final"`
JobID uint64 `json:"job_id"`
Number int `json:"number"`
Number uint64 `json:"number"`
Token string `json:"tok"`
Type string `json:"@type"`
}
Expand Down
113 changes: 36 additions & 77 deletions http_log_writer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package worker

import (
"bytes"
"fmt"
"sync"
"time"

gocontext "context"
Expand All @@ -16,7 +14,7 @@ type httpLogPart struct {
Content string
Final bool
JobID uint64
Number int
Number uint64
Token string
}

Expand All @@ -27,9 +25,7 @@ type httpLogWriter struct {

closeChan chan struct{}

bufferMutex sync.Mutex
buffer *bytes.Buffer
logPartNumber int
logPartNumber uint64

bytesWritten int
maxLength int
Expand All @@ -46,14 +42,11 @@ func newHTTPLogWriter(ctx gocontext.Context, url string, authToken string, jobID
jobID: jobID,
authToken: authToken,
closeChan: make(chan struct{}),
buffer: new(bytes.Buffer),
timer: time.NewTimer(time.Hour),
timeout: timeout,
lps: getHTTPLogPartSinkByURL(url),
}

go writer.flushRegularly(ctx)

return writer, nil
}

Expand All @@ -67,7 +60,7 @@ func (w *httpLogWriter) Write(p []byte) (int, error) {
logger.WithFields(logrus.Fields{
"length": len(p),
"bytes": string(p),
}).Debug("writing bytes")
}).Debug("begin writing bytes")

w.timer.Reset(w.timeout)

Expand All @@ -80,9 +73,22 @@ func (w *httpLogWriter) Write(p []byte) (int, error) {
return 0, ErrWrotePastMaxLogLength
}

w.bufferMutex.Lock()
defer w.bufferMutex.Unlock()
return w.buffer.Write(p)
err := w.lps.Add(w.ctx, &httpLogPart{
Content: string(p),
JobID: w.jobID,
Number: w.logPartNumber,
Token: w.authToken,
})
if err != nil {
context.LoggerFromContext(w.ctx).WithFields(logrus.Fields{
"err": err,
"self": "http_log_writer",
}).Error("could not add log part to sink")
return 0, err
}

w.logPartNumber++
return len(p), err
}

func (w *httpLogWriter) Close() error {
Expand All @@ -93,7 +99,6 @@ func (w *httpLogWriter) Close() error {
w.timer.Stop()

close(w.closeChan)
w.flush()

err := w.lps.Add(w.ctx, &httpLogPart{
Final: true,
Expand Down Expand Up @@ -131,33 +136,38 @@ func (w *httpLogWriter) WriteAndClose(p []byte) (int, error) {

close(w.closeChan)

w.bufferMutex.Lock()
n, err := w.buffer.Write(p)
w.bufferMutex.Unlock()
err := w.lps.Add(w.ctx, &httpLogPart{
Content: string(p),
JobID: w.jobID,
Number: w.logPartNumber,
Token: w.authToken,
})

if err != nil {
return n, err
context.LoggerFromContext(w.ctx).WithFields(logrus.Fields{
"err": err,
"self": "http_log_writer",
}).Error("could not add log part to sink")
return 0, err
}
w.logPartNumber++

w.flush()

part := &httpLogPart{
err = w.lps.Add(w.ctx, &httpLogPart{
Final: true,
JobID: w.jobID,
Number: w.logPartNumber,
Token: w.authToken,
}

err = w.lps.Add(w.ctx, part)
})

if err != nil {
context.LoggerFromContext(w.ctx).WithFields(logrus.Fields{
"err": err,
"self": "http_log_writer",
}).Error("could not add log part to sink")
return n, err
return 0, err
}
w.logPartNumber++
return n, nil
return len(p), nil
}

func (w *httpLogWriter) closed() bool {
Expand All @@ -168,54 +178,3 @@ func (w *httpLogWriter) closed() bool {
return false
}
}

func (w *httpLogWriter) flushRegularly(ctx gocontext.Context) {
ticker := time.NewTicker(LogWriterTick)
defer ticker.Stop()
for {
select {
case <-w.closeChan:
return
case <-ticker.C:
w.flush()
case <-ctx.Done():
return
}
}
}

func (w *httpLogWriter) flush() {
if w.buffer.Len() <= 0 {
return
}

buf := make([]byte, LogChunkSize)

for w.buffer.Len() > 0 {
w.bufferMutex.Lock()
n, err := w.buffer.Read(buf)
w.bufferMutex.Unlock()
if err != nil {
// According to documentation, err should only be non-nil if
// there's no data in the buffer. We've checked for this, so
// this means that err should never be nil. Something is very
// wrong if this happens, so let's abort!
panic("non-empty buffer shouldn't return an error on Read")
}

err = w.lps.Add(w.ctx, &httpLogPart{
Content: string(buf[0:n]),
JobID: w.jobID,
Number: w.logPartNumber,
Token: w.authToken,
})
if err != nil {
context.LoggerFromContext(w.ctx).WithFields(logrus.Fields{
"err": err,
"self": "http_log_writer",
}).Error("could not add log part to sink")
return
}
w.logPartNumber++
}
}
1 change: 0 additions & 1 deletion http_log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestHTTPLogWriter_Write(t *testing.T) {
assert.NotNil(t, hlw)
n, err := hlw.Write([]byte("it's a hot one out there"))
assert.Nil(t, err)
assert.True(t, hlw.buffer.Len() > 0)
assert.True(t, n > 0)
}

Expand Down

0 comments on commit a676e62

Please sign in to comment.