-
Notifications
You must be signed in to change notification settings - Fork 0
/
usecase-ota.go
122 lines (108 loc) · 3.39 KB
/
usecase-ota.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package mqttProvider
/**
mqttProvider/usecase-ota.go:
Handle Interaction with Scheduler
- Send OTA Triggers
- Execute OTA to device
- Watch OTA Progress
- Send Status
- Implements the sch.OTAInteractor interface for the Scheduler
*/
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
dc "github.com/skoona/homie-service/pkg/deviceCore"
sch "github.com/skoona/homie-service/pkg/deviceScheduler"
)
// otaStream is the value NewOTAStream returns as a sch.OTAInteractor. It holds
// the channels that move OTA traffic between MQTT and the scheduler.
type otaStream struct {
	// notifyChannel receives device messages built from inbound OTA status
	// publications; it is created on first call to EnableTriggers.
	notifyChannel chan dc.DeviceMessage
	// publishChannel queues outbound OTA messages; it is created on first
	// call to OtaPublish, which also starts the goroutine that drains it.
	publishChannel chan dc.DeviceMessage
	// logger is the caller-supplied logger tagged with service=OTAInteractor.
	logger log.Logger
}
// otastream is the package-level OTA interactor. It is assigned by
// NewOTAStream and read by the otaResponses MQTT handler; it is nil until
// NewOTAStream has been called.
var otastream *otaStream
// NewOTAStream builds the OTA interactor, stores it in the package-level
// otastream variable, and returns it as a sch.OTAInteractor.
//
// plog is wrapped so every entry logged by the interactor carries
// service=OTAInteractor. Both channels are left nil; they are created lazily
// by EnableTriggers and OtaPublish.
func NewOTAStream(plog log.Logger) sch.OTAInteractor {
	stream := &otaStream{}
	stream.logger = log.With(plog, "service", "OTAInteractor")

	otastream = stream
	return stream
}
// EnableTriggers returns the channel on which OTA notifications are delivered.
// The channel is created with a buffer of 1024 on the first call and the same
// channel is returned on every later call.
func (s *otaStream) EnableTriggers() chan dc.DeviceMessage {
	if s.notifyChannel != nil {
		return s.notifyChannel
	}

	s.notifyChannel = make(chan dc.DeviceMessage, 1024)
	return s.notifyChannel
}
// EnableNotificationsFor turns OTA status notifications on or off for a single
// device by delegating to handleOTAMessages.
//
// enabledOrDisable selects the action: true enables, false disables. Any error
// from handleOTAMessages is logged on the interactor's logger and returned
// unchanged; nil is returned on success.
func (s *otaStream) EnableNotificationsFor(networkName, deviceName string, enabledOrDisable bool) error {
	if err := handleOTAMessages(networkName, deviceName, enabledOrDisable); err != nil {
		level.Error(s.logger).Log("error", err.Error())
		return err
	}
	return nil
}
// OtaPublish queues otaMessage for publication to MQTT.
//
// On the first call it creates the publish queue (buffer of 512) and starts
// the receiver via publishOTAMessages. The send blocks when the queue is full.
func (s *otaStream) OtaPublish(otaMessage dc.DeviceMessage) {
	if s.publishChannel == nil {
		queue := make(chan dc.DeviceMessage, 512)
		s.publishChannel = queue
		publishOTAMessages(queue, s.logger) // start receiver
	}

	s.publishChannel <- otaMessage
}
// otaResponses is the MQTT message handler that forwards OTA responses to the
// scheduler. Each incoming message is converted with
// dStream.CreateQueueDeviceMessage and sent on the interactor's notification
// channel.
//
// If the interactor has not been created (NewOTAStream not yet called) or its
// notification channel has not been created (EnableTriggers not yet called),
// the message is dropped and an error is logged instead of panicking inside
// the MQTT callback.
//
// The send is a plain channel send, so it blocks when the channel buffer is
// full.
var otaResponses mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	// otastream is a package-level pointer that stays nil until NewOTAStream
	// runs; guard it before touching its fields.
	if otastream == nil || otastream.notifyChannel == nil {
		level.Error(logger).Log("error", "ota notification channel offline", "topic", msg.Topic())
		return
	}

	otastream.notifyChannel <- dStream.CreateQueueDeviceMessage(msg)
}
// handleOTAMessages starts or stops watching OTA status messages for one
// device.
//
// When enabled is true it calls WatchOTAProgress; otherwise it calls
// UnWatchOTAProgress. A failure is logged with an event name that matches the
// operation attempted, and a debug entry is written in either case. The error
// from the chosen call is returned unchanged (nil on success).
func handleOTAMessages(network, name string, enabled bool) error {
	var (
		err         error
		failedEvent string
	)

	if enabled {
		failedEvent = "Subscribe Failed"
		err = WatchOTAProgress(network, name)
	} else {
		failedEvent = "Unsubscribe Failed"
		err = UnWatchOTAProgress(network, name)
	}

	if err != nil {
		level.Error(logger).Log("event", failedEvent, "error", err.Error())
	}

	// Log arguments are alternating key/value pairs, so the enabled flag gets
	// its own key rather than hanging off the message text.
	level.Debug(logger).Log("event", "handleOTAMessages Completed", "enabled", enabled, "network", network, "name", name)
	return err
}
// WatchOTAProgress subscribes the otaResponses handler to the device's OTA
// status topic, "<network>/<device>/$implementation/ota/status", on behalf of
// the scheduler. The $implementation/ota/firmware topic is intentionally not
// watched.
//
// It returns the subscribe token's error, or nil when the subscription
// succeeded.
func WatchOTAProgress(network, device string) error {
	topic := fmt.Sprintf("%s/%s/$implementation/ota/status", network, device)
	token := subWithHandler(topic, otaResponses) // OTA Responses

	// A token's Error is only meaningful once the operation has completed, so
	// wait first, as UnWatchOTAProgress does. Wait returns immediately if the
	// token is already complete.
	// NOTE(review): assumes subWithHandler returns a paho mqtt.Token — confirm.
	token.Wait()
	return token.Error()
}
// UnWatchOTAProgress removes the scheduler's subscription to the device's OTA
// status topic, "<network>/<device>/$implementation/ota/status".
//
// It waits for the unsubscribe to complete and returns the token's error, or
// nil on success.
func UnWatchOTAProgress(network, device string) error {
	unsub := client.Unsubscribe(
		fmt.Sprintf("%s/%s/$implementation/ota/status", network, device),
	)
	unsub.Wait()

	return unsub.Error()
}
// publishOTAMessages starts a goroutine that forwards queued OTA messages out
// to MQTT.
//
// The goroutine receives from publisher until that channel is closed, passing
// each message's topic, payload, retained flag and QoS to publish, and logs
// progress at debug level on plog. The function itself returns immediately.
func publishOTAMessages(publisher chan dc.DeviceMessage, plog log.Logger) {
	forward := func() {
		level.Debug(plog).Log("event", "publishOTAMessages(gofunc) called")

		for {
			outbound, open := <-publisher
			if !open {
				// channel closed and drained: stop forwarding
				break
			}
			publish(outbound.Topic(), outbound.Payload(), outbound.Retained(), outbound.Qos())
			level.Debug(plog).Log("method", "publishOTAMessages(gofunc)", "queue depth", len(publisher), "topic", outbound.Topic())
		}

		level.Debug(plog).Log("method", "publishOTAMessages()", "event", "Completed")
	}

	go forward()
}