-
-
Notifications
You must be signed in to change notification settings - Fork 27
/
node.go
699 lines (576 loc) · 16.8 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
package client
import (
"errors"
"fmt"
"log"
"reflect"
"sort"
"time"
"github.com/goccy/go-yaml"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/simpleiot/simpleiot/data"
)
// GetNodes fetches nodes over NATS by sending a request on the
// `nodes.<parent>.<id>` subject and decoding the reply.
//
// Parameters:
//   - parent: parent node ID. An empty string is treated as "none".
//     If parent is set to "none", the edge details are not included
//     and the hash is blank. If parent is set to "all", then all living
//     instances of the node are returned. Parent can be set to "root" and
//     id to "all" to fetch the root node(s).
//   - id: node ID. An empty string is treated as "all". If parent is set
//     and id is "all", then all child nodes are returned.
//   - typ: when non-empty, it is sent as a nodeType point in the request
//     (presumably so the server filters by node type -- confirm against the
//     server-side handler).
//   - includeDel: when true, a tombstone point is added to the request to
//     ask for deleted nodes as well.
//
// Returns data.ErrDocumentNotFound if node is not found. On a request or
// decode failure an empty (non-nil) slice is returned together with the
// error. The request times out after 20 seconds.
func GetNodes(nc *nats.Conn, parent, id, typ string, includeDel bool) ([]data.NodeEdge, error) {
	if parent == "" {
		parent = "none"
	}

	if id == "" {
		id = "all"
	}

	// The request payload is a (possibly empty) list of points carrying the
	// optional query modifiers.
	var requestPoints data.Points

	if includeDel {
		requestPoints = append(requestPoints,
			data.Point{Type: data.PointTypeTombstone, Value: data.BoolToFloat(includeDel)})
	}

	if typ != "" {
		requestPoints = append(requestPoints,
			data.Point{Type: data.PointTypeNodeType, Text: typ})
	}

	reqData, err := requestPoints.ToPb()
	if err != nil {
		// wrap with %w so callers can still inspect the underlying error
		return nil, fmt.Errorf("Error encoding reqData: %w", err)
	}

	subject := fmt.Sprintf("nodes.%v.%v", parent, id)
	nodeMsg, err := nc.Request(subject, reqData, time.Second*20)
	if err != nil {
		return []data.NodeEdge{}, err
	}

	nodes, err := data.PbDecodeNodesRequest(nodeMsg.Data)
	if err != nil {
		return []data.NodeEdge{}, err
	}

	return nodes, nil
}
// GetNodesType gets nodes of a custom type T.
// id and parent work the same as [GetNodes].
// Deleted nodes are not included.
//
// The node type requested is the name of T passed through
// data.ToCamelCase. Each returned node is decoded into a T; if decoding a
// node fails, the error is logged and the corresponding entry in the
// returned slice is left at its zero value (best effort).
func GetNodesType[T any](nc *nats.Conn, parent, id string) ([]T, error) {
	// Resolve T through a *T so that an interface type argument yields its
	// own reflect.Type. reflect.TypeOf on a zero interface value returns
	// nil, and calling Name() on that would panic.
	nodeType := data.ToCamelCase(reflect.TypeOf((*T)(nil)).Elem().Name())

	nodes, err := GetNodes(nc, parent, id, nodeType, false)
	if err != nil {
		return []T{}, err
	}

	// decode from NodeEdge to custom types
	ret := make([]T, len(nodes))
	for i, n := range nodes {
		err := data.Decode(data.NodeEdgeChildren{NodeEdge: n, Children: nil}, &ret[i])
		if err != nil {
			log.Println("Error decode node in GetNodeType:", err)
		}
	}

	return ret, nil
}
// GetRootNode returns the root node of the instance. It asks GetNodes for
// all children of "root" and hands back the first one; when the reply is
// empty, data.ErrDocumentNotFound is returned.
func GetRootNode(nc *nats.Conn) (data.NodeEdge, error) {
	var empty data.NodeEdge

	roots, err := GetNodes(nc, "root", "all", "", false)
	switch {
	case err != nil:
		return empty, err
	case len(roots) == 0:
		return empty, data.ErrDocumentNotFound
	}

	return roots[0], nil
}
// GetNodesForUser gets all nodes for a user.
//
// Every instance of the user node is looked up. For each instance, the
// user's parent node(s) are returned with Parent rewritten to "root",
// followed by every descendant of that parent. Duplicates (by ID and
// parent) are removed before returning. Deleted nodes are not requested.
//
// Underlying errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func GetNodesForUser(nc *nats.Conn, userID string) ([]data.NodeEdge, error) {
	userNodes, err := GetNodes(nc, "all", userID, "", false)
	if err != nil {
		return nil, err
	}

	// getChildren recursively collects all descendants of node id. For each
	// level, grandchildren are appended before the direct children.
	var getChildren func(id string) ([]data.NodeEdge, error)
	getChildren = func(id string) ([]data.NodeEdge, error) {
		children, err := GetNodes(nc, id, "all", "", false)
		if err != nil {
			return nil, err
		}

		var found []data.NodeEdge
		for _, c := range children {
			grands, err := getChildren(c.ID)
			if err != nil {
				return nil, err
			}
			found = append(found, grands...)
		}

		found = append(found, children...)
		return found, nil
	}

	var ret []data.NodeEdge

	// go through parents of the user nodes and recursively get all children
	for _, un := range userNodes {
		parents, err := GetNodes(nc, "all", un.Parent, "", false)
		if err != nil {
			return nil, fmt.Errorf("Error getting root node: %w", err)
		}

		// The frontend expects the top level nodes to have Parent set
		// to root
		for i := range parents {
			parents[i].Parent = "root"
		}
		ret = append(ret, parents...)

		c, err := getChildren(un.Parent)
		if err != nil {
			return nil, fmt.Errorf("Error getting children: %w", err)
		}
		ret = append(ret, c...)
	}

	ret = data.RemoveDuplicateNodesIDParent(ret)

	return ret, nil
}
// SendNode is used to send a node to a nats server. Can be
// used to create nodes.
//
// node.ID must be set, and node.Parent must be set to something other than
// "none"; otherwise an error is returned before anything is sent or
// modified.
//
// If origin is non-empty, it is filled into every node point and edge point
// whose Origin is blank. node is passed by value, but these writes go
// through node.Points / node.EdgePoints and so are visible in the caller's
// backing storage (assuming these fields are slices, as their use here
// suggests).
//
// The node points are sent first, then the edge points. If the node has no
// edge points, a tombstone edge point is created. A nodeType edge point
// carrying node.Type is always appended.
func SendNode(nc *nats.Conn, node data.NodeEdge, origin string) error {
	// Validate up front so that a rejected node leaves the caller's points
	// untouched.
	if node.ID == "" {
		return errors.New("ID must be set to a UUID")
	}

	if node.Parent == "" || node.Parent == "none" {
		return errors.New("Parent must be set when sending a node")
	}

	if origin != "" {
		for i := range node.Points {
			if node.Points[i].Origin == "" {
				node.Points[i].Origin = origin
			}
		}

		for i := range node.EdgePoints {
			if node.EdgePoints[i].Origin == "" {
				node.EdgePoints[i].Origin = origin
			}
		}
	}

	// NOTE(review): the previous comment here said the edge points must be
	// sent first when creating a node (otherwise the upstream detects an
	// orphaned node and creates a new edge to the root node), yet the code
	// sends the node points first. The order is left unchanged -- confirm
	// which is intended.
	err := SendNodePoints(nc, node.ID, node.Points, true)
	if err != nil {
		return fmt.Errorf("Error sending node: %w", err)
	}

	if len(node.EdgePoints) == 0 {
		// edge should always have a tombstone point, set to false for root node
		node.EdgePoints = []data.Point{{Time: time.Now(),
			Type: data.PointTypeTombstone, Origin: origin}}
	}

	node.EdgePoints = append(node.EdgePoints, data.Point{
		Type:   data.PointTypeNodeType,
		Text:   node.Type,
		Origin: origin,
	})

	err = SendEdgePoints(nc, node.ID, node.Parent, node.EdgePoints, true)
	if err != nil {
		return fmt.Errorf("Error sending edge points: %w", err)
	}

	return nil
}
// SendNodeType sends a node expressed as a custom type T to a nats server.
// The value is first converted with data.Encode and the result is handed to
// SendNode, so it can be used to create nodes. An encoding error is
// returned as-is.
func SendNodeType[T any](nc *nats.Conn, node T, origin string) error {
	encoded, err := data.Encode(node)
	if err != nil {
		return err
	}

	return SendNode(nc, encoded, origin)
}
// duplicateNodeHelper sends a copy of node under newParent with a freshly
// generated ID, then recursively copies each child of the original node
// under that copy.
func duplicateNodeHelper(nc *nats.Conn, node data.NodeEdge, newParent, origin string) error {
	// Look the children up while node.ID still refers to the original node.
	kids, err := GetNodes(nc, node.ID, "all", "", false)
	if err != nil {
		return fmt.Errorf("GetNodes error: %v", err)
	}

	// The copy gets a new ID and hangs off the requested parent.
	node.ID, node.Parent = uuid.New().String(), newParent

	if err := SendNode(nc, node, origin); err != nil {
		return fmt.Errorf("SendNode error: %v", err)
	}

	for i := range kids {
		if err := duplicateNodeHelper(nc, kids[i], node.ID, origin); err != nil {
			return err
		}
	}

	return nil
}
// DuplicateNode duplicates the node with the given id, and all of its
// children, under newParent. The first instance returned for id is used as
// the template. To mark the copy, " (Duplicate)" is appended to the last
// name for user nodes and to the description for every other node type.
func DuplicateNode(nc *nats.Conn, id, newParent, origin string) error {
	found, err := GetNodes(nc, "all", id, "", false)
	if err != nil {
		return fmt.Errorf("GetNode error: %v", err)
	}

	if len(found) == 0 {
		return fmt.Errorf("No nodes returned")
	}

	const suffix = " (Duplicate)"
	dup := found[0]

	if dup.Type == data.NodeTypeUser {
		// the lookup error is ignored, as before
		lastName, _ := dup.Points.Text(data.PointTypeLastName, "0")
		dup.AddPoint(data.Point{Type: data.PointTypeLastName, Key: "0", Text: lastName + suffix})
	} else {
		dup.AddPoint(data.Point{Type: data.PointTypeDescription, Key: "0", Text: dup.Desc() + suffix})
	}

	return duplicateNodeHelper(nc, dup, newParent, origin)
}
// DeleteNode removes a node from the specified parent node by sending a
// tombstone edge point with value 1 (tagged with origin) on the edge
// between id and parent.
func DeleteNode(nc *nats.Conn, id, parent string, origin string) error {
	tombstone := data.Point{
		Type:   data.PointTypeTombstone,
		Value:  1,
		Origin: origin,
	}

	return SendEdgePoint(nc, id, parent, tombstone, true)
}
// MoveNode moves a node from one parent to another.
//
// The edge to newParent is created first (tombstone 0 plus the node's
// type), and only then is the edge to oldParent tombstoned. Both steps are
// tagged with origin. An error is returned if oldParent and newParent are
// the same, if the node cannot be fetched, or if either send fails. If the
// second step fails, the node is left attached to both parents.
func MoveNode(nc *nats.Conn, id, oldParent, newParent, origin string) error {
	if newParent == oldParent {
		return errors.New("can't move node to itself")
	}

	// fetch the node because we need to know its type; deleted instances
	// are included in the lookup
	nodes, err := GetNodes(nc, "all", id, "", true)
	if err != nil {
		return err
	}

	if len(nodes) < 1 {
		return errors.New("Error fetching node to get type")
	}

	err = SendEdgePoints(nc, id, newParent, data.Points{
		{Type: data.PointTypeTombstone, Value: 0, Origin: origin},
		{Type: data.PointTypeNodeType, Text: nodes[0].Type, Origin: origin},
	}, true)
	if err != nil {
		return err
	}

	// Tombstone the old edge through DeleteNode so the point carries origin,
	// like the points sent to the new parent above.
	return DeleteNode(nc, id, oldParent, origin)
}
// MirrorNode adds an existing node to an additional parent. A node can
// have multiple parents. The edge to newParent is created by sending a
// tombstone point with value 0 and a nodeType point, both tagged with
// origin.
func MirrorNode(nc *nats.Conn, id, newParent, origin string) error {
	// The node type goes into the new edge, so look the node up first
	// (deleted instances included).
	existing, err := GetNodes(nc, "all", id, "", true)
	if err != nil {
		return err
	}

	if len(existing) == 0 {
		return errors.New("Error fetching node to get type")
	}

	edgePoints := data.Points{
		{Type: data.PointTypeTombstone, Value: 0, Origin: origin},
		{Type: data.PointTypeNodeType, Text: existing[0].Type, Origin: origin},
	}

	return SendEdgePoints(nc, id, newParent, edgePoints, true)
}
// NodeWatcher creates a node watcher. update() is called any time there is an update.
// Stop can be called to stop the watcher. get() can be called to get the current value.
func NodeWatcher[T any](nc *nats.Conn, id, parent string) (get func() T, stop func(), err error) {
stopCh := make(chan struct{})
var current T
pointUpdates := make(chan []data.Point)
edgeUpdates := make(chan []data.Point)
// create subscriptions first so that we get any updates that might happen between the
// time we fetch node and start subscriptions
stopPointSub, err := SubscribePoints(nc, id, func(points []data.Point) {
pointUpdates <- points
})
if err != nil {
return nil, nil, fmt.Errorf("Point subscribe failed: %v", err)
}
stopEdgeSub, err := SubscribeEdgePoints(nc, id, parent, func(points []data.Point) {
edgeUpdates <- points
})
if err != nil {
return nil, nil, fmt.Errorf("Edge point subscribe failed: %v", err)
}
nodes, err := GetNodesType[T](nc, parent, id)
if err != nil {
if err != data.ErrDocumentNotFound {
return nil, nil, fmt.Errorf("Error getting node: %v", err)
}
// if document is not found, that is OK, points will populate it once they come in
}
// FIXME: we may still have a race condition where older point updates will overwrite
// a new update when we fetch the node.
if len(nodes) > 0 {
current = nodes[0]
}
getCurrent := make(chan chan T)
// main loop for watcher. All data access must go through the main
// loop to avoid race conditions.
go func() {
for {
select {
case <-stopCh:
return
case r := <-getCurrent:
r <- current
case pts := <-pointUpdates:
err := data.MergePoints(id, pts, ¤t)
if err != nil {
log.Println("NodeWatcher, error merging points:", err)
}
case pts := <-edgeUpdates:
err := data.MergeEdgePoints(id, parent, pts, ¤t)
if err != nil {
log.Println("NodeWatcher, error merging edge points:", err)
}
}
}
}()
return func() T {
ret := make(chan T)
getCurrent <- ret
return <-ret
}, func() {
stopPointSub()
stopEdgeSub()
close(stopCh)
}, nil
}
// SiotExport is the top-level document used for exporting and importing
// data (currently serialized as YAML).
type SiotExport struct {
	// Nodes holds the exported node trees; each entry carries its own
	// nested children. ExportNodes writes a single entry and ImportNodes
	// only reads the first one.
	Nodes []data.NodeEdgeChildren
}
// ExportNodes exports the node with the given id, together with all of its
// descendants, as YAML. An id of "root" or "" selects the root node.
// The YAML format looks like:
//
//	nodes:
//	- id: inst1
//	  type: device
//	  parent: root
//	  points:
//	  - type: versionApp
//	  children:
//	  - id: d7f5bbe9-a300-4197-93fa-b8e5e07f683a
//	    type: user
//	    parent: inst1
//	    points:
//	    - type: firstName
//	      text: admin
//	    - type: lastName
//	      text: user
//	    - type: phone
//	    - type: email
//	      text: admin@admin.com
//	    - type: pass
//	      text: admin
//
// To make the export easier to read, keys equal to "0" are blanked and
// tombstone edge points with a value of 0 are dropped.
func ExportNodes(nc *nats.Conn, id string) ([]byte, error) {
	switch id {
	case "root", "":
		root, err := GetRootNode(nc)
		if err != nil {
			return nil, fmt.Errorf("Error getting root node: %w", err)
		}
		id = root.ID
	}

	matches, err := GetNodes(nc, "all", id, "", false)
	if err != nil {
		return nil, fmt.Errorf("Error getting root nodes: %w", err)
	}

	if len(matches) == 0 {
		return nil, fmt.Errorf("no root nodes returned")
	}

	// we only export one node as there may be multiple mirrors of the node in the tree
	top := data.NodeEdgeChildren{NodeEdge: matches[0]}

	if err := exportNodesHelper(nc, &top); err != nil {
		return nil, err
	}

	return yaml.Marshal(SiotExport{Nodes: []data.NodeEdgeChildren{top}})
}
// exportNodesHelper prepares node for export and fills in node.Children by
// recursively fetching and preparing its descendants. Preparation sorts
// the points and edge points, blanks keys equal to "0", and drops
// tombstone edge points whose value is 0.
func exportNodesHelper(nc *nats.Conn, node *data.NodeEdgeChildren) error {
	// order both point lists by type and key
	sort.Sort(data.ByTypeKey(node.Points))
	sort.Sort(data.ByTypeKey(node.EdgePoints))

	// reduce a little noise: a key of "0" is exported as blank
	for i := range node.Points {
		if node.Points[i].Key == "0" {
			node.Points[i].Key = ""
		}
	}

	for i := range node.EdgePoints {
		if node.EdgePoints[i].Key == "0" {
			node.EdgePoints[i].Key = ""
		}
	}

	// a tombstone of 0 does not convey much information, so filter those
	// edge points out in place
	kept := node.EdgePoints[:0]
	for _, p := range node.EdgePoints {
		if p.Type == data.PointTypeTombstone && p.Value == 0 {
			continue
		}
		kept = append(kept, p)
	}
	node.EdgePoints = kept

	kids, err := GetNodes(nc, node.ID, "all", "", false)
	if err != nil {
		return fmt.Errorf("Error getting children: %w", err)
	}

	for _, kid := range kids {
		sub := data.NodeEdgeChildren{NodeEdge: kid}
		if err := exportNodesHelper(nc, &sub); err != nil {
			return err
		}
		node.Children = append(node.Children, sub)
	}

	return nil
}
// ImportNodes is used to import nodes at a location in YAML format. New IDs
// are generated for all nodes unless preserveIDs is set to true.
// If there are multiple references to the same ID,
// then an attempt is made to replace all of these with the new ID. This also
// allows you to use "friendly" ID names in hand generated YAML files.
//
// Only the first top-level node in yamlData is imported. Its parent is set
// to parent and " (import)" is appended to its description point(s). When
// preserveIDs is true, the IDs and parent fields in the data are validated
// instead of being replaced.
//
// If parent is "root" and the imported top-level node has a different ID
// than the existing root node, the old root node is tombstoned -- but only
// after the import has succeeded.
//
// NOTE(review): an empty parent passes the initial root-node lookup but is
// rejected later (by checkIDs when preserveIDs is set, otherwise by
// SendNode's parent validation). Confirm whether "" was meant to be
// treated as "root".
func ImportNodes(nc *nats.Conn, parent string, yamlData []byte, origin string, preserveIDs bool) error {
	// first make sure the parent node exists
	var rootNode data.NodeEdge
	if parent == "root" || parent == "" {
		var err error
		rootNode, err = GetRootNode(nc)
		if err != nil {
			return err
		}
	} else {
		n, err := GetNodes(nc, "all", parent, "", false)
		if err != nil {
			return err
		}
		if len(n) < 1 {
			return fmt.Errorf("Parent node \"%v\" not found", parent)
		}
	}

	var imp SiotExport

	err := yaml.Unmarshal(yamlData, &imp)
	if err != nil {
		return fmt.Errorf("Error parsing YAML data: %w", err)
	}

	// importHelper sends node and then, depth first, all of its children.
	var importHelper func(data.NodeEdgeChildren) error
	importHelper = func(node data.NodeEdgeChildren) error {
		err := SendNode(nc, node.NodeEdge, origin)
		if err != nil {
			return fmt.Errorf("Error sending node: %w", err)
		}

		for _, c := range node.Children {
			err := importHelper(c)
			if err != nil {
				return err
			}
		}

		return nil
	}

	if len(imp.Nodes) < 1 {
		return fmt.Errorf("Error: imported data did not have any nodes")
	}

	// set parent of first node
	imp.Nodes[0].Parent = parent

	// append (import) to top level node description
	for i, p := range imp.Nodes[0].Points {
		if p.Type == data.PointTypeDescription {
			imp.Nodes[0].Points[i].Text += " (import)"
		}
	}

	if preserveIDs {
		err := checkIDs(imp.Nodes[0], parent)
		if err != nil {
			return err
		}
	} else {
		ReplaceIDs(&imp.Nodes[0], parent)
	}

	// Stop here if the import failed: the old root node must not be
	// tombstoned when its replacement was not fully written.
	if err := importHelper(imp.Nodes[0]); err != nil {
		return err
	}

	// if we imported the root node, then we have to tombstone the old root node
	if parent == "root" && rootNode.ID != imp.Nodes[0].ID {
		err := DeleteNode(nc, rootNode.ID, parent, "import")
		if err != nil {
			return fmt.Errorf("Error deleting old root node: %w", err)
		}
	}

	return nil
}
// checkIDs validates a node tree that is to be imported with its IDs
// preserved: parent must be non-empty, the node's Parent field must equal
// parent, and the node's ID must be non-blank. The same checks are applied
// recursively to every child, using the node's ID as the expected parent.
// The first violation found is returned.
func checkIDs(node data.NodeEdgeChildren, parent string) error {
	switch {
	case parent == "":
		return fmt.Errorf("parent must be specified")
	case node.Parent != parent:
		return fmt.Errorf("node parent %v does not match parent %v", node.Parent, parent)
	case node.ID == "":
		return fmt.Errorf("ID cannot be blank")
	}

	for i := range node.Children {
		if err := checkIDs(node.Children[i], node.ID); err != nil {
			return err
		}
	}

	return nil
}
// ReplaceIDs assigns new UUIDs to every node in a tree of nodes.
// If there are multiple references to the same ID (on nodes, or in the
// Text of nodeID points), they are all replaced with the same new ID.
// Nodes with a blank ID always get their own new ID, and nodeID points
// with blank Text are left alone.
// This function modifies the tree that is passed in.
// It also updates the parent fields: the top node gets parent, and each
// child gets the new ID of the node above it.
func ReplaceIDs(nodes *data.NodeEdgeChildren, parent string) {
	// translated remembers which new ID was handed out for each old ID
	translated := make(map[string]string)

	// lookup returns the new ID for old, generating and recording one the
	// first time old is seen.
	lookup := func(old string) string {
		if id, seen := translated[old]; seen {
			return id
		}
		id := uuid.New().String()
		translated[old] = id
		return id
	}

	var walk func(n *data.NodeEdgeChildren, parent string)
	walk = func(n *data.NodeEdgeChildren, parent string) {
		n.Parent = parent

		// update node ID; a blank ID is never recorded in the map
		if n.ID == "" {
			n.ID = uuid.New().String()
		} else {
			n.ID = lookup(n.ID)
		}

		// rewrite points that reference other nodes by ID
		for i := range n.Points {
			pt := &n.Points[i]
			if pt.Type != data.PointTypeNodeID || pt.Text == "" {
				continue
			}
			pt.Text = lookup(pt.Text)
		}

		for i := range n.Children {
			walk(&n.Children[i], n.ID)
		}
	}

	walk(nodes, parent)
}