Package delaybuffer provides a generic, time-based buffer to regulate the flow of data items.
The buffer works by holding items for a minimum specified duration before releasing them to an output channel. This is useful for cushioning against unpredictable processing delays upstream, ensuring a smoother, more regulated data flow downstream.
It is safe for concurrent use.
To use the buffer, you need a data type that implements the delaybuffer.DataItem interface by providing a CreatedTime() time.Time method.
Example:
First, define your data type:
type MyItem struct {
ID string
Timestamp time.Time
}
func (i MyItem) CreatedTime() time.Time {
return i.Timestamp
}Next, create and use the buffer:
// Create an input channel and the buffer.
inChan := make(chan MyItem)
bufferDelay := 2 * time.Second
tickerInterval := 100 * time.Millisecond // Check for items every 100ms
buffer := delaybuffer.NewBuffer(inChan, bufferDelay, tickerInterval)
// Start a goroutine to listen for processed items.
go func() {
for item := range buffer.Out {
fmt.Printf("Processed item: %s\n", item.ID)
}
}()
// Send items to the buffer.
inChan <- MyItem{ID: "item-1", Timestamp: time.Now()}
// To shut down gracefully, close the input channel and call Close.
close(inChan)
buffer.Close()