From b852b081caaf3b96bd65a3b01be440fe931260e4 Mon Sep 17 00:00:00 2001 From: Alan Braithwaite Date: Mon, 13 May 2024 20:50:27 -0700 Subject: [PATCH] remove log lines, add todos --- x/batcher/batcher.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/x/batcher/batcher.go b/x/batcher/batcher.go index 197eeae..b40c343 100644 --- a/x/batcher/batcher.go +++ b/x/batcher/batcher.go @@ -2,7 +2,6 @@ package batch import ( "context" - "fmt" "sync" "time" @@ -153,7 +152,6 @@ loop: } d.buf = append(d.buf, msg) if len(d.buf) >= d.flushlen { - slog.Info("flushing on size", "len", len(d.buf), "processed", d.count) epoch++ err = d.flush(ctx) if err != nil { @@ -166,7 +164,6 @@ loop: // if we haven't flushed yet this epoch, then flush, otherwise ignore if tEpoch == epoch { epoch++ - slog.Info("flushing on epoch", "len", len(d.buf), "processed", d.count, "epoch", epoch) err = d.flush(ctx) if err != nil { break loop @@ -179,6 +176,8 @@ loop: case <-ctx.Done(): slog.Info("context done", "err", ctx.Err(), "len", len(d.buf), "processed", d.count) if len(d.buf) > 0 { + // TODO: withTimeout + ctx := context.Background() err = d.flush(ctx) } break loop @@ -201,7 +200,6 @@ func (d *Destination[T]) flush(ctx context.Context) error { // Have to make a copy so these don't get overwritten msgs := make([]msgAck[T], len(d.buf)) copy(msgs, d.buf) - slog.Info(fmt.Sprintf("copied %d messages for flush", len(msgs))) // This must happen in the same goroutine as flushwg.Wait // do not push down into doflush d.flushwg.Add(1) @@ -211,8 +209,8 @@ func (d *Destination[T]) flush(ctx context.Context) error { case err := <-d.flusherr: slog.Error("flush error", "error", err) return err + // TODO: evaluate granularly when we should catch cancelation // case <-ctx.Done(): - // // TODO: one more flush? // return ctx.Err() } return nil