Skip to content

Commit

Permalink
feat(streams.go): add Take function to limit the number of elements r…
Browse files Browse the repository at this point in the history
…eceived from a channel
  • Loading branch information
bounoable committed Aug 25, 2023
1 parent 9005f1a commit a6002ff
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions helper/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streams

import (
"context"
"errors"
"sync"
)

Expand All @@ -26,11 +27,11 @@ func New[T any](in []T) <-chan T {
// the buffer of the channel is set to the number of values and the values are
// pushed into the channel before returning.
//
// str, push, close := NewConcurrent(1, 2, 3)
// push(context.TODO(), 4, 5, 6)
// vals, err := All(str)
// // handle err
// // vals == []int{1, 2, 3, 4, 5, 6}
// str, push, close := NewConcurrent(1, 2, 3)
// push(context.TODO(), 4, 5, 6)
// vals, err := All(str)
// // handle err
// // vals == []int{1, 2, 3, 4, 5, 6}
//
// Use the Concurrent function to create a `push` function for an existing channel.
func NewConcurrent[T any](vals ...T) (_ <-chan T, _push func(context.Context, ...T) error, _close func()) {
Expand Down Expand Up @@ -104,6 +105,26 @@ func All[T any](in <-chan T, errs ...<-chan error) ([]T, error) {
return Drain(context.Background(), in, errs...)
}

var errTakeDone = errors.New("take done")

// Take receives elements from the input channel until it has received n
// elements or the input channel is closed. It returns a slice containing the
// received elements. If any error occurs during the process, it is returned as
// the second return value.
func Take[T any](ctx context.Context, n int, in <-chan T, errs ...<-chan error) ([]T, error) {
out := make([]T, 0, n)
if err := Walk(ctx, func(v T) error {
out = append(out, v)
if len(out) >= n {
return errTakeDone
}
return nil
}, in, errs...); err != nil && !errors.Is(err, errTakeDone) {
return out, err
}
return out, nil
}

// Walk receives from the given channel until it and and all provided error
// channels are closed, ctx is closed or any of the provided error channels
// receives an error. For every element e that is received from the input
Expand Down

0 comments on commit a6002ff

Please sign in to comment.