-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
observeDevices.go
110 lines (94 loc) · 2.82 KB
/
observeDevices.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package client
import (
"context"
"github.com/google/uuid"
"github.com/plgd-dev/hub/v2/grpc-gateway/pb"
"github.com/plgd-dev/hub/v2/resource-aggregate/events"
)
// DevicesObservationEvent_type identifies the kind of change reported by a
// DevicesObservationEvent.
type DevicesObservationEvent_type uint8

// Kinds of device observation events. The numeric values are 0 through 3, in
// declaration order.
const (
	// DevicesObservationEvent_ONLINE is reported when a metadata update shows
	// the device connection as online.
	DevicesObservationEvent_ONLINE DevicesObservationEvent_type = iota
	// DevicesObservationEvent_OFFLINE is reported when a metadata update shows
	// the device connection as not online.
	DevicesObservationEvent_OFFLINE
	// DevicesObservationEvent_REGISTERED is reported for device-registered
	// events.
	DevicesObservationEvent_REGISTERED
	// DevicesObservationEvent_UNREGISTERED is reported for device-unregistered
	// events.
	DevicesObservationEvent_UNREGISTERED
)
// DevicesObservationEvent is the notification passed to a
// DevicesObservationHandler: a kind of change together with the IDs of the
// devices it applies to.
type DevicesObservationEvent struct {
	// DeviceIDs lists the devices the event applies to. Online/offline events
	// produced in this file carry exactly one ID; registered/unregistered
	// events carry whatever ID list the underlying event provides.
	DeviceIDs []string
	// Event is the kind of change being reported.
	Event DevicesObservationEvent_type
}
// DevicesObservationHandler is implemented by callers of
// Client.ObserveDevices to receive device observation events and the
// close/error notifications of the observation.
type DevicesObservationHandler = interface {
	// Handle is called with each observation event. The returned error is
	// passed back unchanged to whoever delivered the underlying event.
	Handle(ctx context.Context, event DevicesObservationEvent) error
	// OnClose is called when the observation is closed, after its
	// subscription has been removed from the client.
	OnClose()
	// Error is called with the error reported for the observation, after its
	// subscription has been removed from the client.
	Error(err error)
}
// devicesObservation adapts subscription callbacks to a
// DevicesObservationHandler and removes the subscription from the client
// when the observation closes or fails.
type devicesObservation struct {
	// h is the user-supplied handler that receives the translated events.
	h DevicesObservationHandler
	// removeSubscription removes and cancels the subscription backing this
	// observation; it is invoked from OnClose and Error.
	removeSubscription func()
}
// HandleDeviceMetadataUpdated translates a device metadata update into an
// ONLINE or OFFLINE observation event and forwards it to the wrapped handler.
//
// Updates that carry no connection information are ignored and nil is
// returned. Otherwise the event contains the single device ID taken from val,
// and the handler's error is returned unchanged.
func (o *devicesObservation) HandleDeviceMetadataUpdated(ctx context.Context, val *events.DeviceMetadataUpdated) error {
	conn := val.GetConnection()
	if conn == nil {
		// Nothing to report without connection status.
		return nil
	}

	status := DevicesObservationEvent_ONLINE
	if !conn.IsOnline() {
		status = DevicesObservationEvent_OFFLINE
	}

	notification := DevicesObservationEvent{
		DeviceIDs: []string{val.GetDeviceId()},
		Event:     status,
	}
	return o.h.Handle(ctx, notification)
}
// HandleDeviceRegistered forwards a device-registered event to the wrapped
// handler as a REGISTERED observation event carrying the device IDs from val.
// The handler's error is returned unchanged.
func (o *devicesObservation) HandleDeviceRegistered(ctx context.Context, val *pb.Event_DeviceRegistered) error {
	registered := DevicesObservationEvent{
		DeviceIDs: val.GetDeviceIds(),
		Event:     DevicesObservationEvent_REGISTERED,
	}
	return o.h.Handle(ctx, registered)
}
// HandleDeviceUnregistered forwards a device-unregistered event to the wrapped
// handler as an UNREGISTERED observation event carrying the device IDs from
// val. The handler's error is returned unchanged.
func (o *devicesObservation) HandleDeviceUnregistered(ctx context.Context, val *pb.Event_DeviceUnregistered) error {
	unregistered := DevicesObservationEvent{
		DeviceIDs: val.GetDeviceIds(),
		Event:     DevicesObservationEvent_UNREGISTERED,
	}
	return o.h.Handle(ctx, unregistered)
}
// OnClose removes the subscription backing this observation and then
// notifies the wrapped handler that the observation has closed.
func (o *devicesObservation) OnClose() {
	// Remove the subscription first so the handler is notified only after
	// the observation is no longer registered.
	o.removeSubscription()
	o.h.OnClose()
}
// Error removes the subscription backing this observation and then passes
// err on to the wrapped handler.
func (o *devicesObservation) Error(err error) {
	// Remove the subscription first so the handler is notified only after
	// the observation is no longer registered.
	o.removeSubscription()
	o.h.Error(err)
}
// ObserveDevices starts observing devices and delivers the resulting events
// to handler.
//
// It returns the ID of the new observation, which can be passed to
// StopObservingDevices. An empty ID and a non-nil error are returned when the
// ID cannot be generated or the devices subscription cannot be created.
//
// NOTE(review): the subscription is created before it is stored under its ID;
// if it could close or fail in between, removeSubscription would find nothing
// to remove — confirm whether NewDevicesSubscription can call back that early.
func (c *Client) ObserveDevices(ctx context.Context, handler DevicesObservationHandler) (string, error) {
	newID, err := uuid.NewRandom()
	if err != nil {
		return "", err
	}
	observationID := newID.String()

	observation := &devicesObservation{
		h: handler,
		removeSubscription: func() {
			// The error is deliberately ignored: a non-nil error means some
			// other part of the code has already removed the subscription.
			_, _ = c.stopObservingDevices(observationID)
		},
	}

	sub, err := c.NewDevicesSubscription(ctx, observation)
	if err != nil {
		return "", err
	}

	c.insertSubscription(observationID, sub)
	return observationID, nil
}
// stopObservingDevices removes the subscription stored under observationID
// and cancels it without waiting.
//
// It returns the wait function and error produced by the subscription's
// Cancel. When no subscription can be popped for observationID, it returns a
// nil wait function and that lookup error.
func (c *Client) stopObservingDevices(observationID string) (wait func(), err error) {
	sub, err := c.popSubscription(observationID)
	if err != nil {
		return nil, err
	}
	wait, err = sub.Cancel()
	return wait, err
}
// StopObservingDevices stops the observation identified by observationID.
//
// On success it invokes the wait function returned by the cancellation before
// returning nil (presumably blocking until the subscription has shut down —
// confirm against the subscription's Cancel). If the observation cannot be
// removed or cancelled, that error is returned and nothing is waited on.
func (c *Client) StopObservingDevices(observationID string) error {
	waitForCancel, err := c.stopObservingDevices(observationID)
	if err == nil {
		waitForCancel()
	}
	return err
}