This repository has been archived by the owner on Dec 15, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 261
/
redis_query_results.go
167 lines (142 loc) · 3.94 KB
/
redis_query_results.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package pubsub
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gomodule/redigo/redis"
"github.com/kolide/fleet/server/kolide"
"github.com/pkg/errors"
)
type redisQueryResults struct {
// connection pool
pool *redis.Pool
}
var _ kolide.QueryResultStore = &redisQueryResults{}
// NewRedisPool creates a Redis connection pool using the provided server
// address, password and database.
func NewRedisPool(server, password string, database int, useTLS bool) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server, redis.DialDatabase(database), redis.DialUseTLS(useTLS))
if err != nil {
return nil, err
}
if password != "" {
if _, err := c.Do("AUTH", password); err != nil {
c.Close()
return nil, err
}
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
}
// NewRedisQueryResults creats a new Redis implementation of the
// QueryResultStore interface using the provided Redis connection pool.
func NewRedisQueryResults(pool *redis.Pool) *redisQueryResults {
return &redisQueryResults{pool: pool}
}
func pubSubForID(id uint) string {
return fmt.Sprintf("results_%d", id)
}
func (r *redisQueryResults) WriteResult(result kolide.DistributedQueryResult) error {
conn := r.pool.Get()
defer conn.Close()
channelName := pubSubForID(result.DistributedQueryCampaignID)
jsonVal, err := json.Marshal(&result)
if err != nil {
return errors.Wrap(err, "marshalling JSON for result")
}
n, err := redis.Int(conn.Do("PUBLISH", channelName, string(jsonVal)))
if err != nil {
return errors.Wrap(err, "PUBLISH failed to channel "+channelName)
}
if n == 0 {
return noSubscriberError{channelName}
}
return nil
}
// receiveMessages runs in a goroutine, forwarding messages from the Pub/Sub
// connection over the provided channel. This effectively allows a select
// statement to run on conn.Receive() (by running on the channel that is being
// fed by this function)
func receiveMessages(conn *redis.PubSubConn, outChan chan<- interface{}) {
defer func() {
close(outChan)
}()
for {
msg := conn.Receive()
outChan <- msg
switch msg := msg.(type) {
case error:
// If an error occurred (i.e. connection was closed),
// then we should exit
return
case redis.Subscription:
// If the subscription count is 0, the ReadChannel call
// that invoked this goroutine has unsubscribed, and we
// can exit
if msg.Count == 0 {
return
}
}
}
}
func (r *redisQueryResults) ReadChannel(ctx context.Context, query kolide.DistributedQueryCampaign) (<-chan interface{}, error) {
outChannel := make(chan interface{})
conn := redis.PubSubConn{Conn: r.pool.Get()}
pubSubName := pubSubForID(query.ID)
conn.Subscribe(pubSubName)
msgChannel := make(chan interface{})
// Run a separate goroutine feeding redis messages into
// msgChannel
go receiveMessages(&conn, msgChannel)
go func() {
defer close(outChannel)
defer conn.Close()
for {
// Loop reading messages from conn.Receive() (via
// msgChannel) until the context is cancelled.
select {
case msg, ok := <-msgChannel:
if !ok {
return
}
switch msg := msg.(type) {
case redis.Message:
var res kolide.DistributedQueryResult
err := json.Unmarshal(msg.Data, &res)
if err != nil {
outChannel <- err
}
outChannel <- res
case error:
outChannel <- errors.Wrap(msg, "reading from redis")
}
case <-ctx.Done():
conn.Unsubscribe()
}
}
}()
return outChannel, nil
}
// HealthCheck verifies that the redis backend can be pinged, returning an error
// otherwise.
func (r *redisQueryResults) HealthCheck() error {
conn := r.pool.Get()
defer conn.Close()
if _, err := conn.Do("PING"); err != nil {
return errors.Wrap(err, "reading from redis")
}
return nil
}