Skip to content

Commit

Permalink
forward: fix conflicts and accommodate ringbuffer changes from oklog#50
Browse files Browse the repository at this point in the history
  • Loading branch information
laher committed Apr 28, 2017
2 parents d6bd1f0 + f98e91b commit 8670805
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 315 deletions.
35 changes: 10 additions & 25 deletions pkg/forward/buffered_scanner.go
Expand Up @@ -14,22 +14,21 @@ type BoundedBuffer interface {
Get() string // Get should block until data is available
}

// BufferedScanner composes a boundedBuffer to make it behave akin to a Scanner
// BufferedScanner's Scan()/Text() is not synchronised (but the composed buffer is)
type BufferedScanner struct {
// bufferedScanner composes a boundedBuffer to make it behave akin to a Scanner
// bufferedScanner's Scan()/Text() is not synchronised (but the composed buffer is)
type bufferedScanner struct {
Buf BoundedBuffer
val string //temporary place to store a val after a Scan(). Not synchronised (Use case does not require it)
err error
mutex sync.RWMutex //synchronises access to err
}

func NewBufferedScanner(b BoundedBuffer) *BufferedScanner {
return &BufferedScanner{
func NewBufferedScanner(b BoundedBuffer) *bufferedScanner {
return &bufferedScanner{
Buf: b,
}
}

func (b *BufferedScanner) Consume(r io.Reader) {
func (b *bufferedScanner) Consume(r io.Reader) {
bs := bufio.NewScanner(r)
ok := bs.Scan()
for ok {
Expand All @@ -39,28 +38,14 @@ func (b *BufferedScanner) Consume(r io.Reader) {
}
if !ok {
b.mutex.Lock()
if bs.Err() != nil {
b.err = errors.Wrapf(bs.Err(), "Error reading from input")
} else {
b.err = errors.Errorf("Error reading from input")
}
b.err = errors.Errorf("Error reading from input")
b.mutex.Unlock()
}
}

func (b *BufferedScanner) Scan() bool {
b.val = b.Buf.Get()
func (b *bufferedScanner) Next() (bool, string, error) {
val := b.Buf.Get()
b.mutex.RLock()
defer b.mutex.RUnlock()
return b.err == nil
}

func (b *BufferedScanner) Text() string {
return b.val
}

func (b *BufferedScanner) Err() error {
b.mutex.RLock()
defer b.mutex.RUnlock()
return b.err
return b.err != nil, val, b.err
}
17 changes: 8 additions & 9 deletions pkg/forward/buffered_scanner_test.go
@@ -1,14 +1,13 @@
package forward

import (
"fmt"
"io"
"testing"
)

func TestBufferedScanner(t *testing.T) {
sb := NewRingBufferBCH(4)
bs := BufferedScanner{
sb := NewRingBuffer(4)
bs := bufferedScanner{
Buf: sb,
}
exp := []string{"hi", "ho", "yi", "yo"}
Expand All @@ -22,13 +21,13 @@ func TestBufferedScanner(t *testing.T) {
if i >= sb.maxSize-1 {
break
}
ok := bs.Scan()
if !ok {
t.Errorf("Buffer scan should return a value")
_, text, err := bs.Next()
if err != nil {
t.Errorf("Buffer should not return error. %v", err)
}
fmt.Println("Received", bs.Text())
if bs.Text() != e {
t.Errorf("Buffer should return a specific value [%s] but received [%s]", e, bs.Text())
t.Log("Received", text)
if text != e {
t.Errorf("Buffer should return a specific value [%s] but received [%s]", e, text)
}
}
}
31 changes: 19 additions & 12 deletions pkg/forward/forwarder.go
Expand Up @@ -34,7 +34,7 @@ type Forwarder struct {
// Messages will never be dropped.
func NewBlockingForwarder(urls []*url.URL, prefix string) *Forwarder {
textScannerFunc := func(r io.Reader) textScanner {
s := bufio.NewScanner(r)
s := &bufioScanner{bufio.NewScanner(r)}
return s
}
return &Forwarder{
Expand All @@ -52,7 +52,7 @@ func NewBlockingForwarder(urls []*url.URL, prefix string) *Forwarder {
// bufferSize refers to the maximum number of log messages (rather than e.g. bytes) in the buffer
func NewBufferedForwarder(urls []*url.URL, prefix string, bufferSize int) *Forwarder {
textScannerFunc := func(r io.Reader) textScanner {
rb := NewBufferedScanner(NewRingBufferBCH(bufferSize))
rb := NewBufferedScanner(NewRingBuffer(bufferSize))
go rb.Consume(r)
return rb
}
Expand Down Expand Up @@ -144,10 +144,9 @@ func (f *Forwarder) Forward(r io.Reader) error {
continue
}

ok := s.Scan()
ok, record, err := s.Next()
for ok {
// We enter the loop wanting to write s.Text() to the conn.
record := s.Text()
// We enter the loop wanting to write record to the conn.
if n, err := fmt.Fprintf(conn, "%s%s\n", f.Prefix, record); err != nil {
f.Disconnects.Inc()
level.Warn(logger)
Expand All @@ -163,16 +162,26 @@ func (f *Forwarder) Forward(r io.Reader) error {
backoff = 0 // reset the backoff on a successful write
f.ForwardBytes.Add(float64(len(record)) + 1)
f.ForwardRecords.Inc()
ok = s.Scan()
ok, record, err = s.Next()
}
if !ok {
level.Info(logger).Log("stdin", "exhausted", "due_to", s.Err())
level.Info(logger).Log("stdin", "exhausted", "due_to", err)
return nil
}
}

}

// bufioScanner implements a simpler API for our use-case
type bufioScanner struct {
*bufio.Scanner
}

func (bs *bufioScanner) Next() (bool, string, error) {
t := bs.Scanner.Scan()
return t, bs.Scanner.Text(), bs.Scanner.Err()
}

func exponential(d time.Duration) time.Duration {
const (
min = 16 * time.Millisecond
Expand All @@ -188,12 +197,10 @@ func exponential(d time.Duration) time.Duration {
return d
}

// textScanner models bufio.Scanner, so we can provide
// an alternate ringbuffered implementation.
// textScanner encapsulates bufio.Scanner in a single method, so we can provide
// a straightforward alternate implementation.
type textScanner interface {
Scan() bool
Text() string
Err() error
Next() (bool, string, error)
}

// Count or add several values to a given counter. This could be a prometheus Counter
Expand Down
92 changes: 18 additions & 74 deletions pkg/forward/ringbuffer.go
@@ -1,93 +1,37 @@
package forward

import (
"sync"
)

// RingBuffer is a fixed-length ring buffer. It can be used by a forwarder, to 'drop' messages instead of applying backpressure. See Issue #15
type RingBuffer struct {
type ringBuffer struct {
maxSize int

buf []string
first int // the index of the first item in the buffer
len int // the current length of the buffer
ch chan struct{} // the channel is used to block on the Get method whenever data is unavailable
mutex sync.RWMutex // synchronizes changes to buf, first, len
ch chan string // a buffered channel is used to buffer records
}

// Put() processes the record without blocking.
// It's behaviour varies depending on the state of the buffer and any blocking Get() invocations
// It either sends the record over the channel, adds it to the buffer, or drops the record if the buffer is full.
func (b *RingBuffer) Put(record string) {
b.mutex.Lock()
b.buf[b.last()] = record
if b.len >= b.maxSize {
b.inc()
} else {
b.len++
}
b.mutex.Unlock()
//notify Get method that data is available, if necessary.
select {
case b.ch <- struct{}{}:
default:
}

}

func (b *RingBuffer) inc() {
if b.first >= b.maxSize-1 {
b.first = 0
} else {
b.first++
func (b *ringBuffer) Put(record string) {
for {
select {
case b.ch <- record:
return
default:
// when buffer full, drop oldest record
<-b.ch
}
}
}

func (b *RingBuffer) last() int {
r := b.len + b.first
if r >= b.maxSize {
r -= b.maxSize
}
return r
}

// Get() blocks until data is available
func (b *RingBuffer) Get() string {
var record string
b.mutex.RLock()
if b.len < 1 {
b.mutex.RUnlock()
//just block until available
<-b.ch
} else {
b.mutex.RUnlock()
}
b.mutex.Lock()
defer b.mutex.Unlock()
record = b.buf[b.first]
b.inc()
b.len--
return record
}

// Len() is just a synchronized version of len()
func (b *RingBuffer) Len() int {
b.mutex.RLock()
defer b.mutex.RUnlock()
return b.len
func (b *ringBuffer) Get() string {
return <-b.ch
}

func NewRingBuffer(bufSize int) *RingBuffer {
if bufSize < 1 {
panic("buffer size should be greater than zero")
func NewRingBuffer(bufSize int) *ringBuffer {
if bufSize < 0 {
panic("buffer size should not be less than zero")
}
b := &RingBuffer{
b := &ringBuffer{
maxSize: bufSize,
buf: make([]string, bufSize),
mutex: sync.RWMutex{},
ch: make(chan struct{}),
first: 0,
len: 0,
ch: make(chan string, bufSize),
}
return b
}
37 changes: 0 additions & 37 deletions pkg/forward/ringbuffer_bch.go

This file was deleted.

0 comments on commit 8670805

Please sign in to comment.