Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
rename lo.Batch to lo.Buffer and lo.BatchWithTImeout to lo.BufferWith…
…Timeout
  • Loading branch information
samber committed Nov 15, 2022
1 parent 686821d commit a133373
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 26 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,16 @@

@samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade.

## 1.34.1 (2022-11-xx)

Adding:
- lo.BufferWithTimeout (alias to lo.BatchWithTimeout)
- lo.Buffer (alias to lo.Batch)

Deprecation:
- lo.BatchWithTimeout
- lo.Batch

## 1.34.0 (2022-11-12)

Improving:
Expand Down
24 changes: 12 additions & 12 deletions README.md
Expand Up @@ -159,8 +159,8 @@ Supported helpers for channels:
- [ChannelDispatcher](#channeldispatcher)
- [SliceToChannel](#slicetochannel)
- [Generator](#generator)
- [Batch](#batch)
- [BatchWithTimeout](#batchwithtimeout)
- [Buffer](#buffer)
- [BufferWithTimeout](#bufferwithtimeout)
- [FanIn](#fanin)
- [FanOut](#fanout)

Expand Down Expand Up @@ -1438,16 +1438,16 @@ for v := range lo.Generator(2, generator) {
// prints 1, then 2, then 3
```

### Batch
### Buffer

Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed).

```go
ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5})

items1, length1, duration1, ok1 := lo.Batch(ch, 3)
items1, length1, duration1, ok1 := lo.Buffer(ch, 3)
// []int{1, 2, 3}, 3, 0s, true
items2, length2, duration2, ok2 := lo.Batch(ch, 3)
items2, length2, duration2, ok2 := lo.Buffer(ch, 3)
// []int{4, 5}, 2, 0s, false
```

Expand All @@ -1458,7 +1458,7 @@ ch := readFromQueue()

for {
// read 1k items
items, length, _, ok := lo.Batch(ch, 1000)
items, length, _, ok := lo.Buffer(ch, 1000)

// do batching stuff

Expand All @@ -1468,7 +1468,7 @@ for {
}
```

### BatchWithTimeout
### BufferWithTimeout

Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).

Expand All @@ -1482,11 +1482,11 @@ generator := func(yield func(int)) {

ch := lo.Generator(0, generator)

items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
items1, length1, duration1, ok1 := lo.BufferWithTimeout(ch, 3, 100*time.Millisecond)
// []int{1, 2}, 2, 100ms, true
items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
items2, length2, duration2, ok2 := lo.BufferWithTimeout(ch, 3, 100*time.Millisecond)
// []int{3, 4, 5}, 3, 75ms, true
items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
items3, length3, duration2, ok3 := lo.BufferWithTimeout(ch, 3, 100*time.Millisecond)
// []int{}, 0, 10ms, false
```

Expand All @@ -1498,7 +1498,7 @@ ch := readFromQueue()
for {
// read 1k items
// wait up to 1 second
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)
items, length, _, ok := lo.BufferWithTimeout(ch, 1000, 1*time.Second)

// do batching stuff

Expand All @@ -1521,7 +1521,7 @@ consumer := func(c <-chan int) {
for {
// read 1k items
// wait up to 1 second
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)
items, length, _, ok := lo.BufferWithTimeout(ch, 1000, 1*time.Second)

// do batching stuff

Expand Down
20 changes: 16 additions & 4 deletions channel.go
Expand Up @@ -193,9 +193,9 @@ func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T {
return ch
}

// Batch creates a slice of n elements from a channel. Returns the slice and the slice length.
// Buffer creates a slice of n elements from a channel. Returns the slice and the slice length.
// @TODO: we should probably provide an helper that reuse the same buffer.
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
func Buffer[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
buffer := make([]T, 0, size)
index := 0
now := time.Now()
Expand All @@ -212,9 +212,15 @@ func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime t
return buffer, index, time.Since(now), true
}

// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
// Buffer creates a slice of n elements from a channel. Returns the slice and the slice length.
// Deprecated: Use lo.Buffer instead.
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
return Buffer(ch, size)
}

// BufferWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
// @TODO: we should probably provide an helper that reuse the same buffer.
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
expire := time.NewTimer(timeout)
defer expire.Stop()

Expand All @@ -239,6 +245,12 @@ func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (coll
return buffer, index, time.Since(now), true
}

// BufferWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
// Deprecated: Use lo.BufferWithTimeout instead.
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
return BufferWithTimeout(ch, size, timeout)
}

// FanIn collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.
func FanIn[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
Expand Down
20 changes: 10 additions & 10 deletions channel_test.go
Expand Up @@ -257,16 +257,16 @@ func TestGenerate(t *testing.T) {
is.Equal(i, 4)
}

func TestBatch(t *testing.T) {
func TestBuffer(t *testing.T) {
t.Parallel()
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

ch := SliceToChannel(2, []int{1, 2, 3})

items1, length1, _, ok1 := Batch(ch, 2)
items2, length2, _, ok2 := Batch(ch, 2)
items3, length3, _, ok3 := Batch(ch, 2)
items1, length1, _, ok1 := Buffer(ch, 2)
items2, length2, _, ok2 := Buffer(ch, 2)
items3, length3, _, ok3 := Buffer(ch, 2)

is.Equal([]int{1, 2}, items1)
is.Equal(2, length1)
Expand All @@ -279,7 +279,7 @@ func TestBatch(t *testing.T) {
is.False(ok3)
}

func TestBatchWithTimeout(t *testing.T) {
func TestBufferWithTimeout(t *testing.T) {
t.Parallel()
testWithTimeout(t, 200*time.Millisecond)
is := assert.New(t)
Expand All @@ -292,27 +292,27 @@ func TestBatchWithTimeout(t *testing.T) {
}
ch := Generator(0, generator)

items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond)
items1, length1, _, ok1 := BufferWithTimeout(ch, 20, 15*time.Millisecond)
is.Equal([]int{0, 1}, items1)
is.Equal(2, length1)
is.True(ok1)

items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond)
items2, length2, _, ok2 := BufferWithTimeout(ch, 20, 2*time.Millisecond)
is.Equal([]int{}, items2)
is.Equal(0, length2)
is.True(ok2)

items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond)
items3, length3, _, ok3 := BufferWithTimeout(ch, 1, 30*time.Millisecond)
is.Equal([]int{2}, items3)
is.Equal(1, length3)
is.True(ok3)

items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond)
items4, length4, _, ok4 := BufferWithTimeout(ch, 2, 25*time.Millisecond)
is.Equal([]int{3, 4}, items4)
is.Equal(2, length4)
is.True(ok4)

items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond)
items5, length5, _, ok5 := BufferWithTimeout(ch, 3, 25*time.Millisecond)
is.Equal([]int{}, items5)
is.Equal(0, length5)
is.False(ok5)
Expand Down

0 comments on commit a133373

Please sign in to comment.