Lightweight and super fast functional stream processing library inspired by mariomac/gostream and RxGo.
- Go 1.21 or higher
This library makes intensive usage of Type Parameters (generics) and some new features of Go 1.21 so it is not compatible with any Go version lower than 1.21.
For more details about the API, please check the stream/*_test.go
or examples/*.go
files.
func main() {
stream.Of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).
Filter(isPrime).
Map(square).
ForEach(func(n int) {
fmt.Printf("%d is a square of a prime number\n", n)
})
}
func isPrime(n int) bool {
for i := 2; i < n/2; i++ {
if n%i == 0 {
return false
}
}
return true
}
func square(n int) int {
return n * n
}
Output:
1 is a square of a prime number
4 is a square of a prime number
9 is a square of a prime number
16 is a square of a prime number
25 is a square of a prime number
49 is a square of a prime number
121 is a square of a prime number
- Creates an infinite stream of random integers (no problem, streams are evaluated lazily!)
- In this case, if you want to turn the stream into a slice, you need to limit the number of elements (the stream can be infinite, but slice can't).
func main() {
r := rand.New(rand.NewSource(time.Now().UnixMilli()))
fmt.Println("let me throw 5 times a dice for you")
// Generate an infinite stream of random numbers
// The generator function of the parameter returns a pair (value, hasMore)
results := stream.Generate(func() (int, bool) {
return r.Int(), true
}).
Map(func(n int) int {
return n%6 + 1
}).
Limit(5).
ToSlice()
fmt.Printf("results: %v\n", results)
}
Output:
let me throw 5 times a dice for you
results: [3 5 2 1 3]
- Generates an stream from in a range of numbers, and then map the number to its
doubled
value. - Then
Map
the numbers' stream to a strings' stream. Because, at the moment, go does not allow type parameters in methods, we need to invoke thestream.Map
function instead of thenumbers.Map
method because the contained type of the output stream (string
) is different than the type of the input stream (int
). - Converts the words stream to a slice and prints it.
func main() {
numbers := stream.Range(1, 7).Map(func(n int) int {
return double(n)
})
words := stream.Map(numbers, asWord).ToSlice()
fmt.Println(words)
}
func double(n int) int {
return 2 * n
}
func asWord(n int) string {
if n < 10 {
return []string{"zero", "one", "two", "three", "four", "five",
"six", "seven", "eight", "nine"}[n]
} else {
return "many"
}
}
Output:
[one two four eight many many]
Following example requires to compare the elements of the Stream, so the Stream needs to be
composed by comparable
elements (this is, accepted by the the ==
and !=
operators):
- Instantiate a
Stream
ofcomparable
items. - Pass it to the
Distinct
method, that will return a copy of the original Stream without duplicates - Operating as any other stream.
words := stream.Distinct(
stream.Of("hello", "hello", "!", "ho", "ho", "ho", "!"),
).ToSlice()
fmt.Printf("Deduplicated words: %v\n", words)
Output:
Deduplicated words: [hello ! ho]
- Generate a stream of uint32 numbers.
- Picking up 5 elements.
- Sorting them by the inverse natural order (from higher to lower)
- It's important to limit the number of elements, avoiding invoking
Sorted
over an infinite stream (otherwise it would panic).
- It's important to limit the number of elements, avoiding invoking
cmp := func(a, b uint32) int {
if a < b {
return -1
}
if a > b {
return 1
}
return 0
}
func main() {
r := rand.New(rand.NewSource(time.Now().UnixMilli()))
fmt.Println("picking up 5 random numbers from lower to higher")
stream.Generate(func() (uint32, bool) {
return r.Uint32(), true
}).
Limit(5).
Sorted(cmp).
ForEach(func(n uint32) {
fmt.Println(n)
})
}
Output:
picking up 5 random numbers from lower to higher
2324462508
2605992364
2733296373
3263780948
4197612992
- Generate an infinite incremental Stream (1, 2, 3, 4...) using the
stream.Iterate
instantiator and theitem.Increment
helper function. - Limit the generated to 8 elements
- Reduce all the elements multiplying them using the item.Multiply helper function
fac8, _ := stream.Range(1, 9).
Reduce(item.Multiply[int])
fmt.Println("The factorial of 8 is", fac8)
Output:
The factorial of 8 is 40320
func isPrime(n int) bool {
for i := 2; i < n/2; i++ {
if n%i == 0 {
return false
}
}
return true
}
func main() {
// we do some complex and nonsense calculation in parallel
result := stream.Range(1, 1000).
Parallel(4). // parallelize the process and use 4 goroutines, then the following operations will be executed in parallel (if possible)
Filter(isPrime).
Map(func(n int) int {
// simulate a complex calculation
time.Sleep(100 * time.Nanosecond)
return (n + 1) / 2
}).
Reduce(0, func(a, b int) int {
// simulate a complex calculation
time.Sleep(100 * time.Nanosecond)
return a + b
})
fmt.Println("The nonsense result is", result)
}
Output:
The nonsense result is 38150
Due to the initial limitations of Go generics, the API has the following limitations. We will work on overcome them as long as new features are added to the Go type parameters specification.
- You can use
Map
andFlatMap
as method as long as the output element has the same type of the input. If you need to map to a different type, you need to usestream.Map
orstream.FlatMap
as functions. - There is no
Distinct
method. There is only astream.Distinct
function.
For small streams, the performance of this library is comparable to the performance of go-stream, but for large streams, the performance of this library is much better.
If you enable parallelism, the performance of this library is even better.
- Stream instantiation functions
- Comparable
- Concat
- Generate
- Of
- OfSlice
- OfChannel
- Stream transformers
- Distinct
- Filter
- FlatMap
- Limit
- Map
- Peek
- Skip
- Sorted
- GroupBy
- Defer
- Fork
- enable user to early terminate the heavy operations
- Collectors/Terminals
- ToSlice
- AllMatch
- AnyMatch
- Count
- ForEach
- Max
- Min
- NoneMatch
- Reduce
- ReduceSequentially
The Stream processing and aggregation functions and docs are heavily inspired by the mariomac/gostream.