forked from grab/async
/
consume_test.go
110 lines (100 loc) · 2.5 KB
/
consume_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2019 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
package async
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestProcessTaskPool_HappyPath(t *testing.T) {
tests := []struct {
desc string
taskCount int
concurrency int
}{
{
desc: "10 tasks in channel to be run with default concurrency",
taskCount: 10,
concurrency: 0,
},
{
desc: "10 tasks in channel to be run with 2 workers",
taskCount: 10,
concurrency: 2,
},
{
desc: "10 tasks in channel to be run with 10 workers",
taskCount: 10,
concurrency: 10,
},
{
desc: "10 tasks in channel to be run with 20 workers",
taskCount: 10,
concurrency: 20,
},
}
for _, test := range tests {
m := test
resChan := make(chan struct{}, m.taskCount)
taskChan := make(chan Task)
go func() {
for i := 0; i < m.taskCount; i++ {
taskChan <- NewTask(func(context.Context) (any, error) {
resChan <- struct{}{}
time.Sleep(time.Millisecond * 10)
return nil, nil
})
}
close(taskChan)
}()
p := Consume(context.Background(), m.concurrency, taskChan)
_, err := p.Outcome()
close(resChan)
assert.Nil(t, err, m.desc)
var res []struct{}
for r := range resChan {
res = append(res, r)
}
assert.Len(t, res, m.taskCount, m.desc)
}
}
// test context cancellation
func TestProcessTaskPool_SadPath(t *testing.T) {
tests := []struct {
desc string
taskCount int
concurrency int
timeOut time.Duration // in millisecond
}{
{
desc: "2 workers cannot finish 10 tasks in 20 ms where 1 task takes 10 ms. Context cancelled while waiting for available worker",
taskCount: 10,
concurrency: 2,
timeOut: 20,
},
{
desc: "once 10 tasks are completed, workers will wait for more task. Then context will timeout in 20ms",
taskCount: 10,
concurrency: 20,
timeOut: 20,
},
}
for _, test := range tests {
m := test
taskChan := make(chan Task)
ctx, cancel := context.WithTimeout(context.Background(), m.timeOut*time.Millisecond)
defer cancel()
go func() {
for i := 0; i < m.taskCount; i++ {
taskChan <- NewTask(func(context.Context) (any, error) {
time.Sleep(time.Millisecond * 10)
return nil, nil
})
}
}()
p := Consume(ctx, m.concurrency, taskChan)
_, err := p.Outcome()
assert.NotNil(t, err, m.desc)
}
}