Skip to content

Commit

Permalink
feat: zero timeout on composed routers should disable timeout
Browse files Browse the repository at this point in the history
This will let consumers disable timeouts instead of using a timeout of
0s which isn't otherwise useful since it will always fail anyway.
  • Loading branch information
guseggert committed Mar 27, 2023
1 parent 8adffb3 commit 136d254
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 90 deletions.
17 changes: 14 additions & 3 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ func getValueOrErrorParallel[T any](
select {
case <-ctx.Done():
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
ctx, cancel := context.WithCancel(ctx)
if r.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, r.Timeout)
}
defer cancel()
value, empty, err := f(ctx, r.Router)
if err != nil &&
Expand Down Expand Up @@ -269,8 +272,12 @@ func executeParallel(
errCh <- ctx.Err()
}
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
ctx, cancel := context.WithCancel(ctx)
if r.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, r.Timeout)
}
defer cancel()

log.Debug("executeParallel: calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
Expand Down Expand Up @@ -335,8 +342,12 @@ func getChannelOrErrorParallel[T any](
)
return
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
ctx, cancel := context.WithCancel(ctx)
if r.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, r.Timeout)
}
defer cancel()

log.Debug("getChannelOrErrorParallel: calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
Expand Down
22 changes: 22 additions & 0 deletions compparallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,28 @@ func TestComposableParallelFixtures(t *testing.T) {
}},
SearchValue: []searchValueFixture{{key: "a", ctx: canceledCtx, err: context.Canceled}},
},
{
Name: "timeout=0 should disable the timeout, two routers with one disabled timeout should timeout on the other router",
routers: []*ParallelRouter{
{
Timeout: 0,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, nil, nil),
},
},
{
Timeout: time.Second,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, []string{"a"}, []string{"av"}),
},
},
},
GetValue: []getValueFixture{
{key: "/wait/100ms/a", value: "av", searchValCount: 1},
},
},
}

for _, f := range fixtures {
Expand Down
26 changes: 17 additions & 9 deletions compsequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ func getValueOrErrorSequential[T any](
return value, ctxErr
}

ctx, cancel := context.WithTimeout(ctx, router.Timeout)
ctx, cancel := context.WithCancel(ctx)
if router.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, router.Timeout)
}
defer cancel()

value, empty, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
Expand All @@ -184,14 +188,18 @@ func executeSequential(
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
ctx, cancel := context.WithTimeout(ctx, router.Timeout)

ctx, cancel := context.WithCancel(ctx)
if router.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, router.Timeout)
}
defer cancel()

if err := f(ctx, router.Router); err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
return err
}
cancel()
}

return nil
Expand All @@ -211,13 +219,15 @@ func getChannelOrErrorSequential[T any](
close(chanOut)
return
}

ctx, cancel := context.WithTimeout(ctx, router.Timeout)
ctx, cancel := context.WithCancel(ctx)
if router.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, router.Timeout)
}
defer cancel()
rch, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
break
}

Expand All @@ -238,8 +248,6 @@ func getChannelOrErrorSequential[T any](

}
}

cancel()
}

close(chanOut)
Expand Down
132 changes: 54 additions & 78 deletions compsequential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,31 @@ func TestNoResultsSequential(t *testing.T) {
}

func TestComposableSequentialFixtures(t *testing.T) {
type getValueFixture struct {
err error
key string
value string
searchValCount int
}
type putValueFixture struct {
err error
key string
value string
}
type provideFixture struct {
err error
}
type findPeerFixture struct {
peerID string
err error
}
fixtures := []struct {
Name string
routers []*SequentialRouter
GetValueFixtures []struct {
err error
key string
value string
searchValCount int
}
PutValueFixtures []struct {
err error
key string
value string
}
ProvideFixtures []struct {
err error
}
FindPeerFixtures []struct {
peerID string
err error
}
GetValueFixtures []getValueFixture
PutValueFixtures []putValueFixture
ProvideFixtures []provideFixture
FindPeerFixtures []findPeerFixture
}{
{
Name: "simple two routers",
Expand All @@ -85,12 +89,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -102,11 +101,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
searchValCount: 2,
},
},
PutValueFixtures: []struct {
err error
key string
value string
}{
PutValueFixtures: []putValueFixture{
{
err: errors.New("a"),
key: "/error/a",
Expand All @@ -117,17 +112,12 @@ func TestComposableSequentialFixtures(t *testing.T) {
value: "a",
},
},
ProvideFixtures: []struct {
err error
}{
ProvideFixtures: []provideFixture{
{
err: routing.ErrNotSupported,
},
},
FindPeerFixtures: []struct {
peerID string
err error
}{
FindPeerFixtures: []findPeerFixture{
{
peerID: "pid1",
},
Expand Down Expand Up @@ -158,12 +148,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -174,11 +159,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
key: "a",
},
},
PutValueFixtures: []struct {
err error
key string
value string
}{
PutValueFixtures: []putValueFixture{
{
key: "/error/x",
value: "xv",
Expand All @@ -188,10 +169,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
value: "yv",
},
},
FindPeerFixtures: []struct {
peerID string
err error
}{
FindPeerFixtures: []findPeerFixture{
{
peerID: "pid1",
},
Expand Down Expand Up @@ -223,12 +201,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -248,11 +221,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
key: "/error/y",
},
},
PutValueFixtures: []struct {
err error
key string
value string
}{
PutValueFixtures: []putValueFixture{
{
key: "/error/x",
value: "xv",
Expand All @@ -262,10 +231,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
value: "yv",
},
},
FindPeerFixtures: []struct {
peerID string
err error
}{
FindPeerFixtures: []findPeerFixture{
{
peerID: "pid1",
},
Expand Down Expand Up @@ -297,12 +263,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
err: errFailValue,
key: "d",
Expand Down Expand Up @@ -337,12 +298,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -355,6 +311,26 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
{
Name: "timeout=0 should disable the timeout, two routers with one disabled timeout should timeout on the other router",
routers: []*SequentialRouter{
{
Timeout: 0,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, nil, nil),
},
},
{
Timeout: time.Minute,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, []string{"a"}, []string{"av"}),
},
},
},
GetValueFixtures: []getValueFixture{{key: "/wait/100ms/a", value: "av", searchValCount: 1}},
},
}

for _, f := range fixtures {
Expand Down
Loading

0 comments on commit 136d254

Please sign in to comment.