forked from RichardKnop/machinery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
factories.go
194 lines (163 loc) · 5.18 KB
/
factories.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package machinery
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/RichardKnop/machinery/v1/backends"
"github.com/RichardKnop/machinery/v1/brokers"
"github.com/RichardKnop/machinery/v1/config"
)
// BrokerFactory creates a new object of brokers.Interface, selecting the
// implementation from the scheme prefix of cnf.Broker:
//
//   - "amqp://" or "amqps://" -> brokers.NewAMQPBroker
//   - "redis://"              -> brokers.NewRedisBroker, options from ParseRedisURL
//   - "redis+socket://"       -> brokers.NewRedisBroker, options from ParseRedisSocketURL
//   - "eager"                 -> brokers.NewEagerBroker
//
// It returns an error when the broker URL matches none of these prefixes or
// when a Redis URL cannot be parsed.
func BrokerFactory(cnf *config.Config) (brokers.Interface, error) {
	switch {
	// Both the plain and the TLS AMQP schemes are decided by the broker URL.
	// (The amqps check previously inspected cnf.ResultBackend by mistake.)
	case strings.HasPrefix(cnf.Broker, "amqp://"),
		strings.HasPrefix(cnf.Broker, "amqps://"):
		return brokers.NewAMQPBroker(cnf), nil

	case strings.HasPrefix(cnf.Broker, "redis://"):
		// The scheme must occur exactly once; otherwise the URL is malformed.
		if strings.Count(cnf.Broker, "redis://") != 1 {
			return nil, fmt.Errorf(
				"Redis broker connection string should be in format redis://host:port, instead got %s",
				cnf.Broker,
			)
		}
		redisHost, redisPassword, redisDB, err := ParseRedisURL(cnf.Broker)
		if err != nil {
			return nil, err
		}
		return brokers.NewRedisBroker(cnf, redisHost, redisPassword, "", redisDB), nil

	case strings.HasPrefix(cnf.Broker, "redis+socket://"):
		redisSocket, redisPassword, redisDB, err := ParseRedisSocketURL(cnf.Broker)
		if err != nil {
			return nil, err
		}
		// Host is left empty: the connection goes through the socket path.
		return brokers.NewRedisBroker(cnf, "", redisPassword, redisSocket, redisDB), nil

	case strings.HasPrefix(cnf.Broker, "eager"):
		return brokers.NewEagerBroker(), nil
	}

	return nil, fmt.Errorf("Factory failed with broker URL: %v", cnf.Broker)
}
// BackendFactory builds the backends.Interface implementation selected by the
// scheme prefix of cnf.ResultBackend. Recognised prefixes are "amqp://",
// "amqps://", "memcache://", "redis://", "redis+socket://", "mongodb://" and
// "eager". Any other value, or a Redis/Memcache URL that fails to parse,
// yields an error.
func BackendFactory(cnf *config.Config) (backends.Interface, error) {
	target := cnf.ResultBackend
	hasScheme := func(prefix string) bool {
		return strings.HasPrefix(target, prefix)
	}

	switch {
	case hasScheme("amqp://"), hasScheme("amqps://"):
		return backends.NewAMQPBackend(cnf), nil

	case hasScheme("memcache://"):
		// Everything after the scheme is a comma-separated server list.
		pieces := strings.Split(target, "memcache://")
		if len(pieces) != 2 {
			return nil, fmt.Errorf(
				"Memcache result backend connection string should be in format memcache://server1:port,server2:port, instead got %s",
				target,
			)
		}
		return backends.NewMemcacheBackend(cnf, strings.Split(pieces[1], ",")), nil

	case hasScheme("redis://"):
		addr, secret, index, err := ParseRedisURL(target)
		if err != nil {
			return nil, err
		}
		return backends.NewRedisBackend(cnf, addr, secret, "", index), nil

	case hasScheme("redis+socket://"):
		sock, secret, index, err := ParseRedisSocketURL(target)
		if err != nil {
			return nil, err
		}
		// Host is left empty: the connection goes through the socket path.
		return backends.NewRedisBackend(cnf, "", secret, sock, index), nil

	case hasScheme("mongodb://"):
		return backends.NewMongodbBackend(cnf), nil

	case hasScheme("eager"):
		return backends.NewEagerBackend(), nil
	}

	return nil, fmt.Errorf("Factory failed with result backend: %v", target)
}
// ParseRedisURL extracts Redis connection options from a URL of the form
// redis://password@host:port/db, where both the password and the db are
// optional.
//
// The password is everything between the scheme and the last "@", so
// passwords that themselves contain "@" are handled. The db is the first path
// segment after the host; when it is absent or not a valid integer the
// default Redis database 0 is returned and no error is reported.
//
// An error is returned when url does not start with "redis://" or when the
// scheme occurs a number of times other than once.
func ParseRedisURL(url string) (host, password string, db int, err error) {
	// redis://pwd@host/db
	parts := strings.Split(url, "redis://")
	if parts[0] != "" {
		err = errors.New("No redis scheme found")
		return
	}
	if len(parts) != 2 {
		err = fmt.Errorf("Redis connection string should be in format redis://password@host:port/db, instead got %s", url)
		return
	}

	// Split at the LAST "@": the host cannot contain "@", but the password
	// may, and splitting on every "@" would mistake part of it for the host.
	hostAndDB := parts[1]
	if at := strings.LastIndex(hostAndDB, "@"); at >= 0 {
		password = hostAndDB[:at]
		hostAndDB = hostAndDB[at+1:]
	}

	segments := strings.Split(hostAndDB, "/")
	host = segments[0]
	if len(segments) > 1 {
		// A malformed db is deliberately tolerated and falls back to the
		// default database 0.
		if n, convErr := strconv.Atoi(segments[1]); convErr == nil {
			db = n
		}
	}
	return
}
// ParseRedisSocketURL pulls the socket path, optional password and optional
// database index out of a URL of the form
// redis+socket://password@/path/to/file.sock:/db.
//
// The redis+socket:// scheme is not a standard (nor a de facto one); it is a
// transitional mechanism until the config package gains proper support for
// socket-based connections.
//
// An error is returned when the scheme is missing, occurs more than once, or
// when the socket path is empty. A db that is not a valid integer is not
// reported as an error.
func ParseRedisSocketURL(url string) (path, password string, db int, err error) {
	pieces := strings.Split(url, "redis+socket://")
	switch {
	case pieces[0] != "":
		return "", "", 0, errors.New("No redis scheme found")
	case len(pieces) != 2:
		// redis+socket://password@/path/to/file.soc:/db
		return "", "", 0, fmt.Errorf("Redis socket connection string should be in format redis+socket://password@/path/to/file.sock:/db, instead got %s", url)
	}
	rest := pieces[1]

	// Password: everything before the first "@", when one is present.
	if at := strings.Index(rest, "@"); at >= 0 {
		password, rest = rest[:at], rest[at+1:]
	}

	// Socket path: everything before the first ":", or all of it if none.
	colon := strings.Index(rest, ":")
	if colon >= 0 {
		path = rest[:colon]
	} else {
		path = rest
	}
	if path == "" {
		err = fmt.Errorf("Redis socket connection string should be in format redis+socket://password@/path/to/file.sock:/db, instead got %s", url)
		return
	}
	if colon >= 0 {
		rest = rest[colon+1:]
	}

	// Database: whatever follows the first "/" of what is left. The
	// conversion error is intentionally discarded (best-effort parsing).
	if slash := strings.Index(rest, "/"); slash >= 0 {
		db, _ = strconv.Atoi(rest[slash+1:])
	}
	return
}