Merge pull request #40 from runreveal/alan/cleanup
kawa: fix stuff found debugging reveald
abraithwaite committed May 14, 2024
2 parents b843f40 + 64bd79f commit af8e7cd
Showing 4 changed files with 150 additions and 80 deletions.
4 changes: 4 additions & 0 deletions processor.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"sync"

"go.opentelemetry.io/otel"
@@ -23,6 +24,7 @@ type Processor[T1, T2 any] struct {
parallelism int
tracing bool
metrics bool
count int
}

type Config[T1, T2 any] struct {
@@ -102,6 +104,7 @@ func (p *Processor[T1, T2]) handle(ctx context.Context) error {
if err != nil {
return fmt.Errorf("handler: %w", err)
}
p.count++
hdlSpan.End()

sctx, sendSpan := tracer.Start(ctx, "kawa.processor.dst.send")
@@ -152,6 +155,7 @@ func (p *Processor[T1, T2]) Run(ctx context.Context) error {
}
// Stop all the workers on shutdown.
cancel()
slog.Info("shutting down processor", "count", p.count)
// TODO: capture errors thrown during shutdown? if we do this, write local
// err first. it represents first seen
wg.Wait()
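The processor.go change is small: count handled messages and report the total once at shutdown. A minimal standalone sketch of that pattern (a hypothetical worker loop, not the kawa code; since this sketch runs two workers concurrently, it uses an atomic counter):

```go
package main

import (
	"context"
	"log/slog"
	"sync"
	"sync/atomic"
)

// worker drains msgs, "handles" each one, and counts how many it processed.
func worker(ctx context.Context, msgs <-chan string, count *atomic.Int64, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-msgs:
			if !ok {
				return
			}
			_ = m        // handle the message here
			count.Add(1) // counted per handled message, like p.count++ above
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := make(chan string, 8)
	var count atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go worker(ctx, msgs, &count, &wg)
	}
	for _, m := range []string{"a", "b", "c"} {
		msgs <- m
	}
	close(msgs)
	wg.Wait()
	// Mirrors the new shutdown log in Processor.Run.
	slog.Info("shutting down processor", "count", count.Load())
}
```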
182 changes: 121 additions & 61 deletions x/batcher/batcher.go
@@ -2,6 +2,7 @@ package batch

import (
"context"
"errors"
"sync"
"time"

@@ -20,16 +21,28 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error {
return ff(c, msgs)
}

// Destination is a batching destination that will buffer messages until the
// FlushLength limit is reached or the FlushFrequency timer fires, whichever
// comes first.
//
// `Destination.Run` must be called after calling `New` before events will be
// processed in this destination. Not calling `Run` will likely end in a
// deadlock as the internal channel being written to by `Send` will not be
// getting read.
type Destination[T any] struct {
flusher Flusher[T]
flushq chan struct{}
flushlen int
flushfreq time.Duration
flusherr chan error
flushwg *sync.WaitGroup
flusher Flusher[T]
flushq chan func()
flushlen int
flushfreq time.Duration
flusherr chan error
stopTimeout time.Duration

messages chan msgAck[T]
buf []msgAck[T]

count int
running bool
syncMu sync.Mutex
}

type OptFunc func(*Opts)
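The new doc comment states the key usage contract: start `Run` before calling `Send`, or `Send` blocks forever on the internal channel. A hedged usage sketch (import paths are assumed from the repo layout, and `kawa.Message` is assumed to carry its payload in a `Value` field):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/runreveal/kawa"
	batch "github.com/runreveal/kawa/x/batcher"
)

func main() {
	// FlushFunc adapts a plain function into a Flusher.
	dst := batch.NewDestination[string](
		batch.FlushFunc[string](func(ctx context.Context, msgs []kawa.Message[string]) error {
			fmt.Println("flushed", len(msgs), "messages")
			return nil
		}),
		batch.FlushFrequency(100*time.Millisecond),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run must already be running when Send is called, per the doc comment above.
	go func() { _ = dst.Run(ctx) }()

	_ = dst.Send(ctx, func() { fmt.Println("acked") },
		kawa.Message[string]{Value: "hello"})
	time.Sleep(300 * time.Millisecond) // let the frequency-based flush fire
}
```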
@@ -38,6 +51,7 @@ type Opts struct {
FlushLength int
FlushFrequency time.Duration
FlushParallelism int
StopTimeout time.Duration
}

func FlushFrequency(d time.Duration) func(*Opts) {
@@ -58,30 +72,40 @@ func FlushParallelism(n int) func(*Opts) {
}
}

// NewDestination instantiates a new batcher. `Destination.Run` must be called
// after calling `New` before events will be processed in this destination. Not
// calling `Run` will likely end in a deadlock as the internal channel being
// written to by `Send` will not be getting read.
func StopTimeout(d time.Duration) func(*Opts) {
return func(opts *Opts) {
opts.StopTimeout = d
}
}

// NewDestination instantiates a new batcher.
func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
cfg := Opts{
FlushLength: 100,
FlushFrequency: 1 * time.Second,
FlushParallelism: 2,
StopTimeout: 5 * time.Second,
}

for _, o := range opts {
o(&cfg)
}

// TODO: validate here
if cfg.FlushParallelism < 1 {
panic("FlushParallelism must be greater than or equal to 1")
}
if cfg.StopTimeout < 0 {
cfg.StopTimeout = 0
}

return &Destination[T]{
flushlen: cfg.FlushLength,
flushq: make(chan struct{}, cfg.FlushParallelism),
flusherr: make(chan error, cfg.FlushParallelism),
flusher: f,
flushwg: &sync.WaitGroup{},
flushfreq: cfg.FlushFrequency,
flushlen: cfg.FlushLength,
flushq: make(chan func(), cfg.FlushParallelism),
flusherr: make(chan error, cfg.FlushParallelism),
flusher: f,
flushfreq: cfg.FlushFrequency,
stopTimeout: cfg.StopTimeout,

messages: make(chan msgAck[T]),
}
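The defaults are visible in the constructor: FlushLength 100, FlushFrequency 1s, FlushParallelism 2, and now StopTimeout 5s. The new option composes like the existing ones; continuing the sketch above (`flusher` stands for any `Flusher[string]`):

```go
dst := batch.NewDestination[string](flusher,
	batch.FlushFrequency(2*time.Second), // flush at least every 2s
	batch.FlushParallelism(4),           // up to 4 concurrent flushes
	batch.StopTimeout(10*time.Second),   // grace period for in-flight flushes on shutdown
)
```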
@@ -117,18 +141,30 @@ func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Message[T]) error {
return nil
}

// Run starts the batching destination. It must be called before messages will
// be processed and written to the underlying Flusher.
// Run will block until the context is canceled.
// Upon cancellation, Run will flush any remaining messages in the buffer and
// return any flush errors that occur
func (d *Destination[T]) Run(ctx context.Context) error {
var err error
var epoch uint64
epochC := make(chan uint64)
setTimer := true

ctx, cancel := context.WithCancel(ctx)
d.syncMu.Lock()
if d.running {
panic("already running")
} else {
d.running = true
}
d.syncMu.Unlock()

loop:
for {
select {
case msg := <-d.messages:
d.count++
if setTimer {
// copy the epoch to send on the chan after the timer fires
epc := epoch
@@ -139,60 +175,96 @@
}
d.buf = append(d.buf, msg)
if len(d.buf) >= d.flushlen {
err = d.flush(ctx)
if err != nil {
break loop
}
setTimer = true
epoch++
d.flush(ctx)
setTimer = true
}
case tEpoch := <-epochC:
// if we haven't flushed yet this epoch, then flush, otherwise ignore
if tEpoch == epoch {
err = d.flush(ctx)
if err != nil {
break loop
}
setTimer = true
epoch++
d.flush(ctx)
setTimer = true
}
case err = <-d.flusherr:
slog.Info("flush error", "error", err)
break loop

case <-ctx.Done():
err = ctx.Err()
slog.Info("batcher: context canceled. waiting for flushes to finish.", "len", len(d.flushq))
if len(d.buf) > 0 {
// optimistic final flush
go d.flush(context.Background())
}
break loop
}
}

cancel()
if len(d.flushq) == 0 {
return err
}

// Wait for in-flight flushes to finish
// This must happen in the same goroutine as flushwg.Add
d.flushwg.Wait()
timeout:
for {
// Wait for flushes to finish
select {
case <-time.After(d.stopTimeout):
slog.Error("batcher: timed out waiting for flushes. cancelling them.")
break timeout
case err = <-d.flusherr:
if err != nil {
slog.Info("flush error", "error", err)
return err
}
}
}
if len(d.flushq) == 0 {
return err
}

drain:
// Stop active flushes
for {
select {
case cncl := <-d.flushq:
cncl()
default:
break drain
}
}
err = errDeadlock
return err
}
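The loop's timer logic deserves a note: each armed timer remembers the epoch it was created in, and every flush bumps the epoch, so a stale timer tick is recognized and ignored rather than forcing a second, premature flush. A self-contained sketch of just that mechanism (a simplified standalone model with hypothetical names, not the package's API):

```go
package main

import (
	"fmt"
	"time"
)

// runLoop illustrates the epoch-guarded flush timer from Run: a timer tick
// only triggers a flush if no other flush happened since the timer was armed.
func runLoop(messages <-chan int, flushLen int, freq time.Duration, flush func([]int)) {
	var (
		epoch    uint64
		epochC   = make(chan uint64)
		buf      []int
		setTimer = true
	)
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				return
			}
			if setTimer {
				epc := epoch // capture the epoch this timer guards
				time.AfterFunc(freq, func() { epochC <- epc })
				setTimer = false
			}
			buf = append(buf, msg)
			if len(buf) >= flushLen {
				epoch++ // size-based flush invalidates the armed timer
				flush(buf)
				buf = buf[:0]
				setTimer = true
			}
		case tEpoch := <-epochC:
			if tEpoch == epoch { // timer still current: time-based flush
				epoch++
				flush(buf)
				buf = buf[:0]
				setTimer = true
			}
		}
	}
}

func main() {
	msgs := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			msgs <- i
		}
		time.Sleep(50 * time.Millisecond) // let the timer-based flush fire
		close(msgs)
	}()
	runLoop(msgs, 3, 20*time.Millisecond, func(b []int) { fmt.Println("flush", b) })
}
```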

func (d *Destination[T]) flush(ctx context.Context) error {
var errDeadlock = errors.New("batcher: flushes timed out waiting for completion after context stopped.")

func (d *Destination[T]) flush(ctx context.Context) {
// We make a new context here so that we can cancel the flush if the parent
// context is canceled. It's important to use context.Background() here because
// we don't want to propagate the parent context's cancelation to the flusher.
// If we did, then the flusher would likely be canceled before it could
// finish flushing.
flctx, cancel := context.WithCancel(context.Background())
// block until a slot is available, or until a timeout is reached in the
// parent context
select {
// Acquire flush slot
case d.flushq <- struct{}{}:
// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
// This must happen in the same goroutine as flushwg.Wait
// do not push down into doflush
d.flushwg.Add(1)
go d.doflush(ctx, msgs)
// Clear the buffer for the next batch
d.buf = d.buf[:0]
case err := <-d.flusherr:
return err
case d.flushq <- cancel:
case <-ctx.Done():
// TODO: one more flush?
return ctx.Err()
cancel()
return
}
return nil
// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
go func() {
d.doflush(flctx, msgs)
// clear flush slot
// this will block forever if we're shutting down
cncl := <-d.flushq
cncl()
}()
// Clear the buffer for the next batch
d.buf = d.buf[:0]
}
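`flushq` changed from `chan struct{}` to `chan func()`: a flush acquires one of `FlushParallelism` slots by parking its own `cancel` in the channel, and releases a slot by popping the oldest parked cancel and calling it (harmless for a flush that already finished). That is what lets the `drain:` loop in `Run` cancel whatever is still in flight after the stop timeout. A stripped-down sketch of the slot discipline (hypothetical names, not the kawa code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doFlush stands in for doflush: it honors cancellation.
func doFlush(ctx context.Context, id int, d time.Duration) {
	select {
	case <-time.After(d):
		fmt.Println("flush", id, "finished")
	case <-ctx.Done():
		fmt.Println("flush", id, "aborted")
	}
}

func main() {
	const parallelism = 2
	slots := make(chan func(), parallelism) // each token is a flush's cancel func

	durations := []time.Duration{100 * time.Millisecond, 200 * time.Millisecond}
	for id, d := range durations {
		ctx, cancel := context.WithCancel(context.Background())
		slots <- cancel // acquire a slot; blocks when all slots are taken
		go func(id int, d time.Duration, ctx context.Context) {
			doFlush(ctx, id, d)
			// Release: pop the oldest parked cancel and call it. For an
			// already-finished flush this only frees its context. During
			// shutdown this receive can block forever, as the comment in
			// flush warns, because Run's drain loop empties the channel.
			cncl := <-slots
			cncl()
		}(id, d, ctx)
	}

	// Shutdown, like the timeout:/drain: loops in Run: wait out a grace
	// period, then cancel whatever is still parked.
	time.Sleep(150 * time.Millisecond)
drain:
	for {
		select {
		case cncl := <-slots:
			cncl() // aborts the still-running flush
		default:
			break drain
		}
	}
	time.Sleep(10 * time.Millisecond) // let the aborted flush log before exit
}
```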

@@ -208,7 +280,6 @@ func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
}
if err != nil {
d.flusherr <- err
d.flushwg.Done()
return
}

Expand All @@ -217,17 +288,6 @@ func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
m.ack()
}
}

// free waitgroup before clearing slot in queue to allow shutdown to proceed
// before another flush starts if a shutdown is currently happening
d.flushwg.Done()
select {
// clear flush slot
case <-d.flushq:
default:
// this should be unreachable since we're the only reader
panic("read of empty flushq")
}
}

// only call ack on last message acknowledgement
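The trailing comment ("only call ack on last message acknowledgement") refers to how a multi-message `Send` is acknowledged: only the final message carries the caller's ack, so it fires once, after the whole batch has made it through a flush. The actual helper is elided from this diff; a hypothetical sketch of the idea (the `msg` and `ack` field names are assumptions):

```go
// wrapAcks is a hypothetical version of the elided helper: intermediate
// messages carry no ack; the final message carries the caller's ack, so it
// fires exactly once when that message is flushed.
func wrapAcks[T any](msgs []kawa.Message[T], ack func()) []msgAck[T] {
	out := make([]msgAck[T], len(msgs))
	for i, m := range msgs {
		out[i] = msgAck[T]{msg: m}
	}
	if len(out) > 0 {
		out[len(out)-1].ack = ack
	}
	return out
}
```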