-
Notifications
You must be signed in to change notification settings - Fork 0
/
enqueuer.go
80 lines (66 loc) · 1.57 KB
/
enqueuer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package worker
import (
"encoding/json"
"sync"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq"
)
var (
enqueuerInstance *enqueuer
once sync.Once
)
type enqueuer struct {
client *asynq.Client
}
type EnqueuerOpts struct {
PoolSize int
RedisUrl string
}
type Enqueuer interface {
EnqueueUniqueTask(*Task) error
EnqueueUniqueTaskIn(*Task, time.Duration) error
}
func NewEnqueuer(opts *EnqueuerOpts) Enqueuer {
once.Do(func() {
redisConnection := asynq.RedisClientOpt{
PoolSize: opts.PoolSize,
Addr: opts.RedisUrl,
}
enqueuerInstance = &enqueuer{
client: asynq.NewClient(redisConnection),
}
})
return enqueuerInstance
}
func (e *enqueuer) EnqueueUniqueTask(task *Task) error {
taskID := uuid.New().String()
bytes, err := json.Marshal(task.Payload)
if err != nil {
return err
}
asynqTask := asynq.NewTask(task.Name, bytes)
opts := []asynq.Option{asynq.TaskID(taskID), asynq.Unique(time.Hour), asynq.MaxRetry(task.Retry), asynq.Timeout(task.Timeout)}
_, err = e.client.Enqueue(
asynqTask,
opts...,
)
return err
}
func (e *enqueuer) EnqueueUniqueTaskIn(task *Task, delay time.Duration) error {
taskID := uuid.New().String()
bytes, err := json.Marshal(task.Payload)
if err != nil {
return err
}
asynqTask := asynq.NewTask(task.Name, bytes)
opts := []asynq.Option{asynq.TaskID(taskID), asynq.Unique(time.Hour), asynq.MaxRetry(task.Retry), asynq.Timeout(task.Timeout), asynq.ProcessIn(delay)}
_, err = e.client.Enqueue(
asynqTask,
opts...,
)
return err
}
func (e *enqueuer) Close() error {
return e.client.Close()
}