// station_manager.go

package otto
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// StationManager keeps track of all the stations we have seen.
//
// Live stations are held in Stations; stations that the background sweep
// started by NewStationManager has timed out are moved to Stale. Only the two
// maps take part in JSON encoding.
type StationManager struct {
	// Stations maps a station ID to each station currently considered live.
	Stations map[string]*Station `json:"stations"`
	// Stale maps a station ID to each station that has timed out.
	Stale map[string]*Station `json:"stale"`
	// EventQ carries events to the manager's background goroutine, which
	// relays them to the addressed station. It is excluded from JSON because
	// encoding/json cannot encode channel values: without the "-" tag every
	// attempt to marshal a StationManager fails with an UnsupportedTypeError.
	EventQ chan *StationEvent `json:"-"`

	// ticker drives the periodic stale-station sweep. Unexported fields are
	// never encoded, so no JSON tag is needed.
	ticker *time.Ticker
	// mu guards Stations and Stale. It is a pointer so that value copies of
	// the manager (ServeHTTP has a value receiver) share the same lock.
	mu *sync.Mutex
}
// StationEvent is a message delivered to a StationManager through its EventQ
// channel. The manager's background goroutine looks up the station named by
// StationID and passes Device and Value to that station's Relay method.
type StationEvent struct {
// Type is carried with the event but is not read anywhere in this file.
Type string `json:"type"`
// Device is handed to Station.Relay as its first argument.
Device string `json:"device"`
// StationID selects the target station; events naming an unknown station
// are logged and dropped by the manager.
StationID string `json:"stationid"`
// Value is handed to Station.Relay as its second argument.
Value string `json:"value"`
}
// Stations is the package-wide station manager. It starts out nil and is
// assigned by this package's init function.
var Stations *StationManager
// init installs the package-wide manager in Stations when the package is
// loaded. Because it calls NewStationManager, it also starts that manager's
// background goroutine and ticker as a side effect of importing the package.
func init() {
Stations = NewStationManager()
}
// NewStationManager returns an empty StationManager and starts its background
// goroutine. That goroutine does two things:
//
//   - every 10 seconds it sweeps the live stations and moves the ones that
//     have timed out into Stale;
//   - it receives events from EventQ and relays each one to the station it
//     names.
//
// EventQ is unbuffered, so a send blocks until the goroutine picks it up.
func NewStationManager() (sm *StationManager) {
	sm = &StationManager{
		Stations: make(map[string]*Station),
		Stale:    make(map[string]*Station),
		EventQ:   make(chan *StationEvent),
		ticker:   time.NewTicker(10 * time.Second),
		mu:       new(sync.Mutex),
	}

	// NOTE(review): quit is never handed to a caller, so nothing can signal
	// it and the goroutine below runs for the life of the process. Exposing a
	// stop method would need an API addition.
	quit := make(chan struct{})
	go func() {
		for {
			select {
			case <-sm.ticker.C:
				sm.sweepStale()
			case ev := <-sm.EventQ:
				sm.relayEvent(ev)
			case <-quit:
				sm.ticker.Stop()
				return
			}
		}
	}()
	return sm
}

// sweepStale checks every live station once and retires the expired ones.
func (sm *StationManager) sweepStale() {
	// Copy the live set while holding the manager lock. Ranging over
	// sm.Stations directly would race with Add, which writes the map from
	// other goroutines, and a concurrent map iteration and write is a fatal
	// runtime error.
	sm.mu.Lock()
	live := make(map[string]*Station, len(sm.Stations))
	for id, st := range sm.Stations {
		live[id] = st
	}
	sm.mu.Unlock()

	for id, st := range live {
		if st == nil {
			continue
		}
		sm.retireIfExpired(id, st)
	}
}

// retireIfExpired moves st from Stations to Stale when it has not been heard
// from for longer than its Expiration. An Expiration of 0 means never expire.
func (sm *StationManager) retireIfExpired(id string, st *Station) {
	// The station lock is taken before the manager lock, the same order the
	// sweep has always used, and is held across the move so that the expiry
	// decision and the move happen as one step with respect to the station.
	st.mu.Lock()
	defer st.mu.Unlock()

	// Do not timeout stations with a duration of 0
	if st.Expiration == 0 {
		log.Printf("Station %s expiration == 0 do not timeout", id)
		return
	}
	if !time.Now().After(st.LastHeard.Add(st.Expiration)) {
		return
	}

	sm.mu.Lock()
	defer sm.mu.Unlock()
	// Only retire the station that was examined; if the entry changed since
	// the snapshot was taken, leave it for the next sweep.
	if sm.Stations[id] != st {
		return
	}
	log.Printf("Station: %s has timed out\n", id)
	sm.Stale[id] = st
	delete(sm.Stations, id)
}

// relayEvent forwards ev to the station it names, or logs a warning when no
// such station is known.
func (sm *StationManager) relayEvent(ev *StationEvent) {
	fmt.Printf("Station Event: ! %+v\n", ev)
	st := sm.Get(ev.StationID)
	if st == nil {
		log.Printf("[W] Station Event could not find station: %s", ev.StationID)
		return
	}
	st.Relay(ev.Device, ev.Value)
}
// Get returns the live station registered under stid, or nil when no such
// station is known. The lookup is performed while holding the manager lock.
func (sm *StationManager) Get(stid string) *Station {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	return sm.Stations[stid]
}
// Add creates a station with the ID st and registers it as live. It returns
// the new station, or a nil station and an error when a station with that ID
// is already registered.
//
// The existence check and the insert happen under a single hold of the
// manager lock. Checking first and locking again for the insert would let two
// concurrent callers both pass the check, and the second would silently
// replace the station created by the first.
func (sm *StationManager) Add(st string) (station *Station, err error) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	if sm.Stations[st] != nil {
		return nil, fmt.Errorf("Error adding an existing station")
	}

	// NOTE(review): NewStation runs while the manager lock is held; this
	// assumes it does not call back into the manager — confirm.
	station = NewStation(st)
	sm.Stations[st] = station
	return station, nil
}
// Update applies msg to the station named by msg.Data.ID, registering that
// station first when it has not been seen before. It returns the station
// that was updated, or nil when the station could be neither found nor added.
func (sm *StationManager) Update(msg *Msg) (st *Station) {
	id := msg.Data.ID

	st = sm.Get(id)
	if st == nil {
		log.Println("StationManager: Adding new station: ", id)
		var err error
		if st, err = sm.Add(id); err != nil {
			// Add only fails when the station already exists, which here
			// means another goroutine registered it between the Get above
			// and the Add. Use that station rather than dropping the message.
			if st = sm.Get(id); st == nil {
				log.Println("StationManager: ERROR Adding new station", id, err)
				return nil
			}
		}
	}

	st.Update(msg)
	return st
}
// Count returns the number of live stations. Stale stations are not counted.
// The map is read under the manager lock because Add and the background
// sweep modify it from other goroutines.
func (sm *StationManager) Count() int {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	return len(sm.Stations)
}
// ServeHTTP reports the manager's stations over HTTP.
//
//   - GET answers with a JSON object holding the "stations" and "stale" maps.
//   - POST and PUT are not implemented yet and answer 501 Not Implemented.
//   - Any other method answers 405 Method Not Allowed.
//
// The receiver is a value for backward compatibility; the copy shares its
// maps and lock with the original manager.
func (sm StationManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		// Encode only the two maps. Encoding the whole manager would also
		// visit the EventQ channel, which encoding/json cannot represent.
		view := struct {
			Stations map[string]*Station `json:"stations"`
			Stale    map[string]*Station `json:"stale"`
		}{sm.Stations, sm.Stale}

		// Marshal into memory under the manager lock so the maps cannot be
		// modified while they are walked, and so a slow client never holds
		// the lock. NOTE(review): individual stations are read without their
		// own lock here — confirm that is acceptable.
		sm.mu.Lock()
		body, err := json.Marshal(view)
		sm.mu.Unlock()
		if err != nil {
			log.Printf("StationManager: encoding stations: %v", err)
			http.Error(w, "unable to encode stations", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		// The trailing newline matches what json.Encoder emits.
		if _, err := w.Write(append(body, '\n')); err != nil {
			log.Printf("StationManager: writing response: %v", err)
		}
	case http.MethodPost, http.MethodPut:
		// 401 means "Unauthorized"; an unimplemented operation is 501.
		http.Error(w, "Not Yet Supported", http.StatusNotImplemented)
	default:
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
	}
}