-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
71 lines (58 loc) · 1.46 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package queue
import (
	"fmt"

	"github.com/hibiken/asynq"
	"github.com/hibiken/asynqmon"
	"github.com/jaevor/go-nanoid"
	"github.com/jirevwe/cascade/internal/pkg/config"
	"github.com/jirevwe/cascade/internal/pkg/util"
)
type RedisQueue struct {
opts QueueOptions
client *asynq.Client
inspector *asynq.Inspector
}
// NewClient creates an asynq client from the Redis DSN found in cfg.
//
// The DSN (cfg.RedisDsn) is handed to NewRedis; if that fails, the error is
// returned unchanged together with a nil client.
func NewClient(cfg config.Configuration) (*asynq.Client, error) {
	redisConn, err := NewRedis(cfg.RedisDsn)
	if err != nil {
		return nil, err
	}

	return asynq.NewClient(redisConn), nil
}
// NewQueue returns a Queuer implemented by *RedisQueue.
//
// Both the asynq client and the asynq inspector are created from
// opts.RedisClient, and opts itself is kept on the queue so it can later be
// retrieved through Options.
func NewQueue(opts QueueOptions) Queuer {
	queue := &RedisQueue{opts: opts}

	// Create the client before the inspector, mirroring the original order.
	queue.client = asynq.NewClient(opts.RedisClient)
	queue.inspector = asynq.NewInspector(opts.RedisClient)

	return queue
}
// Write enqueues job on queueName as an asynq task of type taskName.
//
// When job.ID is empty, a 21-character nanoid is generated and assigned to
// job.ID before enqueueing, so the caller's Job is mutated. The task is created
// with the asynq.Queue, asynq.TaskID and asynq.ProcessIn(job.Delay) options.
//
// It returns an error when job is nil, when the ID generator cannot be
// created, or when the client fails to enqueue the task. Underlying errors are
// wrapped with %w, so errors.Is and errors.As still match them.
func (q *RedisQueue) Write(taskName util.TaskName, queueName util.QueueName, job *Job) error {
	task, queue := string(taskName), string(queueName)

	// A nil job would panic on the field accesses below; report it instead.
	if job == nil {
		return fmt.Errorf("enqueue task %q on queue %q: job is nil", task, queue)
	}

	if job.ID == "" {
		generateID, err := nanoid.Standard(21)
		if err != nil {
			return fmt.Errorf("generate id for task %q: %w", task, err)
		}
		job.ID = generateID()
	}

	t := asynq.NewTask(task, job.Payload,
		asynq.Queue(queue),
		asynq.TaskID(job.ID),
		asynq.ProcessIn(job.Delay),
	)
	if _, err := q.client.Enqueue(t); err != nil {
		return fmt.Errorf("enqueue task %q (id %q) on queue %q: %w", task, job.ID, queue, err)
	}
	return nil
}
// Options returns, by value, the QueueOptions stored on the queue (the value
// NewQueue was given).
func (q *RedisQueue) Options() QueueOptions {
return q.opts
}
// Monitor builds an asynqmon HTTP handler for this queue.
//
// The handler is configured with the root path "/queue/monitoring" and the
// queue's opts.RedisClient as its Redis connection option. Every call
// constructs a new handler via asynqmon.New.
func (q *RedisQueue) Monitor() *asynqmon.HTTPHandler {
	monitorOpts := asynqmon.Options{
		RootPath:     "/queue/monitoring",
		RedisConnOpt: q.opts.RedisClient,
	}

	return asynqmon.New(monitorOpts)
}
// Inspector returns the asynq.Inspector held by the queue (created in
// NewQueue from opts.RedisClient).
func (q *RedisQueue) Inspector() *asynq.Inspector {
return q.inspector
}