Skip to content

Commit

Permalink
kawa: fix stuff found debugging reveald
Browse files Browse the repository at this point in the history
  • Loading branch information
abraithwaite committed May 14, 2024
1 parent bfde06c commit e7d4f88
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
4 changes: 4 additions & 0 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"sync"

"go.opentelemetry.io/otel"
Expand All @@ -23,6 +24,7 @@ type Processor[T1, T2 any] struct {
parallelism int
tracing bool
metrics bool
count int
}

type Config[T1, T2 any] struct {
Expand Down Expand Up @@ -102,6 +104,7 @@ func (p *Processor[T1, T2]) handle(ctx context.Context) error {
if err != nil {
return fmt.Errorf("handler: %w", err)
}
p.count++
hdlSpan.End()

sctx, sendSpan := tracer.Start(ctx, "kawa.processor.dst.send")
Expand Down Expand Up @@ -152,6 +155,7 @@ func (p *Processor[T1, T2]) Run(ctx context.Context) error {
}
// Stop all the workers on shutdown.
cancel()
slog.Info("shutting down processor", "count", p.count)
// TODO: capture errors thrown during shutdown? if we do this, write local
// err first. it represents first seen
wg.Wait()
Expand Down
35 changes: 29 additions & 6 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batch

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -30,6 +31,10 @@ type Destination[T any] struct {

messages chan msgAck[T]
buf []msgAck[T]

count int
running bool
syncMu sync.Mutex
}

type OptFunc func(*Opts)
Expand Down Expand Up @@ -123,12 +128,21 @@ func (d *Destination[T]) Run(ctx context.Context) error {
epochC := make(chan uint64)
setTimer := true

d.syncMu.Lock()
if d.running {
panic("already running")
} else {
d.running = true
}
d.syncMu.Unlock()

ctx, cancel := context.WithCancel(ctx)

loop:
for {
select {
case msg := <-d.messages:
d.count++
if setTimer {
// copy the epoch to send on the chan after the timer fires
epc := epoch
Expand All @@ -139,27 +153,34 @@ loop:
}
d.buf = append(d.buf, msg)
if len(d.buf) >= d.flushlen {
slog.Info("flushing on size", "len", len(d.buf), "processed", d.count)
epoch++
err = d.flush(ctx)
if err != nil {
slog.Error("flush error", "error", err)
break loop
}
setTimer = true
epoch++
}
case tEpoch := <-epochC:
// if we haven't flushed yet this epoch, then flush, otherwise ignore
if tEpoch == epoch {
epoch++
slog.Info("flushing on epoch", "len", len(d.buf), "processed", d.count, "epoch", epoch)
err = d.flush(ctx)
if err != nil {
break loop
}
setTimer = true
epoch++
}
case err = <-d.flusherr:
slog.Info("flush error", "error", err)
break loop
case <-ctx.Done():
err = ctx.Err()
slog.Info("context done", "err", ctx.Err(), "len", len(d.buf), "processed", d.count)
if len(d.buf) > 0 {
err = d.flush(ctx)
}
break loop
}
}
Expand All @@ -180,17 +201,19 @@ func (d *Destination[T]) flush(ctx context.Context) error {
// Have to make a copy so these don't get overwritten
msgs := make([]msgAck[T], len(d.buf))
copy(msgs, d.buf)
slog.Info(fmt.Sprintf("copied %d messages for flush", len(msgs)))
// This must happen in the same goroutine as flushwg.Wait
// do not push down into doflush
d.flushwg.Add(1)
go d.doflush(ctx, msgs)
// Clear the buffer for the next batch
d.buf = d.buf[:0]
case err := <-d.flusherr:
slog.Error("flush error", "error", err)
return err
case <-ctx.Done():
// TODO: one more flush?
return ctx.Err()
// case <-ctx.Done():
// // TODO: one more flush?
// return ctx.Err()
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions x/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (s *Scanner) recvLoop(ctx context.Context) error {
}
}

if err := s.scanner.Err(); err != nil {
return fmt.Errorf("scanning: %+w", err)
}

c := make(chan struct{})
go func() {
wg.Wait()
Expand All @@ -71,10 +75,6 @@ func (s *Scanner) recvLoop(ctx context.Context) error {
return ctx.Err()
}

if err := s.scanner.Err(); err != nil {
return fmt.Errorf("scanning: %+w", err)
}

return nil
}

Expand Down

0 comments on commit e7d4f88

Please sign in to comment.