/
queue.go
153 lines (134 loc) · 3.27 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package queue
import (
	"encoding/json"
	"errors"
	"net/url"
	"time"

	que "github.com/bgentry/que-go"
	"github.com/jackc/pgx"
)
// Job type names used when enqueuing jobs and when registering their
// handlers in the queue's work map.
var (
	// DelayedDeleteJob is the job type for delayed delete requests.
	DelayedDeleteJob = "DelayedDeleteRequests"

	// CleanChannelJob is the job type for channel cleanup requests.
	CleanChannelJob = "CleanChannelRequests"
)
// DelayedDeleteRequest is the JSON-serializable payload of a delayed delete
// job. It is marshalled into the job's arguments when the job is enqueued.
type DelayedDeleteRequest struct {
	// Token is encoded under the "token" key.
	Token string `json:"token"`

	// Channel is encoded under the "channel_id" key.
	Channel string `json:"channel_id"`

	// Timestamp is encoded under the "ts" key.
	Timestamp string `json:"ts"`
}
// CleanChannelRequest is the JSON-serializable payload of a channel cleanup
// job. It is marshalled into the job's arguments when the job is enqueued.
type CleanChannelRequest struct {
	// Token is encoded under the "token" key.
	Token string `json:"token"`

	// Channel is encoded under the "channel_id" key.
	Channel string `json:"channel_id"`

	// UserID is encoded under the "user_id" key.
	UserID string `json:"user_id"`

	// Options is encoded under the "command_options" key.
	Options CleanChannelOpts `json:"command_options"`
}
// Queue is a job queue used to pass messages between the web thread and the
// workers.
type Queue struct {
	// qc is the que client that jobs are enqueued through.
	qc *que.Client

	// pgxpool is the database connection pool; it is closed by Close.
	pgxpool *pgx.ConnPool

	// wm maps job type names to the functions that handle them.
	wm *que.WorkMap

	// workers is the worker pool; it stays nil until InitWorkerPool is called.
	workers *que.WorkerPool
}
// NewQueue connects to the database identified by dbURL and returns a Queue
// whose work map registers the handlers for DelayedDeleteJob and
// CleanChannelJob.
//
// It returns an error if dbURL is nil or if the database setup fails. No
// worker pool is created here; call InitWorkerPool for that.
func NewQueue(dbURL *url.URL) (*Queue, error) {
	// Calling String on a nil *url.URL would panic, so reject it up front
	// and report the problem through the existing error return instead.
	if dbURL == nil {
		return nil, errors.New("queue: nil database URL")
	}

	pgxpool, qc, err := setupDB(dbURL.String())
	if err != nil {
		return nil, err
	}

	return &Queue{
		qc:      qc,
		pgxpool: pgxpool,
		wm: &que.WorkMap{
			DelayedDeleteJob: delayedDelete,
			CleanChannelJob:  cleanChannel,
		},
	}, nil
}
// Close cleans up the queue: it shuts down the worker pool if one was
// initialized, and then closes the database connection pool if one is set.
func (q *Queue) Close() {
	if pool := q.workers; pool != nil {
		pool.Shutdown()
	}

	if db := q.pgxpool; db != nil {
		db.Close()
	}
}
// QueueCleanChannel enqueues a channel cleanup job. The arguments are packed
// into a CleanChannelRequest, JSON-encoded, and submitted under the
// CleanChannelJob type.
//
// It returns the JSON encoding error, if any, or otherwise whatever the
// underlying client's Enqueue returns.
func (q *Queue) QueueCleanChannel(token, channel, userID string, options CleanChannelOpts) error {
	payload, err := json.Marshal(CleanChannelRequest{
		Token:   token,
		Channel: channel,
		UserID:  userID,
		Options: options,
	})
	if err != nil {
		return err
	}

	return q.qc.Enqueue(&que.Job{
		Type: CleanChannelJob,
		Args: payload,
	})
}
// QueueDelayedDelete enqueues a delayed message delete job. The arguments
// are packed into a DelayedDeleteRequest, JSON-encoded, and submitted under
// the DelayedDeleteJob type with the job's RunAt set to runAt.
//
// It returns the JSON encoding error, if any, or otherwise whatever the
// underlying client's Enqueue returns.
func (q *Queue) QueueDelayedDelete(token, channel, ts string, runAt time.Time) error {
	payload, err := json.Marshal(DelayedDeleteRequest{
		Token:     token,
		Channel:   channel,
		Timestamp: ts,
	})
	if err != nil {
		return err
	}

	return q.qc.Enqueue(&que.Job{
		Type:  DelayedDeleteJob,
		Args:  payload,
		RunAt: runAt,
	})
}
// InitWorkerPool builds the worker pool from the queue's client and work
// map, passing numWorkers through to que.NewWorkerPool. It does nothing when
// the queue has no work map. The pool is not started here; see StartWorkers.
func (q *Queue) InitWorkerPool(numWorkers int) {
	if q.wm != nil {
		q.workers = que.NewWorkerPool(q.qc, *q.wm, numWorkers)
	}
}
// StartWorkers starts the worker pool. It is a no-op when no pool has been
// initialized.
func (q *Queue) StartWorkers() {
	if q.workers == nil {
		return
	}
	q.workers.Start()
}
// getPgxPool parses dbURL and opens a pgx connection pool for it, with
// que.PrepareStatements installed as the pool's AfterConnect hook.
//
// On any failure it returns a nil pool together with the error.
func getPgxPool(dbURL string) (*pgx.ConnPool, error) {
	connCfg, err := pgx.ParseURI(dbURL)
	if err != nil {
		return nil, err
	}

	poolCfg := pgx.ConnPoolConfig{
		ConnConfig:   connCfg,
		AfterConnect: que.PrepareStatements,
	}

	pool, err := pgx.NewConnPool(poolCfg)
	if err != nil {
		// Discard whatever pool value came back; callers only ever see nil
		// alongside an error.
		return nil, err
	}
	return pool, nil
}
// setupDB creates a *pgx.ConnPool for dbURL and a *que.Client backed by that
// pool. It exists so that the same setup routine can be shared between the
// web process and the workers.
//
// If the pool cannot be created, both returned pointers are nil and the
// error is returned.
func setupDB(dbURL string) (*pgx.ConnPool, *que.Client, error) {
	pool, err := getPgxPool(dbURL)
	if err != nil {
		return nil, nil, err
	}

	return pool, que.NewClient(pool), nil
}