-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathpause_test.go
101 lines (86 loc) · 2.27 KB
/
pause_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package workflow_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/luno/workflow"
"github.com/luno/workflow/adapters/memrecordstore"
"github.com/luno/workflow/adapters/memrolescheduler"
"github.com/luno/workflow/adapters/memstreamer"
)
// TestRetryOfPausedRecords triggers a workflow whose only step returns an
// error a fixed number of times before succeeding. The step is configured
// with PauseAfterErrCount below that number and the workflow is built with
// WithPauseRetry, so the run can only reach StatusEnd if it keeps being
// processed after the pause threshold has been crossed (presumably via the
// pause-retry process; confirm against the workflow package).
func TestRetryOfPausedRecords(t *testing.T) {
	const (
		// pauseAfterErrCount is passed to workflow.PauseAfterErrCount for the step.
		pauseAfterErrCount = 3
		// stopErroringAfter is the number of failures the step returns before
		// it starts succeeding. It is deliberately larger than
		// pauseAfterErrCount so the threshold is crossed before success.
		stopErroringAfter = 5
	)

	b := workflow.NewBuilder[string, status]("example")

	// errorCounter counts how many times the step has returned an error.
	var errorCounter int
	b.AddStep(
		StatusStart,
		func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
			if errorCounter == stopErroringAfter {
				return StatusEnd, nil
			}

			errorCounter++
			return 0, fmt.Errorf("test error")
		},
		StatusEnd,
	).WithOptions(
		workflow.PauseAfterErrCount(pauseAfterErrCount),
		workflow.ErrBackOff(time.Millisecond),
	)

	w := b.Build(
		memstreamer.New(),
		memrecordstore.New(),
		memrolescheduler.New(),
		workflow.WithPauseRetry(time.Millisecond*10),
	)

	ctx := context.Background()
	w.Run(ctx)
	t.Cleanup(w.Stop)

	fid := "12345"
	_, err := w.Trigger(ctx, fid)
	require.NoError(t, err)

	workflow.Require(t, w, fid, StatusEnd, "")

	// Assert against the named threshold rather than a literal so the
	// expectation cannot drift from the step's behaviour.
	require.Equal(t, stopErroringAfter, errorCounter)
}
// TestRetryOfPausedRecordsConfig checks whether the process registered as
// "paused-records-retry-consumer" appears in the workflow's States() map,
// both when the workflow is built with DisablePauseRetry and when it is built
// with no pause-related build options.
func TestRetryOfPausedRecordsConfig(t *testing.T) {
	// retryProcess is the key looked up in the map returned by States().
	const retryProcess = "paused-records-retry-consumer"

	// alwaysFails is a step function that returns an error on every call.
	alwaysFails := func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
		return 0, fmt.Errorf("test error")
	}

	// failingBuilder returns a fresh builder with the single failing step, so
	// each subtest builds its own independent workflow.
	failingBuilder := func() *workflow.Builder[string, status] {
		builder := workflow.NewBuilder[string, status]("example")
		builder.AddStep(StatusStart, alwaysFails, StatusEnd).WithOptions()
		return builder
	}

	t.Run("Disabled - no retry process for paused records", func(t *testing.T) {
		wf := failingBuilder().Build(
			memstreamer.New(),
			memrecordstore.New(),
			memrolescheduler.New(),
			workflow.DisablePauseRetry(),
		)

		wf.Run(context.Background())
		t.Cleanup(wf.Stop)

		_, launched := wf.States()[retryProcess]
		require.False(t, launched)
	})

	t.Run("Enabled by default should be launched", func(t *testing.T) {
		wf := failingBuilder().Build(
			memstreamer.New(),
			memrecordstore.New(),
			memrolescheduler.New(),
		)

		wf.Run(context.Background())
		t.Cleanup(wf.Stop)

		_, launched := wf.States()[retryProcess]
		require.True(t, launched)
	})
}