-
Notifications
You must be signed in to change notification settings - Fork 4
/
consumer.go
145 lines (132 loc) · 3.27 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package consumer
import (
	"encoding/json"
	"log"
	"os"
	"runtime/debug"
	"strings"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/infinimesh/plugins/pkg/api"
)
// RedisStreamEvent is one entry read from the Redis stream: the entry's ID
// together with the payload decoded from its fields.
type RedisStreamEvent struct {
// ID is the stream entry ID as returned by Redis; it is the value passed to
// XACK when the entry is acknowledged.
ID string
// DeviceEvent is the payload decoded from the entry's field/value pairs.
DeviceEvent *DeviceEvent
}
// DeviceEvent is the decoded payload of a stream entry, as delivered to
// callers of Consumer.Consume.
type DeviceEvent struct {
// Object is decoded from the JSON stored under the entry's "object" field.
Object api.Object
// State is decoded from the JSON stored under the entry's "state" field.
State api.DeviceState
}
// Consumer is a source of device events.
type Consumer interface {
// Consume returns a receive-only channel on which device events are
// delivered.
Consume() <-chan *DeviceEvent
}
// consumerImpl is the Redis-backed implementation of Consumer.
type consumerImpl struct {
// redis is the connection pool every Redis command is issued through.
redis *redis.Pool
// name is sent as the consumer name in XREADGROUP; New sets it to the
// local hostname.
name string
}
// New returns a Consumer that issues its Redis commands through pool.
//
// The local hostname is used as the consumer name within the consumer group.
// If the hostname cannot be determined the failure is logged and the consumer
// is still created, with whatever name os.Hostname returned (normally the
// empty string).
func New(pool *redis.Pool) Consumer {
	hn, err := os.Hostname()
	if err != nil {
		// Best effort: a missing hostname should not prevent consumption,
		// but it must not go unnoticed either.
		log.Printf("error on os.Hostname, using consumer name %q: %v\n", hn, err)
	}
	return &consumerImpl{
		redis: pool,
		name:  hn,
	}
}
// Consume starts a background goroutine that repeatedly calls loop and
// returns the unbuffered channel that goroutine delivers events on.
//
// The goroutine runs for the lifetime of the process: it is never stopped and
// the returned channel is never closed. Whenever loop asks for a cooldown the
// goroutine pauses for one second before polling again.
func (i *consumerImpl) Consume() <-chan *DeviceEvent {
	events := make(chan *DeviceEvent)

	go func() {
		for {
			if cooldown := i.loop(events); !cooldown {
				// Events were delivered; poll again right away.
				continue
			}
			time.Sleep(time.Second)
		}
	}()

	return events
}
// loop performs a single XREADGROUP poll of the "objects" stream as consumer
// i.name in group "group", acknowledges every parsed entry and sends its
// payload on ch.
//
// It returns true when the caller should wait before polling again (the read
// failed or returned nothing) and false once the batch has been handed over.
func (i *consumerImpl) loop(ch chan<- *DeviceEvent) bool {
	conn := i.redis.Get()
	defer conn.Close()

	reply, err := conn.Do("XREADGROUP", "GROUP", "group", i.name, "STREAMS", "objects", ">")
	switch {
	case err != nil:
		log.Printf("error on XREADGROUP: %v\n", err)
		// XREADGROUP fails when the consumer group has not been created on
		// the stream yet. Normally the group already exists, so rather than
		// creating it up front before every read, it is created lazily here
		// on failure: self-healing, and cheaper on the common path.
		i.createGroupIfNotExists()
		return true
	case reply == nil:
		// Nothing to read right now.
		return true
	}

	for _, event := range parseReply(reply) {
		// Acknowledge first; a failed XACK is logged but does not stop the
		// event from being delivered.
		_, ackErr := conn.Do("XACK", "objects", "group", event.ID)
		if ackErr != nil {
			log.Printf("error on XACK: %v\n", ackErr)
		}
		ch <- event.DeviceEvent
	}
	return false
}
// createGroupIfNotExists issues XGROUP CREATE for consumer group "group" on
// the "objects" stream, passing MKSTREAM and "$" as the starting ID.
//
// A BUSYGROUP reply, which Redis sends when the group already exists, is the
// expected outcome of a repeated call and is treated as success. Any other
// failure is logged.
func (i *consumerImpl) createGroupIfNotExists() {
	conn := i.redis.Get()
	defer conn.Close()

	_, err := conn.Do("XGROUP", "CREATE", "objects", "group", "$", "MKSTREAM")
	if err == nil {
		return
	}
	// "Already exists" is exactly what this function is meant to tolerate,
	// so it is not reported as an error.
	if strings.HasPrefix(err.Error(), "BUSYGROUP") {
		return
	}
	log.Printf("error on XGROUP CREATE: %v\n", err)
}
// parseReply flattens a raw XREADGROUP reply into a list of stream events.
//
// The reply is expected to be nested as
//
//	[ [stream, [ [entryID, [field, value, ...]], ... ]], ... ]
//
// where entryID is a []byte. Any element that does not have the expected
// shape is logged and skipped, so one malformed entry does not discard the
// rest of the batch. The result is never nil on the normal path.
func parseReply(reply interface{}) (ret []RedisStreamEvent) {
	// Safety net for panics raised further down (for example while decoding
	// the field list). ret is a named result so that the events collected
	// before the panic are still returned instead of being dropped.
	defer func() {
		if err := recover(); err != nil {
			log.Printf("unexpected error when parsing reply: %v\n", err)
			debug.PrintStack()
		}
	}()

	ret = []RedisStreamEvent{}

	streams, ok := reply.([]interface{})
	if !ok {
		log.Printf("unexpected reply type when parsing reply: %T\n", reply)
		return ret
	}

	for _, s := range streams {
		// Each element is a [streamName, entries] pair.
		streamAndEvents, ok := s.([]interface{})
		if !ok || len(streamAndEvents) < 2 {
			log.Printf("unexpected stream element when parsing reply: %T\n", s)
			continue
		}
		events, ok := streamAndEvents[1].([]interface{})
		if !ok {
			log.Printf("unexpected entry list when parsing reply: %T\n", streamAndEvents[1])
			continue
		}

		for _, e := range events {
			// Each entry is an [entryID, fieldsAndValues] pair.
			event, ok := e.([]interface{})
			if !ok || len(event) < 2 {
				log.Printf("unexpected entry when parsing reply: %T\n", e)
				continue
			}
			rawID, ok := event[0].([]byte)
			if !ok {
				log.Printf("unexpected entry ID type when parsing reply: %T\n", event[0])
				continue
			}
			eventData, ok := event[1].([]interface{})
			if !ok {
				log.Printf("unexpected entry data type when parsing reply: %T\n", event[1])
				continue
			}
			ret = append(ret, RedisStreamEvent{
				ID:          string(rawID),
				DeviceEvent: parseEventData(eventData),
			})
		}
	}
	return ret
}
// parseEventData builds a DeviceEvent from the flat field/value list of one
// stream entry (field1, value1, field2, value2, ...).
//
// The "object" and "state" fields are JSON-decoded into the corresponding
// DeviceEvent members. Unknown fields, pairs whose field or value is not a
// []byte, and values that fail to decode are logged and skipped, leaving the
// affected member at its zero value. A trailing field without a value is
// logged and ignored.
//
// It returns nil when eventData is nil.
func parseEventData(eventData []interface{}) *DeviceEvent {
	if eventData == nil {
		return nil
	}

	ret := &DeviceEvent{}

	// Walk the list pairwise; the i+1 bound keeps a dangling field name from
	// indexing past the end of the slice.
	for i := 0; i+1 < len(eventData); i += 2 {
		rawKey, ok := eventData[i].([]byte)
		if !ok {
			log.Printf("unexpected key type when parsing event data: %T\n", eventData[i])
			continue
		}
		v, ok := eventData[i+1].([]byte)
		if !ok {
			log.Printf("unexpected value type when parsing event data: %T\n", eventData[i+1])
			continue
		}

		switch k := string(rawKey); k {
		case "object":
			obj := api.Object{}
			if err := json.Unmarshal(v, &obj); err != nil {
				log.Printf("unexpected error when unmarshaling object: %v\n", err)
				continue
			}
			ret.Object = obj
		case "state":
			state := api.DeviceState{}
			if err := json.Unmarshal(v, &state); err != nil {
				log.Printf("unexpected error when unmarshaling state: %v\n", err)
				// Do not store a partially decoded state.
				continue
			}
			ret.State = state
		default:
			log.Printf("unexpected key when parsing event data: %v\n", k)
		}
	}

	if len(eventData)%2 != 0 {
		log.Printf("unexpected trailing key without value when parsing event data\n")
	}
	return ret
}