Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ station/station
otto.log
debug*
*.code-workspace
*.log
*.log
coverage.*
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package otto

import "github.com/rustyeddy/otto/messanger"

type Config struct {
messanger.Config
}
4 changes: 2 additions & 2 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type Data interface {
// Data is an array of timestamps and values representing the same
// source of data over a period of time
type DataPoint struct {
value any `json:"value"`
timestamp time.Time `json:"time-increment"`
value any
timestamp time.Time
}

func NewData(dat any, ts time.Time) Data {
Expand Down
113 changes: 88 additions & 25 deletions messanger/messanger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,26 @@ import (
"fmt"
"log/slog"
"net/http"
"os"
"strings"
"sync"
)

var (
// messanger holds the singleton instance of the active Messanger
messanger Messanger
messangerLock sync.Mutex
)

// MessangerConfig holds configuration parameters for messanger initialization.
// Currently supports broker address configuration for MQTT-based messangers.
type MessangerConfig struct {
type Config struct {
// Broker is the address of the MQTT broker (hostname or IP)
Broker string
Broker string `json:"broker"`
Username string `json:"username"`
Password string `json:"password"`
}

var messangerConfig = MessangerConfig{
Broker: "localhost",
}
var (
// messanger holds the singleton instance of the active Messanger
msgr Messanger
messangerLock sync.Mutex
config Config
)

// MsgHandler is a callback function type for handling incoming messages.
// Subscribers provide a MsgHandler function that will be invoked when
Expand Down Expand Up @@ -112,9 +113,9 @@ type Messanger interface {
//
// Supported ID values:
// - "none": Creates a local in-process messanger without MQTT
// - "mqtt": Creates an MQTT messanger connecting to an external broker
// - "local": Starts an embedded MQTT broker and creates an MQTT messanger
//
// - default: Creates an MQTT messanger connecting to an external broker
Comment thread
rustyeddy marked this conversation as resolved.

// The created messanger becomes the global singleton accessible via GetMessanger().
// If an invalid ID is provided, logs an error and returns nil.
//
Expand All @@ -124,26 +125,24 @@ type Messanger interface {
// if msg == nil {
// log.Fatal("Failed to create messanger")
// }
func NewMessanger(id string) (m Messanger) {
switch id {
func NewMessanger(broker string) (m Messanger) {

switch broker {
case "none":
m = NewMessangerLocal(id)
case "mqtt":
m = NewMessangerMQTT(id, messangerConfig.Broker)
case "local":
msgr = NewMessangerLocal(broker)

case "otto":
_, err := StartMQTTBroker(context.Background())
if err != nil {
slog.Error("Failed to start embedded MQTT broker", "error", err)
return nil
}
Comment on lines 135 to 139
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NewMessanger function discards the shutdown function returned by StartMQTTBroker at line 135, which means there's no way to gracefully shutdown the broker started by this function. This differs from GetMessanger which properly stores it in the global shutdown variable. This creates an inconsistency where brokers started via NewMessanger("otto") cannot be cleanly shut down.

Copilot uses AI. Check for mistakes.
m = NewMessangerMQTT(id, messangerConfig.Broker)
msgr = NewMessangerMQTT(broker, broker)

default:
slog.Error("Unknown messanger ID", "id", id)
return nil
msgr = NewMessangerMQTT(broker, broker)
Comment on lines +128 to +143
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NewMessanger function signature has changed from taking an id parameter to taking a broker parameter, but this is semantically confusing. In line 132, NewMessangerLocal(broker) is called where broker is "none", but "none" is being used as an ID, not a broker address. Similarly, in line 140, NewMessangerMQTT(broker, broker) is called where the first parameter should be an ID but is receiving "otto" (the broker type). This parameter naming and usage is inconsistent and could lead to confusion.

Copilot uses AI. Check for mistakes.
}
messanger = m
return m
return msgr
}

// GetMessanger returns the singleton Messanger instance created by NewMessanger.
Expand All @@ -159,7 +158,60 @@ func NewMessanger(id string) (m Messanger) {
func GetMessanger() Messanger {
messangerLock.Lock()
defer messangerLock.Unlock()
return messanger

if msgr != nil {
return msgr
}

// Take broker from the config first
broker := config.Broker
if broker == "" {
// if no config look for environment variable
broker = os.Getenv("MQTT_BROKER")
}
if broker == "" {
// if no environment variable then default to the built in
// broker
broker = "otto"
}

user := config.Username
if user == "" {
user = os.Getenv("MQTT_USERNAME")
}
pass := config.Password
if pass == "" {
pass = os.Getenv("MQTT_PASSWORD")
}

switch broker {
case "none":
msgr = NewMessangerLocal(broker)

case "otto":
var err error
shutdown, err = StartMQTTBroker(context.Background())
if err != nil {
slog.Error("Failed to start embedded MQTT broker", "error", err)

// A hack if bind address is in use, skip out and just
// use the client to bind to the already running broker
if !strings.Contains(err.Error(), "bind: address already in use") {
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for handling "bind: address already in use" error is inverted. The condition if !strings.Contains(err.Error(), "bind: address already in use") means it will return nil for all errors EXCEPT the "address already in use" error. This is the opposite of what the comment suggests - you want to continue if the address is already in use, but return nil for other errors. The condition should be if strings.Contains(err.Error(), "bind: address already in use") without the negation.

Suggested change
if !strings.Contains(err.Error(), "bind: address already in use") {
if strings.Contains(err.Error(), "bind: address already in use") {
// continue, broker is already running
} else {

Copilot uses AI. Check for mistakes.
return nil
}

slog.Info("Assuming broker is already running, connecting to existing broker")
}
fallthrough

default:
msgr = NewMessangerMQTTWithAuth("otto", broker, user, pass)
}

ms := GetMsgSaver()
ms.Saving = true

return msgr
Comment on lines +166 to +214
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new lazy initialization logic in GetMessanger() (lines 166-214) lacks test coverage. This includes critical paths like:

  1. Environment variable fallback logic (MQTT_BROKER, MQTT_USERNAME, MQTT_PASSWORD)
  2. The "address already in use" error handling for the embedded broker
  3. The config priority (config -> env vars -> default)
  4. The fallthrough from "otto" case to default case

The existing TestGetMessanger only tests singleton behavior after NewMessanger is called, not the lazy initialization path. Consider adding table-driven tests that exercise these scenarios.

Copilot uses AI. Check for mistakes.
}

// MessangerBase provides a base implementation of the Messanger interface
Expand Down Expand Up @@ -246,6 +298,17 @@ func (mb *MessangerBase) PubMsg(msg *Msg) error {
return nil
}

func (mb *MessangerBase) Pub(topic string, data any) error {

b, err := Bytes(data)
if err != nil {
slog.Error("messanger failed to convert bytes", "error", err)
return err
}
msg := NewMsg(topic, b, "otto")
return mb.PubMsg(msg)
}

// Close cleanly shuts down the messanger.
// This base implementation is a no-op that just logs the close operation.
//
Expand Down
28 changes: 0 additions & 28 deletions messanger/messanger_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,31 +182,3 @@ func (m *MessangerLocal) Close() {
}
slog.Debug("MessangerLocal.Close", "id", m.ID())
}

// toBytesUnchecked is an internal helper that converts common types to []byte.
// It attempts to use the package's Bytes() function first, falling back to
// basic type assertions if that fails.
//
// This is a best-effort conversion used internally by the local messanger.
//
// Supported types: []byte, string, and any other type (converted via fmt.Sprintf)
//
// Parameters:
// - v: The value to convert to bytes
//
// Returns the byte representation of the value.
func toBytesUnchecked(v any) []byte {
// try using Bytes helper if available in this package
if b, err := Bytes(v); err == nil {
return b
}
// best-effort fallback
switch x := v.(type) {
case []byte:
return x
case string:
return []byte(x)
default:
return []byte(fmt.Sprintf("%v", v))
}
}
3 changes: 2 additions & 1 deletion messanger/messanger_mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ type MsgPrinter struct{}
// printer := &MsgPrinter{}
// msg.Subscribe("ss/c/+/temp", printer.MsgHandler)
func (m *MsgPrinter) MsgHandler(msg *Msg) error {
fmt.Printf("%+v\n", msg)
str := fmt.Sprintf("%#v", msg)
slog.Info("builtin MsgHandler", "msg", str)
return nil
}
25 changes: 10 additions & 15 deletions messanger/messanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ func TestMessangerBaseServeHTTP(t *testing.T) {

func TestNewMessanger(t *testing.T) {
tests := []struct {
name string
id string
topic string
expected string
expectNil bool
name string
id string
topic string
expected string
}{
{
name: "Create none messanger",
Expand All @@ -67,22 +66,18 @@ func TestNewMessanger(t *testing.T) {
expected: "mqtt",
},
{
name: "Create messanger with unknown id returns nil",
id: "unknown",
topic: "topic1",
expectNil: true,
name: "Create messanger with unknown id returns nil",
Comment thread
rustyeddy marked this conversation as resolved.
id: "unknown",
topic: "topic1",
expected: "unknown",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewMessanger(tt.id)
if tt.expectNil {
assert.Nil(t, m)
} else {
assert.NotNil(t, m)
assert.Equal(t, tt.expected, m.ID())
}
assert.NotNil(t, m)
assert.Equal(t, tt.expected, m.ID())
})
}
}
Expand Down
18 changes: 9 additions & 9 deletions messanger/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ var (
// }
// client.Publish("sensors/temp", "25.5")
type MQTT struct {
id string `json:"id"` // Unique client identifier
Broker string `json:"broker"` // Broker hostname or IP (no protocol/port)
Username string `json:"username"` // MQTT authentication username
Password string `json:"password"` // MQTT authentication password
Debug bool `json:"debug"` // Enable Paho MQTT client debug logging
id string // Unique client identifier
Broker string // Broker hostname or IP (no protocol/port)
Username string // MQTT authentication username
Password string // MQTT authentication password
Debug bool // Enable Paho MQTT client debug logging

error `json:"error"` // Last error encountered
gomqtt.Client `json:"-"` // Embedded Paho MQTT client
error // Last error encountered
gomqtt.Client // Embedded Paho MQTT client
}

// NewMQTT creates a new MQTT client instance with default credentials.
Expand All @@ -69,8 +69,8 @@ func NewMQTT(id string, broker string, topics string) *MQTT {
mqtt = &MQTT{
id: id,
Broker: broker,
Username: "otto",
Password: "otto123",
Username: "",
Password: "",
Comment thread
rustyeddy marked this conversation as resolved.
}
return mqtt
}
Expand Down
29 changes: 24 additions & 5 deletions messanger/mqtt_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package messanger
import (
"context"
"log"
"log/slog"
"sync"

mqttserver "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
Expand Down Expand Up @@ -52,6 +54,10 @@ import (
// Optional Features (commented out):
// - WebSocket listener on port 1882 for browser clients
// - HTTP health check endpoint on port 8081 for monitoring
var (
shutdown func(context.Context) error
)
Comment thread
rustyeddy marked this conversation as resolved.

func StartMQTTBroker(ctx context.Context) (func(context.Context) error, error) {
// Create broker with default options (in-memory state).
srv := mqttserver.New(nil)
Expand Down Expand Up @@ -107,15 +113,28 @@ func StartMQTTBroker(ctx context.Context) (func(context.Context) error, error) {
}
}()

var once sync.Once

// Return a shutdown function to be called from the app.
shutdown = func(_ context.Context) error {
slog.Debug("mqtt broker shutdown called")
// srv.Close() must be wrapped because it returns an error
once.Do(func() { srv.Close() })
return nil
}

// Close on context cancellation if provided.
go func() {
<-ctx.Done()
_ = srv.Close()
_ = shutdown(ctx)
}()

// Return a shutdown function you can call from your app.
shutdown := func(_ context.Context) error {
return srv.Close()
}
return shutdown, nil
}

func StopMQTTBroker(ctx context.Context) error {
if shutdown != nil {
shutdown(ctx)
}
return nil
}
Loading