-
Notifications
You must be signed in to change notification settings - Fork 568
/
from_foreach.go
55 lines (50 loc) · 1.19 KB
/
from_foreach.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package stream
import (
"context"
"github.com/pachyderm/pachyderm/v2/src/internal/errutil"
)
// fromForEach provides functionality for generic imperative iteration.
// TODO: Move file set merge and datum merge to this abstraction.
type forEach[T any] struct {
dataChan chan T
errChan chan error
copy func(dst, src *T)
}
// NewFromForEach creates a new iterator from a forEachFunc
// Don't write new code that needs this.
func NewFromForEach[T any](ctx context.Context, cp func(dst, src *T), forEachFunc func(func(T) error) error) Iterator[T] {
dataChan := make(chan T)
errChan := make(chan error, 1)
go func() {
if err := forEachFunc(func(data T) error {
select {
case dataChan <- data:
return nil
case <-ctx.Done():
return errutil.ErrBreak
}
}); err != nil {
errChan <- err
return
}
close(dataChan)
}()
return &forEach[T]{
dataChan: dataChan,
errChan: errChan,
copy: cp,
}
}
// Next returns the next item and progresses the iterator.
func (i *forEach[T]) Next(ctx context.Context, dst *T) error {
select {
case data, more := <-i.dataChan:
if !more {
return EOS()
}
i.copy(dst, &data)
return nil
case err := <-i.errChan:
return err
}
}