Skip to content

Commit

Permalink
Proposal channel iteration until err
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
  • Loading branch information
ajnavarro committed Sep 16, 2022
1 parent 0c84fe9 commit b95a96a
Showing 1 changed file with 16 additions and 28 deletions.
44 changes: 16 additions & 28 deletions compsequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,14 @@ func (r *composableSequential) Ready() bool {
// If count is set, the channel will return up to count results, stopping routers iteration.
func (r *composableSequential) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
var totalCount int64
ch, _ := getChannelOrErrorSequential(ctx, r.routers,
return getChannelOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) {
return r.FindProvidersAsync(ctx, cid, count), nil
},
func() bool {
return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0
},
)

return ch
}

// FindPeer calls FindPeer per each router sequentially.
Expand Down Expand Up @@ -116,13 +114,15 @@ func (r *composableSequential) GetValue(ctx context.Context, key string, opts ..
// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *composableSequential) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
return getChannelOrErrorSequential(ctx, r.routers,
ch := getChannelOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (<-chan []byte, error) {
return r.SearchValue(ctx, key, opts...)
},
func() bool { return false },
)

return ch, nil

}

// If some router fails and the IgnoreError flag is true, we continue to the next router.
Expand Down Expand Up @@ -184,50 +184,38 @@ func getChannelOrErrorSequential[T any](
routers []*SequentialRouter,
f func(context.Context, routing.Routing) (<-chan T, error),
shouldStop func() bool,
) (chan T, error) {
) chan T {
chanOut := make(chan T)
var chans []<-chan T
var cancels []context.CancelFunc

for _, router := range routers {
ctx, cancel := context.WithTimeout(ctx, router.Timeout)
rch, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
return nil, err
}

cancels = append(cancels, cancel)
chans = append(chans, rch)
}

go func() {
for i := 0; i < len(chans); i++ {
if chans[i] == nil {
cancels[i]()
continue
for _, router := range routers {
ctx, cancel := context.WithTimeout(ctx, router.Timeout)
rch, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
break
}

f:
for {
select {
case <-ctx.Done():
break f
case v, ok := <-chans[i]:
case v, ok := <-rch:
if !ok {
break f
}
chanOut <- v
}
}

cancels[i]()
cancel()
}

close(chanOut)
}()

return chanOut, nil
return chanOut
}

0 comments on commit b95a96a

Please sign in to comment.