Skip to content

Commit

Permalink
Merge pull request #11 from koss-null/1.0.3
Browse files Browse the repository at this point in the history
1.0.3
  • Loading branch information
koss-null committed Aug 1, 2023
2 parents 1aafde3 + f0d4ea2 commit 6d9dc33
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 116 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ example:
go run ./example/main.go

test:
go test -coverprofile=coverage.out --parallel 8 -v ./...
go test -race --parallel 8 -v ./...

get_coverage_pic:
gopherbadger -md="README.md,coverage.out"
45 changes: 26 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

[![Go Report Card](https://goreportcard.com/badge/github.com/koss-null/funcfrog)](https://goreportcard.com/report/github.com/koss-null/lambda)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Coverage](https://raw.githubusercontent.com/koss-null/funcfrog/0.9.0/coverage_badge.png?raw=true)](coverage)
[![Coverage](https://raw.githubusercontent.com/koss-null/funcfrog/master/coverage_badge.png?raw=true)](coverage)

![FuncFrog icon](https://github.com/koss-null/funcfrog/blob/0.9.0/FuncFrogIco.jpg?raw=true)
![FuncFrog icon](https://github.com/koss-null/funcfrog/blob/master/FuncFrogIco.jpg?raw=true)

FuncFrog is a library for performing parallel, lazy `map`, `reduce`, and `filter` operations on slices in one pipeline. The slice can be set by a generating function, and parallel execution is supported. It is expected that all function arguments will be **pure functions** (functions with no side effects that can be cached by their arguments).
It is capable of handling large amounts of data with minimal overhead, and its parallel execution capabilities allow for even faster processing times. Additionally, the library is easy to use and has a clean, intuitive API. [Here](https://macias.info/entry/202212020000_go_streams.md) is some performance review.
Expand Down Expand Up @@ -93,9 +93,9 @@ available.
The following functions can be used to create a new `Pipe` (this is how I call the inner representation of a sequence of
elements and a sequence operations on them):
- :frog: `Slice([]T) Piper`: creates a `Pipe` of a given type `T` from a slice, *the length is known*.
- :frog: `Func(func(i int) (T, bool)) PiperNL`: creates a `Pipe` of type `T` from a function. The function returns an element wich is considered to be at `i`th position in the `Pipe`, as well as a boolean indicating whether the element should be included (`true`) or skipped (`false`), *the length is unknown*.
- :frog: `Func(func(i int) (T, bool)) PiperNL`: creates a `Pipe` of type `T` from a function. The function returns an element which is considered to be at `i`th position in the `Pipe`, as well as a boolean indicating whether the element should be included (`true`) or skipped (`false`), *the length is unknown*.
- :frog: `Fn(func(i int) (T)) *Pipe`: creates a `Pipe` of type `T` from a function. The function should return the value of the element at the `i`th position in the `Pipe`; to be able to skip values use `Func`.
- :frog: `FuncP(func(i int) (*T, bool)) PiperNL`: creates a `Pipe` of type `T` from a function. The function returns a pointer to an element wich is considered to be at `i`th position in the `Pipe`, as well as a boolean indicating whether the element should be included (`true`) or skipped (`false`), *the length is unknown*.
- :frog: `FuncP(func(i int) (*T, bool)) PiperNL`: creates a `Pipe` of type `T` from a function. The function returns a pointer to an element which is considered to be at `i`th position in the `Pipe`, as well as a boolean indicating whether the element should be included (`true`) or skipped (`false`), *the length is unknown*.
- :frog: `Cycle(data []T) PiperNL`: creates a new `Pipe` that cycles through the elements of the provided slice indefinitely. *The length is unknown.*
- :frog: `Range(start, end, step T) Piper`: creates a new `Pipe` that generates a sequence of values of type `T` from `start` to `end` (exclusive) with a fixed `step` value between each element. `T` can be any numeric type, such as `int`, `float32`, or `float64`. *The length is known.*
- :frog: `Take(n int) Piper`: if it's a `Func`-made `Pipe`, expects `n` values to be eventually returned. *Transforms
Expand Down Expand Up @@ -203,37 +203,44 @@ result := pipe.Reduce(strP, func(x, y *string) int { return len(*x) + len(*y) })

```go
p := pipe.Func(func(i int) (float32, bool) {
return float32(i) * 0.9, true
return 100500-float32(i) * 0.9, true
}).
Map(func(x float32) float32 { return x * x }).
Map(func(x float32) float32 { return x * x * 0.1 }).
Gen(100500). // Sort is only availavle on pipes with known length
Sort(pipe.Less[float32]). // pipe.Less(x, y *T) bool is available to all comparables
Sort(pipies.Less[float32]). // pipies.Less(x, y *T) bool is available to all comparables
// check out pipies package to find more usefull things
Parallel(12).
Do()
// p will contain the elements sorted in ascending order
```

### Example of infine sequence generation:

Here is an example of generating an infinite sequence of Fibonacci greater than 1000:
Here is an example of generating an infinite sequence of Fibonacci:

```go
prev := 0
var fib []chan int
p := pipe.Func(func(i int) (int, bool) {
return i, true
}).
Map(func(x int) int {
prev = x+prev
return prev
}).
Filter(func(x *int) bool { return *x > 1000 })
if i < 2 {
fib[i] <- i
return i, true
}
p1 := <-fib[i-1]; fib[i-1] <- p1
p2 := <-fib[i-2]; fib[i-2] <- p2

fib[i] <- p1 + p2
return p1 + p2, true
}).Parallel(20)
```
But be careful if you are going to do it in parallel, you never know in with order Map funcs will be called.

To generate a specific number of values, you can use the `Take` method:
To generate a specific number of values, you can use the `Take` or `Gen` method:

```go
p = p.Take(65000)
// fill the array first
fib = make([]chan int, 60)
for i := range fib { fib[i] = make(chan int, 1) }
// do the Take
p = p.Take(60)
```

To accumulate the elements of the `Pipe`, you can use the `Reduce` or `Sum` method:
Expand Down
Binary file modified coverage_badge.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion internal/algo/parallel/mergesort/mergesort.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func mergeSplits[T any](
break
}

// this one controls amount of simultanious merge processes running
// this one controls amount of simultaneous merge processes running
<-jobTicket
wg.Add(1)
go func(i int) {
Expand Down
25 changes: 16 additions & 9 deletions internal/internalpipe/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,25 @@ func (p Pipe[T]) Any() *T {
step = max(divUp(limit, p.GoroutinesCnt), 1)
}
var (
res = make(chan *T, 1)
res = make(chan *T)
// if p.len is not set, we need tickets to control the amount of goroutines
tickets = genTickets(p.GoroutinesCnt)

done = make(chan struct{})
wg sync.WaitGroup
done bool
)
if !lenSet {
step = infiniteLenStep
}

setObj := func(obj *T) {
if done {
select {
case <-done:
return
default:
close(done)
res <- obj
}
done = true
res <- obj
}

go func() {
Expand All @@ -63,11 +65,16 @@ func (p Pipe[T]) Any() *T {
if lenSet {
rg = min(rg, limit)
}
for j := lf; j < rg && !done; j++ {
obj, skipped := p.Fn(j)
if !skipped {
setObj(obj)
for j := lf; j < rg; j++ {
select {
case <-done:
return
default:
obj, skipped := p.Fn(j)
if !skipped {
setObj(obj)
return
}
}
}
}(i, i+step)
Expand Down
173 changes: 99 additions & 74 deletions internal/internalpipe/first.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package internalpipe

import (
"context"
"math"
"sync"
)

func (p Pipe[T]) First() *T {
limit := p.limit()
if p.GoroutinesCnt == 1 {
return firstSingleThread(limit, p.Fn)
}
return first(limit, p.GoroutinesCnt, p.Fn)
}

func firstSingleThread[T any](limit int, fn func(i int) (*T, bool)) *T {
var obj *T
var skipped bool
Expand All @@ -17,92 +26,108 @@ func firstSingleThread[T any](limit int, fn func(i int) (*T, bool)) *T {
return nil
}

// First returns the first element of the pipe.
func (p Pipe[T]) First() *T {
limit := p.limit()
if p.GoroutinesCnt == 1 {
return firstSingleThread(limit, p.Fn)
type firstResult[T any] struct {
val *T
step int
totalSteps int
zeroStepBorder int
mx *sync.Mutex
ctx context.Context
cancel func()
done map[int]struct{}

resForSure chan *T
}

func newFirstResult[T any](totalSteps int) *firstResult[T] {
ctx, cancel := context.WithCancel(context.Background())
return &firstResult[T]{
step: math.MaxInt,
totalSteps: totalSteps,
mx: &sync.Mutex{},
ctx: ctx,
cancel: cancel,
done: make(map[int]struct{}, totalSteps),
resForSure: make(chan *T),
}
}

var (
step = max(divUp(limit, p.GoroutinesCnt), 1)
tickets = genTickets(p.GoroutinesCnt)
func (r *firstResult[T]) setVal(val *T, step int) {
r.mx.Lock()
defer r.mx.Unlock()

resStorage = struct {
val *T
pos int
}{nil, math.MaxInt}
resStorageMx sync.Mutex
res = make(chan *T, 1)
if step == r.zeroStepBorder {
r.resForSure <- val
r.cancel()
return
}
if step < r.step {
r.val = val
r.step = step
}
}

wg sync.WaitGroup
func (r *firstResult[T]) stepDone(step int) {
r.mx.Lock()
defer r.mx.Unlock()

stepCnt int
zeroStep int
)
r.done[step] = struct{}{}

updStorage := func(val *T, pos int) {
resStorageMx.Lock()
if pos < resStorage.pos {
resStorage.pos = pos
resStorage.val = val
// need to move r.zeroStepBorder up
if step == r.zeroStepBorder {
ok := true
for ok {
r.zeroStepBorder++
if r.zeroStepBorder == r.step {
r.resForSure <- r.val
r.cancel()
return
}

_, ok = r.done[r.zeroStepBorder]
}
resStorageMx.Unlock()
}

// this wg.Add is to make wg.Wait() wait if for loops that have not start yet
wg.Add(1)
go func() {
var done bool
// i >= 0 is for an int owerflow case
for i := 0; i >= 0 && i < limit && !done; i += step {
wg.Add(1)
<-tickets
go func(lf, rg, stepCnt int) {
defer func() {
tickets <- struct{}{}
wg.Done()
}()

rg = max(rg, limit)
for j := lf; j < rg; j++ {
obj, skipped := p.Fn(j)
if !skipped {
if stepCnt == zeroStep {
res <- obj
done = true
return
}
updStorage(obj, stepCnt)
return
}
if r.zeroStepBorder >= r.totalSteps {
r.resForSure <- nil
r.cancel()
}
}

if resStorage.pos < stepCnt {
done = true
return
}
}
// no lock needed since it's changed only in one goroutine
if stepCnt == zeroStep {
zeroStep++
if resStorage.pos == zeroStep {
res <- resStorage.val
done = true
func first[T any](limit, grtCnt int, fn func(i int) (*T, bool)) (f *T) {
if limit == 0 {
return
}
step := max(divUp(limit, grtCnt), 1)
tickets := genTickets(grtCnt)

res := newFirstResult[T](grtCnt)

stepCnt := 0
for i := 0; i >= 0 && i < limit; i += step {
<-tickets
go func(lf, rg, stepCnt int) {
defer func() {
tickets <- struct{}{}
}()

done := res.ctx.Done()
for j := lf; j < rg; j++ {
select {
case <-done:
return
default:
val, skipped := fn(j)
if !skipped {
res.setVal(val, stepCnt)
return
}
}
}(i, i+step, stepCnt)
stepCnt++
}
wg.Done()
}()

go func() {
wg.Wait()
resStorageMx.Lock()
defer resStorageMx.Unlock()
res <- resStorage.val
}()
}
res.stepDone(stepCnt)
}(i, i+step, stepCnt)
stepCnt++
}

return <-res
return <-res.resForSure
}
2 changes: 2 additions & 0 deletions internal/internalpipe/first_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestFirst(t *testing.T) {
ValLim: -1,
GoroutinesCnt: 5,
}
require.NotNil(t, p.First())
require.Equal(t, 900_001, *p.First())
})
t.Run("1000 threads", func(t *testing.T) {
Expand All @@ -60,6 +61,7 @@ func TestFirst(t *testing.T) {
ValLim: -1,
GoroutinesCnt: 1000,
}
require.NotNil(t, p.First())
require.Equal(t, 900_001, *p.First())
})
t.Run("not found", func(t *testing.T) {
Expand Down
16 changes: 7 additions & 9 deletions internal/internalpipe/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ func (p Pipe[T]) Sort(less func(*T, *T) bool) Pipe[T] {

return Pipe[T]{
Fn: func(i int) (*T, bool) {
if sorted == nil {
once.Do(func() {
data := p.Do()
if len(data) == 0 {
return
}
sorted = qsort.Sort(data, less, p.GoroutinesCnt)
})
}
once.Do(func() {
data := p.Do()
if len(data) == 0 {
return
}
sorted = qsort.Sort(data, less, p.GoroutinesCnt)
})
if i >= len(sorted) {
return nil, true
}
Expand Down

0 comments on commit 6d9dc33

Please sign in to comment.