forked from Cloudxtreme/lavabot
-
Notifications
You must be signed in to change notification settings - Fork 2
/
hub_timer.go
95 lines (82 loc) · 2.13 KB
/
hub_timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"encoding/json"
"sort"
"sync"
"time"
"github.com/bitly/go-nsq"
r "github.com/dancannon/gorethink"
)
var (
// state is the in-memory list of pending hub timers. initHub fills it from
// the "hub_state" RethinkDB table, sorts it with sort.Sort, and treats
// index 0 as the next timer to fire.
state State
// stateLock is held by initHub while it walks and shrinks state.
// NOTE(review): presumably any other code that mutates state (and then
// signals the change channel) must hold this lock too - confirm at callers.
stateLock sync.Mutex
)
// initHub runs the timer hub loop and does not return under normal operation.
//
// It connects an NSQ producer, loads every pending timer from the
// "hub_state" RethinkDB table into the package-level state slice (sorted via
// sort.Sort), and then loops forever: each pass publishes a SenderEvent for
// every timer whose Time has passed to the "sender_<Sender>" topic, deletes
// the delivered timers from RethinkDB and from state, and sleeps until the
// next timer is due or until a value arrives on change.
//
// change is a wake-up signal: receiving from it makes the loop re-examine
// state immediately (presumably sent after another goroutine modified state -
// confirm against the callers).
func initHub(change chan struct{}) {
	// retryDelay is the longest the loop sleeps before retrying timers whose
	// delivery failed. Without it an undeliverable timer stays expired at the
	// head of state and the loop would spin without ever blocking.
	const retryDelay = time.Second

	// Create a new producer
	prod, err := nsq.NewProducer(*nsqdAddress, nsq.NewConfig())
	if err != nil {
		log.WithField("error", err.Error()).Fatal("Unable to connect to NSQd")
	}

	// Load the hub state from RethinkDB
	cursor, err := r.Db(*rethinkdbDatabase).Table("hub_state").OrderBy(r.OrderByOpts{
		Index: "time",
	}).Run(session)
	if err != nil {
		log.WithField("error", err.Error()).Fatal("Unable to query for hub state")
	}

	var result State
	if err := cursor.All(&result); err != nil {
		log.WithField("error", err.Error()).Fatal("Unable to parse hub state query")
	}
	sort.Sort(result)

	stateLock.Lock()
	state = result
	stateLock.Unlock()

	log.Printf("%+v", result)

	for {
		log.Print("Handling hub message")

		// TODO: Use MultiPublish
		stateLock.Lock()

		var (
			timersToDelete []int     // indices into state of delivered timers, ascending
			failed         bool      // at least one due timer could not be delivered
			haveNext       bool      // a not-yet-due timer was found
			nextDue        time.Time // due time of the first not-yet-due timer
		)

		for id, timer := range state {
			if !timer.Time.Before(time.Now()) {
				// state is sorted by time, so nothing after this one is due.
				haveNext = true
				nextDue = timer.Time
				break
			}

			// Encode the sender event
			body, err := json.Marshal(&SenderEvent{
				Name:    timer.Name,
				Version: timer.Version,
				To:      timer.To,
				From:    timer.From,
				Input:   timer.Input,
			})
			if err != nil {
				log.WithField("error", err.Error()).Error("Unable to encode a sender event")
				failed = true
				continue
			}

			if err := prod.Publish("sender_"+timer.Sender, body); err != nil {
				log.WithField("error", err.Error()).Error("Unable to send an event")
				failed = true
				continue
			}

			// Delete it from RDB and the state. The event has already been
			// published, so the timer is dropped from memory even when the
			// database delete fails; keeping it would send the event twice.
			if err := r.Db(*rethinkdbDatabase).Table("hub_state").Get(timer.ID).Delete().Exec(session); err != nil {
				log.WithField("error", err.Error()).Error("Unable to delete a timer from the hub state")
			}
			timersToDelete = append(timersToDelete, id)
		}

		// Remove delivered timers in place. The recorded indices are
		// ascending, so after y earlier removals the element originally at
		// index x now sits at x-y.
		for y, x := range timersToDelete {
			i := x - y
			copy(state[i:], state[i+1:])
			state[len(state)-1] = nil
			state = state[:len(state)-1]
		}

		// Decide how long to sleep while the lock is still held, so state is
		// never read concurrently with a writer.
		var wait time.Duration
		useTimer := false
		if haveNext {
			wait = nextDue.Sub(time.Now())
			useTimer = true
		}
		if failed && (!useTimer || wait > retryDelay) {
			// Retry failed deliveries after a bounded pause, but never later
			// than the next timer that is about to come due.
			wait = retryDelay
			useTimer = true
		}

		stateLock.Unlock()

		if useTimer {
			select {
			case <-time.After(wait):
			case <-change:
			}
		} else {
			// Nothing is pending: block until the state changes.
			<-change
		}
	}
}