Skip to content

Commit

Permalink
Adds exponential back-off reconnect
Browse files Browse the repository at this point in the history
Adds MQTT client reconnection possiblities.

Main idea is to have this running as some sort of daemon, so it can
automatically reconnect in case of issues.
  • Loading branch information
mhemeryck committed Nov 23, 2018
1 parent 61c78cb commit 230956f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
16 changes: 14 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/cenkalti/backoff"
version = "2.0.0"
19 changes: 17 additions & 2 deletions unipitt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"time"

"github.com/cenkalti/backoff"
mqtt "github.com/eclipse/paho.mqtt.golang"
)

Expand Down Expand Up @@ -40,6 +41,10 @@ func NewHandler(broker string, clientID string, caFile string, sysFsRoot string)
}
}
h.client = mqtt.NewClient(opts)
err = h.connect()
if err != nil {
log.Println("Error connecting to MQTT broker ...")
}

// Digital Input reader setup
h.readers, err = FindDigitalInputReaders(sysFsRoot)
Expand All @@ -56,7 +61,6 @@ func (h *Handler) Poll(done chan bool, interval int, payload string) (err error)
defer close(events)

ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()

// Start polling
log.Printf("Initiate polling for %d readers\n", len(h.readers))
Expand All @@ -72,15 +76,26 @@ func (h *Handler) Poll(done chan bool, interval int, payload string) (err error)
log.Printf("Found error %s for topic %s\n", d.Err, d.Topic)
} else {
log.Printf("Trigger for topic %s\n", d.Topic)
h.client.Publish(d.Topic, 0, false, payload)
if token := h.client.Publish(d.Topic, 0, false, payload); token.Wait() && token.Error() != nil {
go backoff.Retry(h.connect, backoff.NewExponentialBackOff())
}
}
case <-done:
log.Println("Handler done polling, coming back ...")
ticker.Stop()
return
}
}
}

// reconnect tries to reconnect the MQTT client to the broker
func (h *Handler) connect() error {
log.Println("Error connecting to MQTT broker ...")
token := h.client.Connect()
token.Wait()
return token.Error()
}

// Close loose ends
func (h *Handler) Close() {
// Close the readers
Expand Down
3 changes: 3 additions & 0 deletions unipitt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,7 @@ func TestHandler(t *testing.T) {
if !bytes.Contains(buf.Bytes(), []byte("Trigger for topic di_1_01")) {
t.Fatal("Expected a trigger to be captured in the log, found none")
}
if !bytes.Contains(buf.Bytes(), []byte("Error connecting to MQTT broker ...")) {
t.Fatal("Expected a reconnect for MQTT broker, did not find one")
}
}

0 comments on commit 230956f

Please sign in to comment.