Skip to content

Commit

Permalink
Adds an overall handler for managing everything.
Browse files Browse the repository at this point in the history
Creates a unipitt.Handler abstraction that actually takes care of
everything:
- polling the readers
- checking the events and pushing out changes

The setup is not that ideal, but I wanted to have move all of the inner
details to a separate object to deal with it.
  • Loading branch information
mhemeryck committed Nov 23, 2018
1 parent 88e1fd2 commit 61c78cb
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 53 deletions.
60 changes: 9 additions & 51 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package main
import (
"flag"
"log"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/mhemeryck/unipitt"
)

Expand Down Expand Up @@ -42,8 +40,8 @@ func main() {
flag.StringVar(&broker, "broker", "ssl://raspberrypi.lan:8883", "MQTT broker URI")
var clientID string
flag.StringVar(&clientID, "client_id", "unipitt", "MQTT host client ID")
var sysfsRoot string
flag.StringVar(&sysfsRoot, "sysfs_root", unipitt.SysFsRoot, "Root folder to search for digital inputs")
var sysFsRoot string
flag.StringVar(&sysFsRoot, "sysfs_root", unipitt.SysFsRoot, "Root folder to search for digital inputs")
var payload string
flag.StringVar(&payload, "payload", Payload, "Default MQTT message payload")
flag.Parse()
Expand All @@ -53,56 +51,16 @@ func main() {
printVersionInfo()
return
}
// MQTT setup
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientID)
if caFile != "" {
tlsConfig, err := unipitt.NewTLSConfig(caFile)
if err != nil {
log.Printf("Error reading MQTT CA file %s: %s\n", caFile, err)
} else {
opts.SetTLSConfig(tlsConfig)
}
}
mqttClient := mqtt.NewClient(opts)
token := mqttClient.Connect()
if token.Wait() && token.Error() != nil {
log.Println("Can't connect to MQTT host")
}

// Setup digital input
readers, err := unipitt.FindDigitalInputReaders(sysfsRoot)
// Setup handler
handler, err := unipitt.NewHandler(broker, clientID, caFile, sysFsRoot)
if err != nil {
log.Fatal(err)
}
log.Printf("Created %d digital input reader instances from path %s\n", len(readers), sysfsRoot)
for k := range readers {
defer readers[k].Close()
}

events := make(chan *unipitt.DigitalInputReader)
defer close(events)

ticker := time.NewTicker(time.Duration(pollingInterval) * time.Millisecond)
defer ticker.Stop()

// Start polling
for k := range readers {
log.Printf("Initiate polling for %d readers\n", len(readers))
go readers[k].Poll(events, ticker)
}
defer handler.Close()

// Publish on a trigger
for {
select {
case d := <-events:
if d.Err != nil {
log.Printf("Found error %s for topic %s\n", d.Err, d.Topic)
} else {
log.Printf("Trigger for topic %s\n", d.Topic)
mqttClient.Publish(d.Topic, 0, false, payload)
}
}
}
// Start polling (blocking)
done := make(chan bool)
defer close(done)
handler.Poll(done, pollingInterval, payload)
}
2 changes: 0 additions & 2 deletions digital_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ const (
TrueValue = "1"
// FolderRegex represents to regular expression used for finding the required file to read from
FolderRegex = "di_[0-9]_[0-9]{2}"
// SysFsRoot default root folder to search for digital inputs
SysFsRoot = "/sys/devices/platform/unipi_plc"
)

// DigitalInput interface for doing the polling
Expand Down
90 changes: 90 additions & 0 deletions unipitt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package unipitt

import (
"log"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
// SysFsRoot default root folder to search for digital inputs
SysFsRoot = "/sys/devices/platform/unipi_plc"
)

// Unipitt defines the interface with unipi board
type Unipitt interface {
Poll(pollingInterval int) error
Close()
}

// Handler implements handles all unipi to MQTT interactions
type Handler struct {
readers []DigitalInputReader
client mqtt.Client
}

// NewHandler prepares and sets up an entire unipitt handler
func NewHandler(broker string, clientID string, caFile string, sysFsRoot string) (h *Handler, err error) {
h = &Handler{}
// MQTT setup
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientID)
if caFile != "" {
tlsConfig, err := NewTLSConfig(caFile)
if err != nil {
log.Printf("Error reading MQTT CA file %s: %s\n", caFile, err)
} else {
opts.SetTLSConfig(tlsConfig)
}
}
h.client = mqtt.NewClient(opts)

// Digital Input reader setup
h.readers, err = FindDigitalInputReaders(sysFsRoot)
if err != nil {
return
}
log.Printf("Created %d digital input reader instances from path %s\n", len(h.readers), sysFsRoot)
return
}

// Poll starts the actual polling and pushing to MQTT
func (h *Handler) Poll(done chan bool, interval int, payload string) (err error) {
events := make(chan *DigitalInputReader)
defer close(events)

ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()

// Start polling
log.Printf("Initiate polling for %d readers\n", len(h.readers))
for k := range h.readers {
go h.readers[k].Poll(events, ticker)
}

// Publish on a trigger
for {
select {
case d := <-events:
if d.Err != nil {
log.Printf("Found error %s for topic %s\n", d.Err, d.Topic)
} else {
log.Printf("Trigger for topic %s\n", d.Topic)
h.client.Publish(d.Topic, 0, false, payload)
}
case <-done:
log.Println("Handler done polling, coming back ...")
return
}
}
}

// Close loose ends
func (h *Handler) Close() {
// Close the readers
for k := range h.readers {
h.readers[k].Close()
}
}
84 changes: 84 additions & 0 deletions unipitt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package unipitt

import (
"bytes"
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
)

// End-to-end test of handler.
//
// A dummy MQTT client is set up as well as a digital input from some temporary files. A corresponding handler is created. Afterwards, the polling loop starts (blocking). We make a change to the digital input file and see it is captured by looking at the logs. Afterwards, the handler can quit by passing in a done signal.
func TestHandler(t *testing.T) {
broker := "mqtts://foo"
clientID := "unipitt"
caFile := ""
payload := "bar"
pollingInterval := 50

// Setup a folder structure
folder := "di_1_01"
// Create temporary folder, only if it does not exist already
root, err := ioutil.TempDir("", "unipitt")
if err != nil {
t.Fatal(err)
}
sysFsRoot := filepath.Join(root, folder)
if _, pathErr := os.Stat(sysFsRoot); pathErr != nil {
err := os.Mkdir(sysFsRoot, os.ModePerm)
if err != nil {
t.Fatal(err)
}
}
// Create temporary path
tmpfn := filepath.Join(sysFsRoot, "di_value")
// Create temporary file handle
f, err := os.Create(tmpfn)
if err != nil {
t.Fatal(err)
}
// Put in zero-value
_, err = f.WriteString("0\n")
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.RemoveAll(root) // clean up

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot)
if err != nil {
t.Fatal(err)
}
defer handler.Close()

// Setup log monitoring
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()

// Start polling (blocking)
done := make(chan bool)
defer close(done)
// Trigger a send
go func() {
// Trigger a send
f.Seek(0, 0)
_, err = f.WriteString("1\n")
if err != nil {
t.Fatal(err)
}
// Some ugly waiting until everything has settled ...
time.Sleep(1 * time.Second)
done <- true
}()
handler.Poll(done, pollingInterval, payload)
if !bytes.Contains(buf.Bytes(), []byte("Trigger for topic di_1_01")) {
t.Fatal("Expected a trigger to be captured in the log, found none")
}
}

0 comments on commit 61c78cb

Please sign in to comment.