/
multi.go
115 lines (106 loc) · 2.45 KB
/
multi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package base
import (
"database/sql"
"fmt"
"sync"
"time"
)
// multi tracks this process's membership in a named group of instances that
// coordinate through the heartbeats table, and remembers whether this
// instance currently considers itself the group's leader.
type multi struct {
	// Mutex guards isLeader, which is written by the heartbeat loop and read
	// by IsLeader.
	sync.Mutex
	// DebugOutput supplies the Debug/Errorf/Trace logging helpers used by the
	// methods of multi.
	*DebugOutput

	// db is the handle used for all heartbeat queries.
	db *DB
	// name identifies the group; heartbeat rows are filtered by it.
	name string
	// id is this instance's row key, assigned when Heartbeat starts.
	id string
	// isLeader is the result of the most recent successful election round.
	isLeader bool
	// timeoutSeconds is the age, in seconds, beyond which a heartbeat row is
	// ignored when picking the leader.
	timeoutSeconds int
	// interval is the pause between heartbeat rounds.
	interval time.Duration
}
// newMulti builds a coordinator for the group called name, backed by db.
// Debug logging is configured from debugConfig under the "Multi" label.
// The liveness timeout defaults to 5 seconds and the heartbeat interval to
// one second. The instance id is left empty here; Heartbeat assigns it.
func newMulti(name string, db *DB, debugConfig *ChatDebugOutputConfig) *multi {
	m := new(multi)
	m.DebugOutput = NewDebugOutput("Multi", debugConfig)
	m.db = db
	m.name = name
	m.timeoutSeconds = 5
	m.interval = time.Second
	return m
}
// Heartbeat assigns this instance a fresh random id and then runs the
// coordination loop: after every m.interval it performs one heartbeat round,
// and when shutdownCh delivers (or is closed) it deregisters and returns.
//
// It blocks until shutdown, so callers normally run it in its own goroutine.
// A nil receiver is a no-op that returns nil. The returned error is always
// nil; the named result exists so the deferred Trace hook can observe it.
func (m *multi) Heartbeat(shutdownCh chan struct{}) (err error) {
	if m == nil {
		return nil
	}
	defer m.Trace(&err, "Heartbeat")()

	m.id = RandHexString(8)
	m.Debug("Heartbeat: starting multi coordination heartbeat loop: id: %s", m.id)

	for {
		// A new timer per iteration means the pause is measured from the end
		// of the previous round, not on a fixed schedule.
		tick := time.After(m.interval)

		select {
		case <-shutdownCh:
			m.Debug("Heartbeat: shutdown received, deregistering")
			m.deregister()
			return nil
		case <-tick:
			m.heartbeat()
		}
	}
}
// IsLeader reports whether this instance won the most recent election round.
// A nil receiver reports true, so code running without a coordinator behaves
// as the sole leader.
func (m *multi) IsLeader() bool {
	if m != nil {
		m.Lock()
		leader := m.isLeader
		m.Unlock()
		return leader
	}
	return true
}
// heartbeat performs one coordination round inside a single transaction:
//
//  1. upsert this instance's row in heartbeats with the current time;
//  2. select the greatest id among rows of the same name whose mtime is
//     within the last timeoutSeconds seconds;
//  3. record whether that id is ours, logging any change of leadership.
//
// Both statements are issued on tx rather than on m.db. Previously they went
// through m.db, so they ran outside the transaction that RunTxn handed to the
// callback and the upsert and election were not a single unit of work.
// NOTE(review): presumably RunTxn commits when the callback returns nil and
// rolls back otherwise — confirm against its definition.
//
// Errors are logged and otherwise swallowed; on failure isLeader keeps its
// previous value.
func (m *multi) heartbeat() {
	err := m.db.RunTxn(func(tx *sql.Tx) error {
		// Update ourselves first so our own row is a candidate below.
		if _, err := tx.Exec(`
			INSERT INTO heartbeats (id, name, mtime)
			VALUES (?, ?, NOW(6)) ON DUPLICATE KEY UPDATE mtime=NOW(6)
		`, m.id, m.name); err != nil {
			m.Errorf("failed to register heartbeat: %s", err)
			return err
		}

		// See who the leader is: the live row with the greatest id.
		// timeoutSeconds is an int formatted with %d, so building the
		// INTERVAL clause with Sprintf cannot inject SQL.
		query := fmt.Sprintf(`
			SELECT id FROM heartbeats
			WHERE mtime > NOW(6) - INTERVAL %d SECOND AND name = ?
			ORDER BY id DESC
			LIMIT 1
		`, m.timeoutSeconds)

		var leaderID string
		if err := tx.QueryRow(query, m.name).Scan(&leaderID); err != nil && err != sql.ErrNoRows {
			m.Errorf("failed to scan id: %s", err)
			return err
		}
		// On sql.ErrNoRows leaderID stays empty, so we are not the leader.

		m.Lock()
		defer m.Unlock()

		wasLeader := m.isLeader
		m.isLeader = leaderID == m.id
		if wasLeader == m.isLeader {
			return nil
		}
		if m.isLeader {
			m.Errorf("heartbeat: leader change: isLeader: %v myid: %s", m.isLeader, m.id)
		} else {
			m.Errorf("heartbeat: leader change: isLeader: %v myid: %s leaderid: %s", m.isLeader, m.id,
				leaderID)
		}
		return nil
	})
	if err != nil {
		m.Errorf("heartbeat failed to run txn: %s", err)
	}
}
// deregister removes this instance's row from heartbeats and, in the same
// statement, deletes any row whose mtime is more than one minute old.
//
// The DELETE is issued on tx rather than on m.db. Previously it went through
// m.db, so it ran outside the transaction that RunTxn handed to the callback.
//
// Failures are logged and not returned: this runs on shutdown, best effort.
func (m *multi) deregister() {
	err := m.db.RunTxn(func(tx *sql.Tx) error {
		_, err := tx.Exec(`
			DELETE from heartbeats
			WHERE id = ? OR mtime < NOW() - INTERVAL 1 MINUTE
		`, m.id)
		return err
	})
	if err != nil {
		m.Errorf("deregister: failed to run txn: %s", err)
	}
}