From e7d4f8837fb7401b9793b6ebac5867de8602c669 Mon Sep 17 00:00:00 2001 From: Alan Braithwaite Date: Mon, 13 May 2024 20:46:25 -0700 Subject: [PATCH] kawa: fix stuff found debugging reveald --- processor.go | 4 ++++ x/batcher/batcher.go | 35 +++++++++++++++++++++++++++++------ x/scanner/scanner.go | 8 ++++---- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/processor.go b/processor.go index bbcd330..b44430f 100644 --- a/processor.go +++ b/processor.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "sync" "go.opentelemetry.io/otel" @@ -23,6 +24,7 @@ type Processor[T1, T2 any] struct { parallelism int tracing bool metrics bool + count int } type Config[T1, T2 any] struct { @@ -102,6 +104,7 @@ func (p *Processor[T1, T2]) handle(ctx context.Context) error { if err != nil { return fmt.Errorf("handler: %w", err) } + p.count++ hdlSpan.End() sctx, sendSpan := tracer.Start(ctx, "kawa.processor.dst.send") @@ -152,6 +155,7 @@ func (p *Processor[T1, T2]) Run(ctx context.Context) error { } // Stop all the workers on shutdown. cancel() + slog.Info("shutting down processor", "count", p.count) // TODO: capture errors thrown during shutdown? if we do this, write local // err first. it represents first seen wg.Wait() diff --git a/x/batcher/batcher.go b/x/batcher/batcher.go index ea68e92..197eeae 100644 --- a/x/batcher/batcher.go +++ b/x/batcher/batcher.go @@ -2,6 +2,7 @@ package batch import ( "context" + "fmt" "sync" "time" @@ -30,6 +31,10 @@ type Destination[T any] struct { messages chan msgAck[T] buf []msgAck[T] + + count int + running bool + syncMu sync.Mutex } type OptFunc func(*Opts) @@ -123,12 +128,21 @@ func (d *Destination[T]) Run(ctx context.Context) error { epochC := make(chan uint64) setTimer := true + d.syncMu.Lock() + if d.running { + panic("already running") + } else { + d.running = true + } + d.syncMu.Unlock() + ctx, cancel := context.WithCancel(ctx) loop: for { select { case msg := <-d.messages: + d.count++ if setTimer { // copy the epoch to send on the chan after the timer fires epc := epoch @@ -139,27 +153,34 @@ loop: } d.buf = append(d.buf, msg) if len(d.buf) >= d.flushlen { + slog.Info("flushing on size", "len", len(d.buf), "processed", d.count) + epoch++ err = d.flush(ctx) if err != nil { + slog.Error("flush error", "error", err) break loop } setTimer = true - epoch++ } case tEpoch := <-epochC: // if we haven't flushed yet this epoch, then flush, otherwise ignore if tEpoch == epoch { + epoch++ + slog.Info("flushing on epoch", "len", len(d.buf), "processed", d.count, "epoch", epoch) err = d.flush(ctx) if err != nil { break loop } setTimer = true - epoch++ } case err = <-d.flusherr: + slog.Info("flush error", "error", err) break loop case <-ctx.Done(): - err = ctx.Err() + slog.Info("context done", "err", ctx.Err(), "len", len(d.buf), "processed", d.count) + if len(d.buf) > 0 { + err = d.flush(ctx) + } break loop } } @@ -180,6 +201,7 @@ func (d *Destination[T]) flush(ctx context.Context) error { // Have to make a copy so these don't get overwritten msgs := make([]msgAck[T], len(d.buf)) copy(msgs, d.buf) + slog.Info(fmt.Sprintf("copied %d messages for flush", len(msgs))) // This must happen in the same goroutine as flushwg.Wait // do not push down into doflush d.flushwg.Add(1) @@ -187,10 +209,11 @@ func (d *Destination[T]) flush(ctx context.Context) error { // Clear the buffer for the next batch d.buf = d.buf[:0] case err := <-d.flusherr: + slog.Error("flush error", "error", err) return err - case <-ctx.Done(): - // TODO: one more flush? - return ctx.Err() + // case <-ctx.Done(): + // // TODO: one more flush? + // return ctx.Err() } return nil } diff --git a/x/scanner/scanner.go b/x/scanner/scanner.go index 9776225..2f9b079 100644 --- a/x/scanner/scanner.go +++ b/x/scanner/scanner.go @@ -59,6 +59,10 @@ func (s *Scanner) recvLoop(ctx context.Context) error { } } + if err := s.scanner.Err(); err != nil { + return fmt.Errorf("scanning: %+w", err) + } + c := make(chan struct{}) go func() { wg.Wait() @@ -71,10 +75,6 @@ func (s *Scanner) recvLoop(ctx context.Context) error { return ctx.Err() } - if err := s.scanner.Err(); err != nil { - return fmt.Errorf("scanning: %+w", err) - } - return nil }