-
Notifications
You must be signed in to change notification settings - Fork 0
/
machinery.go
170 lines (149 loc) · 5.21 KB
/
machinery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package queue
import (
"bytes"
"context"
"errors"
"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/backends/result"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
backendsiface "github.com/RichardKnop/machinery/v1/backends/iface"
brokersiface "github.com/RichardKnop/machinery/v1/brokers/iface"
)
const (
QueueRabbitmqExchangetypeDirect = "direct"
QueueRabbitmqExchangetypeFanout = "fanout"
QueueRabbitmqExchangetypeTopic = "topic"
)
// mockgen -destination=./machinery_mock.go -package=queue gitee.com/kelvins-io/common/queue TaskServerIface
type TaskServerIface interface {
NewWorker(consumerTag string, concurrency int) *machinery.Worker
NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *machinery.Worker
GetBroker() brokersiface.Broker
SetBroker(broker brokersiface.Broker)
GetBackend() backendsiface.Backend
SetBackend(backend backendsiface.Backend)
GetConfig() *config.Config
SetConfig(cnf *config.Config)
RegisterTasks(namedTaskFuncs map[string]interface{}) error
RegisterTask(name string, taskFunc interface{}) error
IsTaskRegistered(name string) bool
GetRegisteredTask(name string) (interface{}, error)
SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
SendTask(signature *tasks.Signature) (*result.AsyncResult, error)
SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)
SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
GetRegisteredTaskNames() []string
}
type MachineryQueue struct {
TaskServer TaskServerIface
}
func newMachineryQueue(broker string, defaultQueue string, resultBackend string, resultsExpireIn int, amqp *config.AMQPConfig, namedTaskFuncs map[string]interface{}) (*MachineryQueue, error) {
config2 := &config.Config{
Broker: broker,
DefaultQueue: defaultQueue,
ResultBackend: resultBackend,
ResultsExpireIn: resultsExpireIn,
AMQP: amqp,
NoUnixSignals: true,
}
server, err := machinery.NewServer(config2)
if err != nil {
return nil, err
}
err = server.RegisterTasks(namedTaskFuncs)
if err != nil {
return nil, err
}
return &MachineryQueue{TaskServer: server}, nil
}
func NewRabbitMqQueue(broker string, defaultQueue string, resultBackend string, resultsExpireIn int, exchange string, exchangeType string,
bindingKey string, prefetchCount int, queueBindingArgs map[string]interface{}, namedTaskFuncs map[string]interface{}) (*MachineryQueue, error) {
allowExchangeType := map[string]bool{
QueueRabbitmqExchangetypeDirect: true,
QueueRabbitmqExchangetypeFanout: true,
QueueRabbitmqExchangetypeTopic: true,
}
if _, ok := allowExchangeType[exchangeType]; !ok {
return nil, errors.New("error exchange type for rabbitMq")
}
amqp := &config.AMQPConfig{
Exchange: exchange,
ExchangeType: exchangeType,
BindingKey: bindingKey,
PrefetchCount: prefetchCount,
QueueBindingArgs: queueBindingArgs,
}
return newMachineryQueue(
broker,
defaultQueue,
resultBackend,
resultsExpireIn,
amqp,
namedTaskFuncs,
)
}
func NewRedisQueue(broker string, defaultQueue string, resultBackend string, resultsExpireIn int, namedTaskFuncs map[string]interface{}) (*MachineryQueue, error) {
return newMachineryQueue(
broker,
defaultQueue,
resultBackend,
resultsExpireIn,
nil,
namedTaskFuncs,
)
}
type AliAMQPConfig struct {
AccessKey string // 阿里云accesskey
SecretKey string // 阿里云secretKey
AliUid int // 阿里云资源owner账户 ID 信息,点击在控制台右上角客户头像进入账号管理查看
EndPoint string // 阿里云amqp接入点
VHost string // vhost
DefaultQueue string // 默认队列
ResultBackend string // 任务状态存储后台
ResultsExpireIn int // 任务状态存储时间
Exchange string
ExchangeType string
BindingKey string
PrefetchCount int
QueueBindingArgs map[string]interface{}
NamedTaskFuncs map[string]interface{}
}
// 阿里云AMQP消息队列
func NewAliAMQPMqQueue(c *AliAMQPConfig) (*MachineryQueue, error) {
var broker = buildAliAMQPBroker(
c.AccessKey,
c.SecretKey,
c.AliUid,
c.EndPoint,
c.VHost,
)
return NewRabbitMqQueue(
broker,
c.DefaultQueue,
c.ResultBackend,
c.ResultsExpireIn,
c.Exchange,
c.ExchangeType,
c.BindingKey,
c.PrefetchCount,
c.QueueBindingArgs,
c.NamedTaskFuncs,
)
}
func buildAliAMQPBroker(accessKey string, secretKey string, aliUid int, endPoint string, vHost string) string {
var bf bytes.Buffer
bf.WriteString("amqp://")
bf.WriteString(convertAliyunUserName(accessKey, aliUid))
bf.WriteString(":")
bf.WriteString(convertAliyunPassword(secretKey))
bf.WriteString("@")
bf.WriteString(endPoint)
bf.WriteString("/")
bf.WriteString(vHost)
return bf.String()
}