Skip to content

Commit

Permalink
Merge pull request #44 from runreveal/alan/minor-tweaks
Browse files Browse the repository at this point in the history
tweaks to logging, dont shadow err
  • Loading branch information
abraithwaite committed May 31, 2024
2 parents c190a38 + bcb5bb7 commit 4fa34af
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,14 @@ loop:
setTimer = true
}
case err = <-d.flusherr:
slog.Info("flush error", "error", err)
slog.Error("flush error", "error", err)
break loop

case <-ctx.Done():
slog.Info("batcher: context canceled. waiting for flushes to finish.", "len", len(d.flushq))
if len(d.buf) > 0 {
// optimistic final flush
// optimistic final flush.
// launched in a goroutine to avoid deadlock acuqiring a flushq slot
// might be better to return and not try to write this batch.
go d.flush(context.Background())
}
break loop
Expand All @@ -204,17 +205,19 @@ loop:
return err
}

slog.Info("stopping batcher. waiting for remaining flushes to finish.", "len", len(d.flushq))
timeout:
for {
// Wait for flushes to finish
select {
case <-time.After(d.stopTimeout):
slog.Error("batcher: timed out waiting for flushes. cancelling them.")
break timeout
case err = <-d.flusherr:
if err != nil {
case e2 := <-d.flusherr:
if e2 != nil {
slog.Info("flush error", "error", err)
return err
if err == nil {
err = e2
}
}
}
}
Expand All @@ -223,7 +226,8 @@ timeout:
}

drain:
// Stop active flushes
// flushes still active after timeout
// cancel them.
for {
select {
case cncl := <-d.flushq:
Expand Down Expand Up @@ -259,7 +263,6 @@ func (d *Destination[T]) flush(ctx context.Context) {
go func() {
d.doflush(flctx, msgs)
// clear flush slot
// this will block forever if we're shutting down
cncl := <-d.flushq
cncl()
}()
Expand Down

0 comments on commit 4fa34af

Please sign in to comment.