-
Notifications
You must be signed in to change notification settings - Fork 0
/
priority_fetcher.go
70 lines (59 loc) · 1.75 KB
/
priority_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package profiling
import (
"fmt"
"github.com/adjust/rmq/v3"
"github.com/go-redis/redis/v7"
"log"
)
type PriorityFetcher interface {
// Profile instructs the profiler to generate a priority for a session.
Profile(sessionID string)
// Fetch retrieves a priority for a session from a key-value store.
Fetch(sessionID string) (Priority, error)
}
type RedisPriorityFetcher struct {
prioritiesClient *redis.Client
queue rmq.Queue
}
const RedisQueueTag = "profiler service"
const RedisQueueName = "sessions"
func NewRedisPriorityFetcher(addr string, password string, prioritiesDB int, queueDB int) (*RedisPriorityFetcher, error) {
queueConn, err := rmq.OpenConnectionWithRedisClient(RedisQueueTag, redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: queueDB,
}), nil)
if err != nil {
return nil, err
}
queue, err := queueConn.OpenQueue(RedisQueueName)
if err != nil {
return nil, err
}
return &RedisPriorityFetcher{
prioritiesClient: redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: prioritiesDB,
}),
queue: queue,
}, nil
}
func (f *RedisPriorityFetcher) Profile(sessionID string) {
if err := f.queue.Publish(sessionID); err != nil {
log.Printf("could not publish session ID: %s", err)
}
}
func (f *RedisPriorityFetcher) Fetch(sessionID string) (Priority, error) {
val, err := f.prioritiesClient.Get(sessionID).Result()
if err == redis.Nil {
return Unknown, nil
} else if err != nil {
return Unknown, fmt.Errorf("expected rdb.Get(%s) returns nil err; got err = %w", sessionID, err)
}
priority, err := strToPriority(val)
if err != nil {
return Unknown, fmt.Errorf("expected strconv.Atoi(%s) returns nil err; got err = %w", val, err)
}
return priority, nil
}