Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 36 additions & 21 deletions flow/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,64 @@ import (
"github.com/reugn/go-streams"
)

// ThrottleMode represents Throttler's processing behavior when its element
// buffer overflows.
// ThrottleMode defines the behavior of the Throttler when its internal buffer is full.
type ThrottleMode int8

const (
// Backpressure slows down upstream ingestion when the element buffer overflows.
// Backpressure instructs the Throttler to block upstream ingestion when its internal
// buffer is full. This effectively slows down the producer, preventing data loss
// and ensuring all elements are eventually processed, albeit at a reduced rate. This
// mode can cause upstream operations to block indefinitely if the downstream consumer
// cannot keep up.
Backpressure ThrottleMode = iota

// Discard drops incoming elements when the element buffer overflows.
// Discard instructs the Throttler to drop incoming elements when its internal buffer
// is full. This mode prioritizes maintaining the target throughput rate, even at the
// cost of data loss. Elements are silently dropped without any indication to the
// upstream producer. Use this mode when data loss is acceptable.
Discard
)

// Throttler limits the throughput to a specific number of elements per time unit.
type Throttler struct {
maxElements int64
period time.Duration
mode ThrottleMode
maxElements int64
counter atomic.Int64

in chan any
out chan any
quotaSignal chan struct{}
done chan struct{}
counter int64
}

// Verify Throttler satisfies the Flow interface.
var _ streams.Flow = (*Throttler)(nil)

// NewThrottler returns a new Throttler operator.
//
// The Throttler operator limits the rate at which elements are produced. It allows a
// maximum of 'elements' number of elements to be processed within a specified 'period'
// of time.
//
// elements is the maximum number of elements to be produced per the given period of time.
// bufferSize specifies the buffer size for incoming elements.
// mode specifies the processing behavior when the elements buffer overflows.
// bufferSize is the size of the internal buffer for incoming elements. This buffer
// temporarily holds elements waiting to be processed.
// mode specifies the processing behavior when the internal elements buffer is full.
// See [ThrottleMode] for available options.
//
// If elements or bufferSize are not positive, NewThrottler will panic.
// If elements or bufferSize are not positive, or if mode is not a supported
// ThrottleMode, NewThrottler will panic.
func NewThrottler(elements int, period time.Duration, bufferSize int, mode ThrottleMode) *Throttler {
if elements < 1 {
panic(fmt.Sprintf("nonpositive elements number: %d", elements))
}
if bufferSize < 1 {
panic(fmt.Sprintf("nonpositive buffer size: %d", bufferSize))
}
if mode != Discard && mode != Backpressure {
panic(fmt.Sprintf("unsupported ThrottleMode: %d", mode))
}

throttler := &Throttler{
maxElements: int64(elements),
period: period,
Expand All @@ -66,19 +83,19 @@ func NewThrottler(elements int, period time.Duration, bufferSize int, mode Throt

// quotaExceeded checks whether the quota per time unit has been exceeded.
func (th *Throttler) quotaExceeded() bool {
return atomic.LoadInt64(&th.counter) >= th.maxElements
return th.counter.Load() >= th.maxElements
}

// resetQuotaCounterLoop resets the throttler quota counter every th.period
// and sends a release notification to the downstream processor.
// and notifies the downstream processing goroutine of the quota reset.
func (th *Throttler) resetQuotaCounterLoop() {
ticker := time.NewTicker(th.period)
defer ticker.Stop()

for {
select {
case <-ticker.C:
atomic.StoreInt64(&th.counter, 0)
th.counter.Store(0)
th.notifyQuotaReset() // send quota reset

case <-th.done:
Expand All @@ -95,8 +112,8 @@ func (th *Throttler) notifyQuotaReset() {
}
}

// buffer starts buffering incoming elements.
// If an unsupported ThrottleMode was specified, buffer will panic.
// buffer buffers incoming elements from the in channel by sending them
// to the out channel, adhering to the configured ThrottleMode.
func (th *Throttler) buffer() {
switch th.mode {
case Discard:
Expand All @@ -110,8 +127,6 @@ func (th *Throttler) buffer() {
for element := range th.in {
th.out <- element
}
default:
panic(fmt.Sprintf("Unsupported ThrottleMode: %d", th.mode))
}
close(th.out)
}
Expand Down Expand Up @@ -139,15 +154,15 @@ func (th *Throttler) In() chan<- any {
return th.in
}

// streamPortioned streams elements to the next Inlet.
// Subsequent processing of elements will be suspended when the quota limit is reached
// until the next quota reset event.
// streamPortioned streams elements to the given Inlet, enforcing a quota.
// Elements are sent to inlet.In() until th.out is closed. If the quota is exceeded,
// the function blocks until a quota reset signal is received on th.quotaSignal.
func (th *Throttler) streamPortioned(inlet streams.Inlet) {
for element := range th.out {
if th.quotaExceeded() {
<-th.quotaSignal // wait for quota reset
}
atomic.AddInt64(&th.counter, 1)
th.counter.Add(1)
inlet.In() <- element
}
close(th.done)
Expand Down
6 changes: 6 additions & 0 deletions flow/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func TestThrottler_NonPositiveBufferSize(t *testing.T) {
})
}

func TestThrottler_UnsupportedThrottleMode(t *testing.T) {
assert.Panics(t, func() {
flow.NewThrottler(2, time.Second, 1, flow.ThrottleMode(3))
})
}

func writeValues(in chan any) {
inputValues := []string{"a", "b", "c", "d", "e", "f", "g"}
ingestSlice(inputValues, in)
Expand Down
Loading