-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_test.go
123 lines (106 loc) · 2.7 KB
/
server_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package delay_queue
import (
"context"
"fmt"
"testing"
"time"
)
// TestServerStart is a manual integration test. It registers two tasks,
// "delay-queue-1" and "delay-queue-2", both with AckType set to AckTypeAuto,
// on a new server, starts the server, and then sleeps. It makes no
// assertions; the registered consumers only print what they are given.
//
// Both tasks use the connection settings returned by defaultRedis(). The
// final sleep (10000 seconds) is much longer than go test's default
// 10-minute timeout, so the test is skipped under -short and otherwise
// needs an explicit -timeout (for example -timeout 0) to run to completion.
func TestServerStart(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping long-running delay-queue integration test in short mode")
	}

	NewServer().AddTasks([]Task{
		{
			Name:        "delay-queue-1",
			DelayTime:   10,
			Limit:       50,
			Consumer:    DelayQueue1Consumer{},
			ConsumerNum: 1,
			Redis:       defaultRedis(),
			AckType:     AckTypeAuto,
			AckTimeout:  30,
		},
		{
			Name:        "delay-queue-2",
			DelayTime:   10,
			Limit:       2,
			Consumer:    DelayQueue2Consumer{},
			ConsumerNum: 3,
			Redis:       defaultRedis(),
			AckType:     AckTypeAuto,
			AckTimeout:  3,
		},
	}).Start()

	// Keep the test alive after Start returns — presumably so that the
	// consumers keep running in the background; confirm against Start.
	time.Sleep(10000 * time.Second)
}
// TestAckTypeManual is a manual integration test for a task configured with
// AckTypeManual. It registers "delay-queue-3" with DelayQueue3Consumer (which
// calls task.Ack itself), starts the server, and then sleeps. It makes no
// assertions.
//
// The task uses the connection settings returned by defaultRedis(). The
// final sleep (10000 seconds) is much longer than go test's default
// 10-minute timeout, so the test is skipped under -short and otherwise
// needs an explicit -timeout (for example -timeout 0) to run to completion.
func TestAckTypeManual(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping long-running delay-queue integration test in short mode")
	}

	NewServer().AddTasks([]Task{
		{
			Name:        "delay-queue-3",
			DelayTime:   10,
			Limit:       2,
			Consumer:    DelayQueue3Consumer{},
			ConsumerNum: 3,
			Redis:       defaultRedis(),
			AckType:     AckTypeManual,
			AckTimeout:  30,
		},
	}).Start()

	// Keep the test alive after Start returns — presumably so that the
	// consumers keep running in the background; confirm against Start.
	time.Sleep(10000 * time.Second)
}
// TestAckTypeDisable is a manual integration test for a task configured with
// AckTypeDisable. It registers "delay-queue-4" with DelayQueue4Consumer,
// starts the server, and then sleeps. It makes no assertions.
//
// The task uses the connection settings returned by defaultRedis(). The
// final sleep (10000 seconds) is much longer than go test's default
// 10-minute timeout, so the test is skipped under -short and otherwise
// needs an explicit -timeout (for example -timeout 0) to run to completion.
func TestAckTypeDisable(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping long-running delay-queue integration test in short mode")
	}

	NewServer().AddTasks([]Task{
		{
			Name:        "delay-queue-4",
			DelayTime:   10,
			Limit:       2,
			Consumer:    DelayQueue4Consumer{},
			ConsumerNum: 3,
			Redis:       defaultRedis(),
			AckType:     AckTypeDisable,
			AckTimeout:  30,
		},
	}).Start()

	// Keep the test alive after Start returns — presumably so that the
	// consumers keep running in the background; confirm against Start.
	time.Sleep(10000 * time.Second)
}
// DelayQueue1Consumer is the consumer registered for the "delay-queue-1"
// task in TestServerStart. It carries no state and only prints what it is
// given.
type DelayQueue1Consumer struct{}

// Deal prints the received batch of messages to stdout and always returns
// nil.
func (d DelayQueue1Consumer) Deal(ctx context.Context, task Task, messages []string) error {
	fmt.Println("DelayQueue1Consumer Deal", messages)
	return nil
}

// Error prints the value pointed to by err to stdout. A nil err is ignored,
// because dereferencing it would panic.
func (d DelayQueue1Consumer) Error(ctx context.Context, task Task, err *Error) {
	if err == nil {
		return
	}
	fmt.Println("DelayQueue1Consumer Error", *err)
}
// DelayQueue2Consumer is the consumer registered for the "delay-queue-2"
// task in TestServerStart. It carries no state and only prints what it is
// given.
type DelayQueue2Consumer struct{}

// Deal prints the received batch of messages to stdout and always returns
// nil.
func (d DelayQueue2Consumer) Deal(ctx context.Context, task Task, messages []string) error {
	fmt.Println("DelayQueue2Consumer Deal", messages)
	return nil
}

// Error prints the value pointed to by err to stdout. A nil err is ignored,
// because dereferencing it would panic.
func (d DelayQueue2Consumer) Error(ctx context.Context, task Task, err *Error) {
	if err == nil {
		return
	}
	fmt.Println("DelayQueue2Consumer Error", *err)
}
// DelayQueue3Consumer is the consumer registered for the "delay-queue-3"
// task in TestAckTypeManual. It carries no state; it prints what it is given
// and acknowledges messages itself via task.Ack.
type DelayQueue3Consumer struct{}

// Deal prints the received batch of messages to stdout and then acknowledges
// all of them with task.Ack. If the acknowledgement fails, the failure is
// returned (wrapped with the task name) instead of being printed and
// discarded; on success Deal returns nil.
func (d DelayQueue3Consumer) Deal(ctx context.Context, task Task, messages []string) error {
	fmt.Println("DelayQueue3Consumer Deal", messages)
	if err := task.Ack(ctx, messages...); err != nil {
		return fmt.Errorf("ack messages for task %q: %w", task.Name, err)
	}
	return nil
}

// Error prints the value pointed to by err to stdout. A nil err is ignored,
// because dereferencing it would panic.
func (d DelayQueue3Consumer) Error(ctx context.Context, task Task, err *Error) {
	if err == nil {
		return
	}
	fmt.Println("DelayQueue3Consumer Error", *err)
}
// DelayQueue4Consumer is the consumer registered for the "delay-queue-4"
// task in TestAckTypeDisable. It carries no state and only prints what it is
// given.
type DelayQueue4Consumer struct{}

// Deal prints the received batch of messages to stdout and always returns
// nil.
func (d DelayQueue4Consumer) Deal(ctx context.Context, task Task, messages []string) error {
	fmt.Println("DelayQueue4Consumer Deal", messages)
	return nil
}

// Error prints the value pointed to by err to stdout. A nil err is ignored,
// because dereferencing it would panic.
func (d DelayQueue4Consumer) Error(ctx context.Context, task Task, err *Error) {
	if err == nil {
		return
	}
	fmt.Println("DelayQueue4Consumer Error", *err)
}
// defaultRedis builds the DefaultRedis value shared by every task in this
// file: host "0.0.0.0" on port 6379.
//
// NOTE(review): "0.0.0.0" is normally a listen-on-all-interfaces address; if
// this value is used as a client connection target, "127.0.0.1" may be more
// portable — confirm against how DefaultRedis is consumed.
func defaultRedis() DefaultRedis {
	var cfg DefaultRedis
	cfg.Host = "0.0.0.0"
	cfg.Port = 6379
	return cfg
}