Skip to content

Commit

Permalink
Merge pull request #89 from operable/nmohoric/mqtt-config-fixes
Browse files Browse the repository at this point in the history
mqtt config fixes
  • Loading branch information
nmohoric committed Jun 21, 2018
2 parents 21861d5 + 09c1dc0 commit ab7647a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
10 changes: 8 additions & 2 deletions relay/bus/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func (mqc *MQTTConnection) Connect(options ConnectionOptions) error {
compressed := snappy.Encode(nil, []byte(options.OnDisconnect.Body))
mqttOpts.SetWill(options.OnDisconnect.Topic, string(compressed), 1, false)
}
if options.EventsHandler != nil && options.AutoReconnect == true {
mqttOpts.OnConnect = func(c *mqtt.Client) {
mqc.conn = c
mqc.options.EventsHandler(mqc, ConnectedEvent)
}
}
mqc.backoff = NewBackoff()
mqc.conn = mqtt.NewClient(mqttOpts)
for {
Expand All @@ -41,7 +47,7 @@ func (mqc *MQTTConnection) Connect(options ConnectionOptions) error {
}
}
mqc.options = options
if mqc.options.EventsHandler != nil {
if mqc.options.EventsHandler != nil && mqc.options.AutoReconnect != true {
mqc.options.EventsHandler(mqc, ConnectedEvent)
}
return nil
Expand Down Expand Up @@ -77,7 +83,7 @@ func (mqc *MQTTConnection) Subscribe(topic string, handler SubscriptionHandler)
return token.Error()
}

func (mqc *MQTTConnection) disconnected(cilent *mqtt.Client, err error) {
func (mqc *MQTTConnection) disconnected(client *mqtt.Client, err error) {
log.Errorf("MQTT connection failed: %s.", err)
for {
if token := mqc.conn.Connect(); token.Wait() && token.Error() != nil {
Expand Down
5 changes: 3 additions & 2 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ func (r *cogRelay) handleBusEvents(conn bus.Connection, event bus.Event) {
}
if r.catalog.Len() > 0 {
r.catalog.Reconnected()
} else {
log.Info("Loading bundle catalog.")
r.requestBundles()
}
log.Info("Loading bundle catalog.")
r.requestBundles()
}
}

Expand Down

0 comments on commit ab7647a

Please sign in to comment.