/
particle.go
214 lines (178 loc) · 4.96 KB
/
particle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package client
import (
"encoding/json"
"log"
"time"
"github.com/donovanhide/eventsource"
"github.com/nats-io/nats.go"
"github.com/simpleiot/simpleiot/data"
)
// Get Particle.io data using their event API. See:
// https://docs.particle.io/reference/cloud-apis/api/#get-a-stream-of-events
const particleEventURL string = "https://api.particle.io/v1/devices/events/"
// ParticleEvent from particle
type ParticleEvent struct {
Data string `json:"data"`
TTL uint32 `json:"ttl"`
Timestamp time.Time `json:"published_at"`
CoreID string `json:"coreid"`
}
// Particle represents the configuration for the SIOT Particle client
type Particle struct {
ID string `node:"id"`
Parent string `node:"parent"`
Description string `point:"description"`
Disable bool `point:"disable"`
AuthToken string `point:"authToken"`
}
// ParticleClient is a SIOT particle client
type ParticleClient struct {
nc *nats.Conn
config Particle
stop chan struct{}
newPoints chan NewPoints
newEdgePoints chan NewPoints
newParticlePoints chan NewPoints
}
type particlePoint struct {
ID string `json:"id"`
Type string `json:"type"`
Value float64 `json:"value"`
}
func (pp *particlePoint) toPoint() data.Point {
return data.Point{
Key: pp.ID,
Type: pp.Type,
Value: pp.Value,
}
}
// NewParticleClient ...
func NewParticleClient(nc *nats.Conn, config Particle) Client {
return &ParticleClient{
nc: nc,
config: config,
stop: make(chan struct{}),
newPoints: make(chan NewPoints),
newEdgePoints: make(chan NewPoints),
newParticlePoints: make(chan NewPoints),
}
}
// Run runs the main logic for this client and blocks until stopped
func (pc *ParticleClient) Run() error {
log.Println("Starting particle client: ", pc.config.Description)
closeReader := make(chan struct{}) // is closed to close reader
readerClosed := make(chan struct{}) // struct{} is sent when reader exits
var readerRunning bool // indicates reader is running
particleReader := func() {
defer func() {
readerClosed <- struct{}{}
}()
urlAuth := particleEventURL + "sample" + "?access_token=" + pc.config.AuthToken
stream, err := eventsource.Subscribe(urlAuth, "")
if err != nil {
log.Println("Particle subscription error: ", err)
return
}
for {
select {
case event := <-stream.Events:
var pEvent ParticleEvent
err := json.Unmarshal([]byte(event.Data()), &pEvent)
if err != nil {
log.Println("Got error decoding particle event: ", err)
continue
}
var pPoints []particlePoint
err = json.Unmarshal([]byte(pEvent.Data), &pPoints)
if err != nil {
log.Println("error decoding Particle samples: ", err)
continue
}
points := make(data.Points, len(pPoints))
for i, p := range pPoints {
points[i] = p.toPoint()
points[i].Time = pEvent.Timestamp
}
err = SendNodePoints(pc.nc, pc.config.ID, points, false)
if err != nil {
log.Println("Particle error sending points: ", err)
}
case err := <-stream.Errors:
log.Println("Particle error: ", err)
case <-closeReader:
log.Println("Exiting particle reader")
return
}
}
}
checkTime := time.Minute
checkReader := time.NewTicker(checkTime)
startReader := func() {
if readerRunning {
return
}
readerRunning = true
go particleReader()
checkReader.Stop()
}
stopReader := func() {
if readerRunning {
closeReader <- struct{}{}
readerRunning = false
}
}
startReader()
done:
for {
select {
case <-pc.stop:
log.Println("Stopping particle client: ", pc.config.Description)
break done
case pts := <-pc.newPoints:
err := data.MergePoints(pts.ID, pts.Points, &pc.config)
if err != nil {
log.Println("error merging new points: ", err)
}
for _, p := range pts.Points {
switch p.Type {
case data.PointTypeAuthToken:
stopReader()
startReader()
case data.PointTypeDisable:
if p.Value == 1 {
stopReader()
} else {
startReader()
}
}
}
case pts := <-pc.newEdgePoints:
err := data.MergeEdgePoints(pts.ID, pts.Parent, pts.Points, &pc.config)
if err != nil {
log.Println("error merging new points: ", err)
}
case <-readerClosed:
readerRunning = false
checkReader.Reset(checkTime)
case <-checkReader.C:
startReader()
}
}
// clean up
stopReader()
return nil
}
// Stop sends a signal to the Run function to exit
func (pc *ParticleClient) Stop(_ error) {
close(pc.stop)
}
// Points is called by the Manager when new points for this
// node are received.
func (pc *ParticleClient) Points(nodeID string, points []data.Point) {
pc.newPoints <- NewPoints{nodeID, "", points}
}
// EdgePoints is called by the Manager when new edge points for this
// node are received.
func (pc *ParticleClient) EdgePoints(nodeID, parentID string, points []data.Point) {
pc.newEdgePoints <- NewPoints{nodeID, parentID, points}
}