/
chunks.go
55 lines (51 loc) · 1.48 KB
/
chunks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package stream
import (
"github.com/primetalk/goio/fun"
"github.com/primetalk/goio/io"
)
// ChunkN builds a stream transformer that gathers elements into slices of
// n elements. It simply delegates to ToChunks with the same size.
func ChunkN[A any](n int) func(sa Stream[A]) Stream[[]A] {
	chunker := ToChunks[A](n)
	return chunker
}
// ToChunks collects incoming elements in chunks of the given size.
//
// The returned transformer buffers the elements of stm and hands the buffer
// downstream (via Lift) as soon as it holds size elements, then starts a new
// buffer. Sizes below 1 are treated as 1, i.e. every element becomes its own
// chunk.
//
// Whatever is still buffered when stm ends is passed to the finish function,
// which lifts it without checking its length, so the last chunk may be
// shorter than size or even empty.
// NOTE(review): this assumes StateFlatMapWithFinish calls the finish function
// with the final state — confirm against its definition.
func ToChunks[A any](size int) func(stm Stream[A]) Stream[[]A] {
	if size < 1 {
		size = 1
	}
	return func(stm Stream[A]) Stream[[]A] {
		// The initial state deliberately has no capacity. A pre-allocated
		// buffer created here would be shared by every evaluation of the
		// resulting stream, and each evaluation would append into the same
		// backing array, overwriting chunks produced by another evaluation.
		return StateFlatMapWithFinish(stm, []A{},
			func(a A, as []A) io.IO[fun.Pair[[]A, Stream[[]A]]] {
				return io.Pure(func() fun.Pair[[]A, Stream[[]A]] {
					buf := as
					if cap(buf) == 0 {
						// First element of a new chunk: allocate the
						// buffer inside the thunk so it belongs to this
						// evaluation only.
						buf = make([]A, 0, size)
					}
					buf = append(buf, a)
					if len(buf) >= size {
						// The chunk is full: emit it and continue with an
						// empty state; the next element allocates a fresh
						// buffer.
						return fun.NewPair([]A{}, Lift(buf))
					}
					return fun.NewPair(buf, Empty[[]A]())
				})
			},
			func(as []A) Stream[[]A] {
				return Lift(as)
			},
		)
	}
}
// ChunksResize rebuffers chunks to the given size.
//
// Each incoming chunk is appended to the remainder carried over from the
// previous step. Every complete run of newSize elements is emitted as its own
// chunk, and the leftover tail becomes the new carried remainder. Values of
// newSize below 1 are treated as 1; without this, 0 would divide by zero and
// a negative value would compute a wrong tail offset and drop elements.
//
// The remainder left when stm ends is passed to the finish function, which
// lifts it without checking its length, so the last chunk may be shorter than
// newSize or even empty.
// NOTE(review): this assumes StateFlatMapWithFinish calls the finish function
// with the final state — confirm against its definition.
func ChunksResize[A any](newSize int) func(stm Stream[[]A]) Stream[[]A] {
	if newSize < 1 {
		newSize = 1
	}
	return func(stm Stream[[]A]) Stream[[]A] {
		return StateFlatMapWithFinish(stm, []A{},
			func(as1 []A, st []A) io.IO[fun.Pair[[]A, Stream[[]A]]] {
				return io.Pure(func() fun.Pair[[]A, Stream[[]A]] {
					st2 := append(st, as1...)
					cnt := len(st2) / newSize
					chunks := make([][]A, 0, cnt)
					for i := 0; i < cnt; i++ {
						lo, hi := i*newSize, (i+1)*newSize
						// Full slice expression: cap the chunk at its own
						// length so that an append by a consumer reallocates
						// instead of overwriting the following chunk or the
						// carried remainder, which share this backing array.
						chunks = append(chunks, st2[lo:hi:hi])
					}
					last := st2[cnt*newSize:]
					return fun.NewPair(last, LiftMany(chunks...))
				})
			},
			func(st []A) Stream[[]A] {
				return Lift(st)
			},
		)
	}
}