This repository has been archived by the owner on Sep 8, 2018. It is now read-only.

Buffered forwarder for #15 #50

Closed
wants to merge 26 commits

Conversation


@laher laher commented Mar 27, 2017

@peterbourgon please see this experimental implementation for #15, although it's a bit racy for the time being. Please don't consider this complete yet.

  • It basically works: when I kill my ingeststore, the forwarder buffers some messages, and after I restart the ingeststore it reconnects and forwards the buffer 👍 ... but sometimes a couple of messages get sent twice, so it still needs work.
  • I've been testing it by piping date once per second into oklog forward -buf -bufsize=5 localhost, repeatedly querying oklog query -from 30s in another window, and killing/restarting my ingeststore for 5-15 seconds. It's pretty easy to see what's going on.
  • For the buffer I tried using container/ring from the standard library, adding a mutex and a couple of other fields to maintain state. It seems OK, but it's not very simple. Any advice on how you envisioned this? (A rough sketch of the shape follows this list.)
  • I chose messages as the unit for buffering, rather than bytes. I figured it fits the forwarding code reasonably well.
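For illustration, the container/ring-plus-mutex shape might look roughly like this. It's a simplified sketch with made-up names (only newRingBuffer appears in the actual diff), not the real implementation:

import (
    "container/ring"
    "sync"
)

// ringBuffer: container/ring guarded by a mutex, plus a counter tracking how
// many slots currently hold unread messages.
type ringBuffer struct {
    mtx       sync.Mutex
    w         *ring.Ring // next slot to write
    remaining int        // unread messages currently buffered
    size      int
}

func newRingBuffer(size int) *ringBuffer {
    return &ringBuffer{w: ring.New(size), size: size}
}

// Put stores a message, overwriting the oldest one once the buffer is full.
func (b *ringBuffer) Put(msg string) {
    b.mtx.Lock()
    defer b.mtx.Unlock()
    b.w.Value = msg
    b.w = b.w.Next()
    if b.remaining < b.size {
        b.remaining++
    }
}

// Get returns the oldest unread message, if any, without blocking.
func (b *ringBuffer) Get() (string, bool) {
    b.mtx.Lock()
    defer b.mtx.Unlock()
    if b.remaining == 0 {
        return "", false
    }
    oldest := b.w.Move(-b.remaining) // oldest unread slot
    b.remaining--
    return oldest.Value.(string), true
}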

It's late here, so I just thought I'd elicit some feedback for now, as you may have had something very different in mind.
Cheers

@laher
Author

laher commented Mar 29, 2017

OK did a bit more this morning, and I'm happier with the approach now - some work still needed, but it's working nicely and the approach feels more sane. The ring buffer now implements a textScanner interface, to act as a drop-in replacement for the bufio.Scanner.

I'll report back once I've got it 100%.

Member

@peterbourgon peterbourgon left a comment

Some relatively straightforward changes to the calling scope, but I think the ringbuf is not usable as-is.

apiAddr = flagset.String("api", "", "listen address for forward API (and metrics)")
prefixes = stringslice{}
ringBuf = flagset.Bool("buf", false, "use a ring buffer to drop old messages when e.g. the ingestion engine fails")
ringBufSize = flagset.Int("bufsize", 1024, "ring buffer size (messages)")
Member

I would want the flag to describe the forwarder backpressure strategy (the user intent) rather than a lower-level detail like enabling a ringbuffer (the implementation detail). How about...

backpressure = flagset.String("backpressure", "block", "block, buffer")
bufferSize   = flagset.Int("buffer-size", 1024, "in -backpressure=buffer mode, ringbuffer size in records")

Member

And it's important to validate the string when you use it down below, e.g.

var s textScanner
switch strings.ToLower(*backpressure) {
case "block":
    s = bufio.NewScanner(os.Stdin)
case "buffer":
    // e.g. from below
    rb := newRingBuffer(*bufferSize)
    go rb.Consume(os.Stdin)
    s = rb
default:
    level.Error(logger).Log("backpressure", *backpressure, "err", "invalid backpressure option")
    os.Exit(1)
}

Author

Yup, sure. Sounds like you'd like to keep options open for other alternatives in future.

Scan() bool
Text() string
Err() error
}
Member

Please define this at the point of use, i.e. in func main.

Member

Also, please comment that e.g.

// textScanner models bufio.Scanner, so we can provide
// an alternate ringbuffered implementation.

level.Info(logger).Log("stdin", "exhausted", "due_to", bufioScanner.Err())
}
}
}()
Member

What do you think about making this code a method on the ringBuffer?

Member

So the callsite would look like

rb := newRingBuffer(*bufferSize)
go rb.Consume(os.Stdin)
s = rb
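
A minimal Consume could be little more than the existing goroutine body moved onto the type — a sketch only, assuming the ringBuffer exposes a Put method:

import (
    "bufio"
    "io"
)

// Consume reads lines from r and puts them into the ring buffer until r is
// exhausted. (Sketch only; Put is assumed to exist on ringBuffer.)
func (rb *ringBuffer) Consume(r io.Reader) {
    s := bufio.NewScanner(r)
    for s.Scan() {
        rb.Put(s.Text())
    }
    // s.Err() is nil on a clean EOF; either way stdin is exhausted here, and
    // the buffer needs some way to surface that (see the shutdown discussion
    // further down).
}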

rb := newRingBuffer(*ringBufSize)
go func() {
    for {
        bufioScanner := bufio.NewScanner(os.Stdin)
Member

bs := bufio.NewScanner(os.Stdin)

ok = bufioScanner.Scan()
}
if !ok {
level.Info(logger).Log("stdin", "exhausted", "due_to", bufioScanner.Err())
Member

I guess we should use this error from stdin to shut down the ringbuffer somehow. That shutdown signal should propagate through to terminate the existing for ok loop.
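
For example, something like the following could work — a hypothetical sketch, assuming a done channel is added to the ringBuffer alongside the existing Remaining counter:

// Close would be called once stdin is exhausted (e.g. at the end of the
// consuming goroutine); Scan then reports true only until the buffer drains.
func (bf *ringBuffer) Close() {
    close(bf.done) // done chan struct{}, a hypothetical extra field
}

func (bf *ringBuffer) Scan() bool {
    select {
    case <-bf.done:
        return bf.Remaining() > 0 // drain whatever is left, then stop
    default:
        return true
    }
}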


func (bf *ringBuffer) Scan() bool {
    return true
}
Member

As noted above, we're going to want to propagate a shutdown signal through somehow to terminate this.

    bf.r = bf.r.Next()
    bf.inc(-1)
    return v
}
Member

Unless I'm really misunderstanding something, this method has a number of problems…

  • It's burning the CPU calling bf.Remaining over and over
  • Once that loop exits, the bf.r.Value is not guaranteed to be there, another goroutine calling Get could have stolen the work item — a data race
  • bf.r is written without synchronization — a data race

I think using a remaining counter as the synchronization point is not really viable, at least not as implemented here...
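
For comparison, a blocking Get that neither spins nor touches bf.r outside a lock could look roughly like this — a sketch only, assuming a mutex plus a sync.Cond (with cond.L pointing at that mutex) alongside the existing r and remaining state, and assuming Put calls cond.Signal after adding a message:

// Sketch: sleep on a condition variable instead of polling Remaining, and do
// the read-and-advance under the same lock that Put uses.
func (bf *ringBuffer) Get() string {
    bf.mtx.Lock()
    defer bf.mtx.Unlock()
    for bf.remaining == 0 {
        bf.cond.Wait() // releases mtx while waiting, reacquires before returning
    }
    v := bf.r.Value.(string)
    bf.r = bf.r.Next()
    bf.remaining--
    return v
}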

Author

Agreed. It's not great. I'm not loving container/ring TBH. I'd like to grok it better with some more investigation into the Link/Unlink methods, but I'm also thinking about alternatives.

I'd like to try a buffered channel implementation (2 channels, similar to this https://content.pivotal.io/blog/a-channel-based-ring-buffer-in-go). It would be safe from races, but I'm not sure how its performance would compare to locking approaches. I guess a benchmark would help confirm it one way or the other.

Member

Check the comments on that Pivotal buffer, it's racy as printed.

Author

OK, I'll see if I can make container/ring work without the counter and then look at other options from there. Cheers

@laher
Author

laher commented Mar 31, 2017

I've applied the straightforward changes; thanks for your guidance. I'll work on the ring buffer itself in due course and add appropriate tests/benchmarks. Cheers

@laher
Author

laher commented Apr 2, 2017

@peterbourgon I've rewritten the buffer now, so that Get() blocks on a channel whilst data is unavailable.

So, I tried starting from scratch: 2 new struct types, composed together with an interface, and backed by a slice and a channel.

  • bufferedScanner forwards messages to a 'buffer' type, implementing some methods which are shared with bufio.Scanner.
  • sliceBuffer is the only buffer implementation that I've added.
  • sliceBuffer.Get() blocks on a channel whenever the buffer is empty and no data is available.
  • sliceBuffer.Put() attempts to deliver messages on the channel. If the channel is not being listened to, Put adds the messages to a bounded buffer (a slice). Once the maximum size is reached, further messages are discarded. (A rough sketch of the overall shape follows this list.)
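
For illustration, the overall shape is roughly the following — signatures only, and any names not mentioned above are guesses rather than the exact diff:

import "sync"

// buffer is the 'buffer' type that bufferedScanner forwards messages to;
// sliceBuffer is the only implementation so far.
type buffer interface {
    Put(record string)
    Get() string // blocks until a record is available
}

// bufferedScanner forwards incoming records into a buffer and exposes
// bufio.Scanner-style methods (Scan/Text/Err) backed by it.
type bufferedScanner struct {
    buf  buffer
    text string
    err  error
}

// sliceBuffer hands a record straight to a waiting Get via the channel when it
// can; otherwise it appends to a bounded slice and drops records once the
// slice is full.
type sliceBuffer struct {
    mutex sync.RWMutex
    buf   []string    // bounded overflow storage
    ch    chan string // direct hand-off to a waiting Get
    max   int
}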

Hoping this change is closer to the mark.
Thanks

    }
    b.mutex.RUnlock()
    b.mutex.Lock()
    record, b.buf = b.buf[0], b.buf[1:]
Author

This could be expensive in terms of GC.

Happy to switch out the slice for a container/list.List or ring.Ring instead. Incidentally, I was experimenting with a list.List and found with a benchmark that the slice was marginally faster, but maybe GC is more important.

Author

Hmm, I think maybe the best bet is to keep it as a slice but use it like a ring buffer to reduce the number of allocations. I'll have a play with it and try some -memprofile analysis.
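
The general idea would be a fixed-capacity slice indexed as a circle, so Get and Put only move indices and never reslice or reallocate — a sketch with made-up names, and with synchronization omitted:

// circular: a fixed-capacity slice used as a ring; head is the index of the
// oldest record and count is how many records are currently stored.
type circular struct {
    buf   []string
    head  int
    count int
}

func newCircular(size int) *circular {
    return &circular{buf: make([]string, size)}
}

func (c *circular) put(s string) {
    if c.count == len(c.buf) {
        c.buf[c.head] = s // full: overwrite the oldest record
        c.head = (c.head + 1) % len(c.buf)
        return
    }
    c.buf[(c.head+c.count)%len(c.buf)] = s
    c.count++
}

func (c *circular) get() (string, bool) {
    if c.count == 0 {
        return "", false
    }
    s := c.buf[c.head]
    c.buf[c.head] = "" // release the reference so the string can be collected
    c.head = (c.head + 1) % len(c.buf)
    c.count--
    return s, true
}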

func (b *sliceBuffer) Put(record string) {
    select {
    case b.ch <- record:
    default:
Author

I've used the select/default on the send side only, for one-way blocking: we always want Get to wait for data, but we want Put to buffer rather than block.

@laher laher changed the title from "Buffered forwarder, using container.Ring() for #15" to "Buffered forwarder for #15" on Apr 3, 2017
@laher laher changed the title from "Buffered forwarder for #15" to "[WIP] Buffered forwarder for #15" on Apr 15, 2017
@laher
Author

laher commented Apr 15, 2017

Please see #62 for a proposed direction I'd like to take this feature.

(If #62 gets rejected, perhaps we can still come back to this.)

@laher laher changed the title from "[WIP] Buffered forwarder for #15" to "Buffered forwarder for #15" on Apr 28, 2017
@laher
Author

laher commented Apr 28, 2017

OK I think I'm done with this buffered-channel implementation of the ring buffer. I think it's nice and simple.

I fancy you'll have some more thoughts on the bufferedScanner. I have amended the API to make it a little simpler, IMO: just one call to Next() instead of Scan()/Text()/Err(). It makes more sense to me.
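
For illustration, the core of the buffered-channel idea can be as small as the following — a sketch with guessed names and signatures, not the exact diff:

// A single buffered channel gives both behaviours: Put never blocks (records
// are dropped once the buffer is full), and Next blocks until a record is
// available.
type recordBuffer struct {
    ch chan string
}

func newRecordBuffer(size int) *recordBuffer {
    return &recordBuffer{ch: make(chan string, size)}
}

func (b *recordBuffer) Put(record string) {
    select {
    case b.ch <- record:
    default: // buffer full: drop the record
    }
}

func (b *recordBuffer) Next() string {
    return <-b.ch
}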

Still, I'm good for now. I'll go over and close the other PR. Cheers.

laher added a commit to laher/oklog that referenced this pull request Apr 28, 2017
@laher
Author

laher commented May 23, 2017

@peterbourgon have you had a chance to look at the most recent implementation with the buffered channel?

Ta

@peterbourgon
Member

Not yet, sorry! I will find some time on the weekend 🤞
