Skip to content

Commit

Permalink
remove log lines, add todos
Browse files Browse the repository at this point in the history
  • Loading branch information
abraithwaite committed May 14, 2024
1 parent 29829db commit b852b08
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package batch

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -153,7 +152,6 @@ loop:
}
d.buf = append(d.buf, msg)
if len(d.buf) >= d.flushlen {
slog.Info("flushing on size", "len", len(d.buf), "processed", d.count)
epoch++
err = d.flush(ctx)
if err != nil {
Expand All @@ -166,7 +164,6 @@ loop:
// if we haven't flushed yet this epoch, then flush, otherwise ignore
if tEpoch == epoch {
epoch++
slog.Info("flushing on epoch", "len", len(d.buf), "processed", d.count, "epoch", epoch)
err = d.flush(ctx)
if err != nil {
break loop
Expand All @@ -179,6 +176,8 @@ loop:
case <-ctx.Done():
slog.Info("context done", "err", ctx.Err(), "len", len(d.buf), "processed", d.count)
if len(d.buf) > 0 {
// TODO: withTimeout
ctx := context.Background()
err = d.flush(ctx)
}
break loop
Expand All @@ -201,7 +200,6 @@ func (d *Destination[T]) flush(ctx context.Context) error {
// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
slog.Info(fmt.Sprintf("copied %d messages for flush", len(msgs)))
// This must happen in the same goroutine as flushwg.Wait
// do not push down into doflush
d.flushwg.Add(1)
Expand All @@ -211,8 +209,8 @@ func (d *Destination[T]) flush(ctx context.Context) error {
case err := <-d.flusherr:
slog.Error("flush error", "error", err)
return err
// TODO: evaluate granularly when we should catch cancelation
// case <-ctx.Done():
// // TODO: one more flush?
// return ctx.Err()
}
return nil
Expand Down

0 comments on commit b852b08

Please sign in to comment.