kawa: fix stuff found debugging reveald #40

Merged
merged 6 commits into from May 14, 2024

Changes from all commits
4 changes: 4 additions & 0 deletions processor.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"sync"

"go.opentelemetry.io/otel"
@@ -23,6 +24,7 @@ type Processor[T1, T2 any] struct {
parallelism int
tracing bool
metrics bool
count int
}

type Config[T1, T2 any] struct {
@@ -102,6 +104,7 @@ func (p *Processor[T1, T2]) handle(ctx context.Context) error {
if err != nil {
return fmt.Errorf("handler: %w", err)
}
p.count++
hdlSpan.End()

sctx, sendSpan := tracer.Start(ctx, "kawa.processor.dst.send")
@@ -152,6 +155,7 @@ func (p *Processor[T1, T2]) Run(ctx context.Context) error {
}
// Stop all the workers on shutdown.
cancel()
slog.Info("shutting down processor", "count", p.count)
// TODO: capture errors thrown during shutdown? if we do this, write local
// err first. it represents first seen
wg.Wait()
182 changes: 121 additions & 61 deletions x/batcher/batcher.go
@@ -2,6 +2,7 @@ package batch

import (
"context"
"errors"
"sync"
"time"

@@ -20,16 +21,28 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error {
return ff(c, msgs)
}

// Destination is a batching destination that will buffer messages until the
// FlushLength limit is reached or the FlushFrequency timer fires, whichever
// comes first.
//
// `Destination.Run` must be called after `NewDestination` before events will
// be processed by this destination. Not calling `Run` will likely end in a
// deadlock, as the internal channel written to by `Send` will never be read.
type Destination[T any] struct {
flusher Flusher[T]
flushq chan struct{}
flushlen int
flushfreq time.Duration
flusherr chan error
flushwg *sync.WaitGroup
flusher Flusher[T]
flushq chan func()
flushlen int
flushfreq time.Duration
flusherr chan error
stopTimeout time.Duration

messages chan msgAck[T]
buf []msgAck[T]

count int
running bool
syncMu sync.Mutex
}

type OptFunc func(*Opts)
@@ -38,6 +51,7 @@ type Opts struct {
FlushLength int
FlushFrequency time.Duration
FlushParallelism int
StopTimeout time.Duration
}

func FlushFrequency(d time.Duration) func(*Opts) {
@@ -58,30 +72,40 @@ func FlushParallelism(n int) func(*Opts) {
}
}

// NewDestination instantiates a new batcher. `Destination.Run` must be called
// after calling `New` before events will be processed in this destination. Not
// calling `Run` will likely end in a deadlock as the internal channel being
// written to by `Send` will not be getting read.
func StopTimeout(d time.Duration) func(*Opts) {
return func(opts *Opts) {
opts.StopTimeout = d
}
}

// NewDestination instantiates a new batcher.
func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
cfg := Opts{
FlushLength: 100,
FlushFrequency: 1 * time.Second,
FlushParallelism: 2,
StopTimeout: 5 * time.Second,
}

for _, o := range opts {
o(&cfg)
}

// TODO: validate here
if cfg.FlushParallelism < 1 {
panic("FlushParallelism must be greater than or equal to 1")
}
if cfg.StopTimeout < 0 {
cfg.StopTimeout = 0
}

return &Destination[T]{
flushlen: cfg.FlushLength,
flushq: make(chan struct{}, cfg.FlushParallelism),
flusherr: make(chan error, cfg.FlushParallelism),
flusher: f,
flushwg: &sync.WaitGroup{},
flushfreq: cfg.FlushFrequency,
flushlen: cfg.FlushLength,
flushq: make(chan func(), cfg.FlushParallelism),
flusherr: make(chan error, cfg.FlushParallelism),
flusher: f,
flushfreq: cfg.FlushFrequency,
stopTimeout: cfg.StopTimeout,

messages: make(chan msgAck[T]),
}
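For orientation, here is a minimal usage sketch of the batcher as it looks after this change. It is not part of the diff: the import paths are assumptions based on the repository layout (x/batcher declares package batch), and the flush body is a stand-in.

package main

import (
	"context"
	"log"
	"time"

	"github.com/runreveal/kawa"                 // assumed import path
	batch "github.com/runreveal/kawa/x/batcher" // assumed import path
)

func main() {
	// FlushFunc adapts a plain function into a Flusher.
	flush := batch.FlushFunc[string](func(ctx context.Context, msgs []kawa.Message[string]) error {
		log.Printf("flushing %d messages", len(msgs))
		return nil // write the batch to its real destination here
	})

	// Defaults are 100 messages per batch, a 1s flush frequency, 2 parallel
	// flushes, and a 5s stop timeout; the option funcs override them.
	dst := batch.NewDestination[string](flush,
		batch.FlushFrequency(250*time.Millisecond),
		batch.FlushParallelism(4),
		batch.StopTimeout(10*time.Second),
	)

	ctx, cancel := context.WithCancel(context.Background())

	// Run must be started before Send is used; otherwise Send blocks forever
	// on the unread internal channel.
	go func() {
		if err := dst.Run(ctx); err != nil {
			log.Println("batcher stopped:", err)
		}
	}()

	// Messages are handed in with Send; ack fires once the batch containing
	// the message has been flushed:
	//   err := dst.Send(ctx, ack, msgs...)

	time.Sleep(time.Second) // let the flush timer fire at least once
	cancel()
}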
@@ -117,18 +141,30 @@ func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Mess
return nil
}

// Run starts the batching destination. It must be called before messages will
// be processed and written to the underlying Flusher.
// Run will block until the context is canceled.
// Upon cancellation, Run will flush any remaining messages in the buffer and
// return any flush errors that occur.
func (d *Destination[T]) Run(ctx context.Context) error {
var err error
var epoch uint64
epochC := make(chan uint64)
setTimer := true

ctx, cancel := context.WithCancel(ctx)
d.syncMu.Lock()
if d.running {
panic("already running")
} else {
d.running = true
}
d.syncMu.Unlock()

loop:
for {
select {
case msg := <-d.messages:
d.count++
if setTimer {
// copy the epoch to send on the chan after the timer fires
epc := epoch
@@ -139,60 +175,96 @@ loop:
}
d.buf = append(d.buf, msg)
if len(d.buf) >= d.flushlen {
err = d.flush(ctx)
if err != nil {
break loop
}
setTimer = true
epoch++
d.flush(ctx)
setTimer = true
}
case tEpoch := <-epochC:
// if we haven't flushed yet this epoch, then flush, otherwise ignore
if tEpoch == epoch {
err = d.flush(ctx)
if err != nil {
break loop
}
setTimer = true
epoch++
d.flush(ctx)
setTimer = true
}
case err = <-d.flusherr:
slog.Info("flush error", "error", err)
break loop

case <-ctx.Done():
err = ctx.Err()
slog.Info("batcher: context canceled. waiting for flushes to finish.", "len", len(d.flushq))
if len(d.buf) > 0 {
// optimistic final flush
go d.flush(context.Background())
}
break loop
}
}

cancel()
if len(d.flushq) == 0 {
return err
}

// Wait for in-flight flushes to finish
// This must happen in the same goroutine as flushwg.Add
d.flushwg.Wait()
timeout:
for {
// Wait for flushes to finish
select {
case <-time.After(d.stopTimeout):
slog.Error("batcher: timed out waiting for flushes. cancelling them.")
break timeout
case err = <-d.flusherr:
if err != nil {
slog.Info("flush error", "error", err)
return err
}
}
}
if len(d.flushq) == 0 {
return err
}

drain:
// Stop active flushes
for {
select {
case cncl := <-d.flushq:
cncl()
default:
break drain
}
}
err = errDeadlock
return err
}
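Expanding on the Run call in the earlier sketch, the shutdown behavior this doc comment describes can be observed from Run's return value: roughly, context.Canceled after a clean drain, a flush error if a Flusher failed, or a timeout error if in-flight flushes outlived StopTimeout. A fragment (dst and ctx as in the earlier sketch; errors and log assumed imported):

errc := make(chan error, 1)
go func() { errc <- dst.Run(ctx) }()

// ... Send messages ...

// Cancel to begin shutdown: Run makes an optimistic final flush of the
// buffer, then waits up to StopTimeout for in-flight flushes to finish.
cancel()
if err := <-errc; err != nil && !errors.Is(err, context.Canceled) {
	log.Println("batcher shutdown error:", err)
}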

func (d *Destination[T]) flush(ctx context.Context) error {
var errDeadlock = errors.New("batcher: flushes timed out waiting for completion after context stopped.")

func (d *Destination[T]) flush(ctx context.Context) {
// We make a new context here so that we can cancel the flush if the parent
// context is canceled. It's important to use context.Background() here because
// we don't want to propagate the parent context's cancelation to the flusher.
// If we did, then the flusher would likely be canceled before it could
// finish flushing.
flctx, cancel := context.WithCancel(context.Background())
// block until a slot is available, or until the parent context is canceled
select {
// Acquire flush slot
case d.flushq <- struct{}{}:
// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
// This must happen in the same goroutine as flushwg.Wait
// do not push down into doflush
d.flushwg.Add(1)
go d.doflush(ctx, msgs)
// Clear the buffer for the next batch
d.buf = d.buf[:0]
case err := <-d.flusherr:
return err
case d.flushq <- cancel:
case <-ctx.Done():
// TODO: one more flush?
return ctx.Err()
cancel()
return
}
return nil
// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
go func() {
d.doflush(flctx, msgs)
// clear flush slot
// this will block forever if we're shutting down
cncl := <-d.flushq
cncl()
}()
// Clear the buffer for the next batch
d.buf = d.buf[:0]
}
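The slot channel used above carries the cancel functions of in-flight flushes, so one buffered channel both bounds flush concurrency and gives the shutdown path in Run a handle for cancelling stragglers. A standalone sketch of that pattern, independent of this package's types:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Buffered channel of cancel funcs: its capacity bounds concurrency, and
	// the stored funcs give a shutdown path a handle to abort in-flight work.
	slots := make(chan context.CancelFunc, 2)

	start := func(work func(context.Context)) {
		ctx, cancel := context.WithCancel(context.Background())
		slots <- cancel // blocks while two jobs are already in flight
		go func() {
			work(ctx)
			cncl := <-slots // release a slot once the work is done
			cncl()
		}()
	}

	for i := 0; i < 2; i++ {
		i := i
		start(func(ctx context.Context) {
			select {
			case <-time.After(5 * time.Second): // a slow "flush"
				fmt.Println("job", i, "done")
			case <-ctx.Done():
				fmt.Println("job", i, "canceled")
			}
		})
	}

	// Shutdown path: drain the channel and cancel whatever is still running.
	time.Sleep(100 * time.Millisecond)
	for {
		select {
		case cncl := <-slots:
			cncl()
		default:
			time.Sleep(100 * time.Millisecond) // let the cancellations print
			return
		}
	}
}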

func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
@@ -208,7 +280,6 @@ func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
}
if err != nil {
d.flusherr <- err
d.flushwg.Done()
return
}

@@ -217,17 +288,6 @@ func (d *Destination[T]) doflush(ctx context.Context, msgs []msgAck[T]) {
m.ack()
}
}

// free waitgroup before clearing slot in queue to allow shutdown to proceed
// before another flush starts if a shutdown is currently happening
d.flushwg.Done()
select {
// clear flush slot
case <-d.flushq:
default:
// this should be unreachable since we're the only reader
panic("read of empty flushq")
}
}

// only call ack on last message acknowledgement