diff --git a/.gitignore b/.gitignore index a798c63..ad7fe32 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,5 @@ station/station otto.log debug* *.code-workspace -*.log \ No newline at end of file +*.log +coverage.* \ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..68f7df6 --- /dev/null +++ b/config.go @@ -0,0 +1,7 @@ +package otto + +import "github.com/rustyeddy/otto/messanger" + +type Config struct { + messanger.Config +} diff --git a/data/data.go b/data/data.go index 26ea51d..37a3e42 100644 --- a/data/data.go +++ b/data/data.go @@ -16,8 +16,8 @@ type Data interface { // Data is an array of timestamps and values representing the same // source of data over a period of time type DataPoint struct { - value any `json:"value"` - timestamp time.Time `json:"time-increment"` + value any + timestamp time.Time } func NewData(dat any, ts time.Time) Data { diff --git a/messanger/messanger.go b/messanger/messanger.go index 11e0507..718e7d5 100644 --- a/messanger/messanger.go +++ b/messanger/messanger.go @@ -34,25 +34,26 @@ import ( "fmt" "log/slog" "net/http" + "os" + "strings" "sync" ) -var ( - // messanger holds the singleton instance of the active Messanger - messanger Messanger - messangerLock sync.Mutex -) - // MessangerConfig holds configuration parameters for messanger initialization. // Currently supports broker address configuration for MQTT-based messangers. -type MessangerConfig struct { +type Config struct { // Broker is the address of the MQTT broker (hostname or IP) - Broker string + Broker string `json:"broker"` + Username string `json:"username"` + Password string `json:"password"` } -var messangerConfig = MessangerConfig{ - Broker: "localhost", -} +var ( + // messanger holds the singleton instance of the active Messanger + msgr Messanger + messangerLock sync.Mutex + config Config +) // MsgHandler is a callback function type for handling incoming messages. // Subscribers provide a MsgHandler function that will be invoked when @@ -112,9 +113,9 @@ type Messanger interface { // // Supported ID values: // - "none": Creates a local in-process messanger without MQTT -// - "mqtt": Creates an MQTT messanger connecting to an external broker // - "local": Starts an embedded MQTT broker and creates an MQTT messanger -// +// - default: Creates an MQTT messanger connecting to an external broker + // The created messanger becomes the global singleton accessible via GetMessanger(). // If an invalid ID is provided, logs an error and returns nil. // @@ -124,26 +125,24 @@ type Messanger interface { // if msg == nil { // log.Fatal("Failed to create messanger") // } -func NewMessanger(id string) (m Messanger) { - switch id { +func NewMessanger(broker string) (m Messanger) { + + switch broker { case "none": - m = NewMessangerLocal(id) - case "mqtt": - m = NewMessangerMQTT(id, messangerConfig.Broker) - case "local": + msgr = NewMessangerLocal(broker) + + case "otto": _, err := StartMQTTBroker(context.Background()) if err != nil { slog.Error("Failed to start embedded MQTT broker", "error", err) return nil } - m = NewMessangerMQTT(id, messangerConfig.Broker) + msgr = NewMessangerMQTT(broker, broker) default: - slog.Error("Unknown messanger ID", "id", id) - return nil + msgr = NewMessangerMQTT(broker, broker) } - messanger = m - return m + return msgr } // GetMessanger returns the singleton Messanger instance created by NewMessanger. @@ -159,7 +158,60 @@ func NewMessanger(id string) (m Messanger) { func GetMessanger() Messanger { messangerLock.Lock() defer messangerLock.Unlock() - return messanger + + if msgr != nil { + return msgr + } + + // Take broker from the config first + broker := config.Broker + if broker == "" { + // if no config look for environment variable + broker = os.Getenv("MQTT_BROKER") + } + if broker == "" { + // if no environment variable then default to the built in + // broker + broker = "otto" + } + + user := config.Username + if user == "" { + user = os.Getenv("MQTT_USERNAME") + } + pass := config.Password + if pass == "" { + pass = os.Getenv("MQTT_PASSWORD") + } + + switch broker { + case "none": + msgr = NewMessangerLocal(broker) + + case "otto": + var err error + shutdown, err = StartMQTTBroker(context.Background()) + if err != nil { + slog.Error("Failed to start embedded MQTT broker", "error", err) + + // A hack if bind address is in use, skip out and just + // use the client to bind to the already running broker + if !strings.Contains(err.Error(), "bind: address already in use") { + return nil + } + + slog.Info("Assuming broker is already running, connecting to existing broker") + } + fallthrough + + default: + msgr = NewMessangerMQTTWithAuth("otto", broker, user, pass) + } + + ms := GetMsgSaver() + ms.Saving = true + + return msgr } // MessangerBase provides a base implementation of the Messanger interface @@ -246,6 +298,17 @@ func (mb *MessangerBase) PubMsg(msg *Msg) error { return nil } +func (mb *MessangerBase) Pub(topic string, data any) error { + + b, err := Bytes(data) + if err != nil { + slog.Error("messanger failed to convert bytes", "error", err) + return err + } + msg := NewMsg(topic, b, "otto") + return mb.PubMsg(msg) +} + // Close cleanly shuts down the messanger. // This base implementation is a no-op that just logs the close operation. // diff --git a/messanger/messanger_local.go b/messanger/messanger_local.go index c6e0b01..7ed8258 100644 --- a/messanger/messanger_local.go +++ b/messanger/messanger_local.go @@ -182,31 +182,3 @@ func (m *MessangerLocal) Close() { } slog.Debug("MessangerLocal.Close", "id", m.ID()) } - -// toBytesUnchecked is an internal helper that converts common types to []byte. -// It attempts to use the package's Bytes() function first, falling back to -// basic type assertions if that fails. -// -// This is a best-effort conversion used internally by the local messanger. -// -// Supported types: []byte, string, and any other type (converted via fmt.Sprintf) -// -// Parameters: -// - v: The value to convert to bytes -// -// Returns the byte representation of the value. -func toBytesUnchecked(v any) []byte { - // try using Bytes helper if available in this package - if b, err := Bytes(v); err == nil { - return b - } - // best-effort fallback - switch x := v.(type) { - case []byte: - return x - case string: - return []byte(x) - default: - return []byte(fmt.Sprintf("%v", v)) - } -} diff --git a/messanger/messanger_mqtt.go b/messanger/messanger_mqtt.go index 9af5220..de17912 100644 --- a/messanger/messanger_mqtt.go +++ b/messanger/messanger_mqtt.go @@ -261,6 +261,7 @@ type MsgPrinter struct{} // printer := &MsgPrinter{} // msg.Subscribe("ss/c/+/temp", printer.MsgHandler) func (m *MsgPrinter) MsgHandler(msg *Msg) error { - fmt.Printf("%+v\n", msg) + str := fmt.Sprintf("%#v", msg) + slog.Info("builtin MsgHandler", "msg", str) return nil } diff --git a/messanger/messanger_test.go b/messanger/messanger_test.go index 2a67f2b..1c5c8cf 100644 --- a/messanger/messanger_test.go +++ b/messanger/messanger_test.go @@ -48,11 +48,10 @@ func TestMessangerBaseServeHTTP(t *testing.T) { func TestNewMessanger(t *testing.T) { tests := []struct { - name string - id string - topic string - expected string - expectNil bool + name string + id string + topic string + expected string }{ { name: "Create none messanger", @@ -67,22 +66,18 @@ func TestNewMessanger(t *testing.T) { expected: "mqtt", }, { - name: "Create messanger with unknown id returns nil", - id: "unknown", - topic: "topic1", - expectNil: true, + name: "Create messanger with unknown id returns nil", + id: "unknown", + topic: "topic1", + expected: "unknown", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := NewMessanger(tt.id) - if tt.expectNil { - assert.Nil(t, m) - } else { - assert.NotNil(t, m) - assert.Equal(t, tt.expected, m.ID()) - } + assert.NotNil(t, m) + assert.Equal(t, tt.expected, m.ID()) }) } } diff --git a/messanger/mqtt.go b/messanger/mqtt.go index ac81698..83579ed 100644 --- a/messanger/mqtt.go +++ b/messanger/mqtt.go @@ -41,14 +41,14 @@ var ( // } // client.Publish("sensors/temp", "25.5") type MQTT struct { - id string `json:"id"` // Unique client identifier - Broker string `json:"broker"` // Broker hostname or IP (no protocol/port) - Username string `json:"username"` // MQTT authentication username - Password string `json:"password"` // MQTT authentication password - Debug bool `json:"debug"` // Enable Paho MQTT client debug logging + id string // Unique client identifier + Broker string // Broker hostname or IP (no protocol/port) + Username string // MQTT authentication username + Password string // MQTT authentication password + Debug bool // Enable Paho MQTT client debug logging - error `json:"error"` // Last error encountered - gomqtt.Client `json:"-"` // Embedded Paho MQTT client + error // Last error encountered + gomqtt.Client // Embedded Paho MQTT client } // NewMQTT creates a new MQTT client instance with default credentials. @@ -69,8 +69,8 @@ func NewMQTT(id string, broker string, topics string) *MQTT { mqtt = &MQTT{ id: id, Broker: broker, - Username: "otto", - Password: "otto123", + Username: "", + Password: "", } return mqtt } diff --git a/messanger/mqtt_broker.go b/messanger/mqtt_broker.go index 9970fb1..5de224b 100644 --- a/messanger/mqtt_broker.go +++ b/messanger/mqtt_broker.go @@ -3,6 +3,8 @@ package messanger import ( "context" "log" + "log/slog" + "sync" mqttserver "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/hooks/auth" @@ -52,6 +54,10 @@ import ( // Optional Features (commented out): // - WebSocket listener on port 1882 for browser clients // - HTTP health check endpoint on port 8081 for monitoring +var ( + shutdown func(context.Context) error +) + func StartMQTTBroker(ctx context.Context) (func(context.Context) error, error) { // Create broker with default options (in-memory state). srv := mqttserver.New(nil) @@ -107,15 +113,28 @@ func StartMQTTBroker(ctx context.Context) (func(context.Context) error, error) { } }() + var once sync.Once + + // Return a shutdown function to be called from the app. + shutdown = func(_ context.Context) error { + slog.Debug("mqtt broker shutdown called") + // srv.Close() must be wrapped because it returns an error + once.Do(func() { srv.Close() }) + return nil + } + // Close on context cancellation if provided. go func() { <-ctx.Done() - _ = srv.Close() + _ = shutdown(ctx) }() - // Return a shutdown function you can call from your app. - shutdown := func(_ context.Context) error { - return srv.Close() - } return shutdown, nil } + +func StopMQTTBroker(ctx context.Context) error { + if shutdown != nil { + shutdown(ctx) + } + return nil +} diff --git a/otto.go b/otto.go index 37aa822..cf23ec4 100644 --- a/otto.go +++ b/otto.go @@ -130,7 +130,6 @@ import ( "context" "fmt" "log/slog" - "os" "github.com/rustyeddy/otto/messanger" "github.com/rustyeddy/otto/server" @@ -156,12 +155,11 @@ type OttO struct { *server.Server messanger.Messanger - Mock bool - MQTTBroker string // MQTT broker URL, defaults to test.mosquitto.org - UseLocal bool // Force use of local messaging - hub bool // maybe hub should be a different struct? - done chan any - brokerShutdown func(context.Context) error // graceful shutdown for embedded broker + Mock bool + MQTTBroker string // MQTT broker URL, defaults to test.mosquitto.org + UseLocal bool // Force use of local messaging + hub bool // maybe hub should be a different struct? + done chan any } // global variables and structures @@ -207,41 +205,7 @@ func (o *OttO) Init() { } // Initialzie the local station o.Station.Init() - - // Start embedded MQTT broker (optional) and store shutdown func. - if true { - if shutdown, err := messanger.StartMQTTBroker(context.Background()); err != nil { - slog.Error("Failed to start embedded MQTT broker", "error", err) - } else { - o.brokerShutdown = shutdown - } - } - - if o.UseLocal { - slog.Info("Using local messaging (no MQTT)") - o.Messanger = messanger.NewMessangerLocal("otto") - } else { - - // Set the MQTT_BROKER environment variable for the messanger - o.MQTTBroker = os.Getenv("MQTT_BROKER") - if o.MQTTBroker == "" { - o.MQTTBroker = "localhost" - } - - // Get MQTT credentials from environment or use defaults for embedded broker - mqttUser := os.Getenv("MQTT_USER") - mqttPass := os.Getenv("MQTT_PASS") - if mqttUser == "" && o.MQTTBroker == "localhost" { - mqttUser = "otto" - mqttPass = "otto123" - } - - slog.Info("Attempting MQTT connection", "broker", o.MQTTBroker, "user", mqttUser) - o.Messanger = messanger.NewMessangerMQTTWithAuth("otto", o.MQTTBroker, mqttUser, mqttPass) - } - ms := messanger.GetMsgSaver() - ms.Saving = true - + o.Messanger = messanger.GetMessanger() } func (o *OttO) Start() { @@ -268,10 +232,7 @@ func (o *OttO) Stop() { if o.Messanger != nil { o.Messanger.Close() - } - - if o.brokerShutdown != nil { - _ = o.brokerShutdown(context.Background()) + messanger.StopMQTTBroker(context.Background()) } } @@ -290,8 +251,5 @@ func (o *OttO) GetManagedDevice(name string) *station.ManagedDevice { return nil } device := o.Station.Get(name) - if md, ok := device.(*station.ManagedDevice); ok { - return md - } - return nil + return device } diff --git a/otto_test.go b/otto_test.go index 3c216a6..58233ad 100644 --- a/otto_test.go +++ b/otto_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestOttOInit(t *testing.T) { @@ -17,8 +16,6 @@ func TestOttOInit(t *testing.T) { assert.NotNil(t, o.StationManager, "Expected StationManager to be initialized") assert.NotNil(t, o.Station, "Expected Station to be initialized") assert.NotNil(t, o.Server, "Expected Server to be initialized") - assert.NotNil(t, o.brokerShutdown, "Expected brokerShutdown function to be initialized") - // Clean up by stopping OttO (handle done channel in goroutine) go func() { <-o.Done() @@ -30,8 +27,6 @@ func TestOttOBrokerShutdown(t *testing.T) { o := &OttO{Name: "TestOttOShutdown"} o.Init() - require.NotNil(t, o.brokerShutdown, "Expected brokerShutdown function to be initialized") - // Test that calling Stop doesn't panic and properly shuts down the broker assert.NotPanics(t, func() { // Start a goroutine to receive from done channel diff --git a/server/server.go b/server/server.go index 8dbcc98..fab8560 100644 --- a/server/server.go +++ b/server/server.go @@ -14,9 +14,6 @@ import ( "github.com/rustyeddy/otto/messanger" ) -type OttOServer interface { -} - // Server serves up HTTP on Addr (default 0.0.0.0:8011) // It takes care of REST API, serving the web app if Appdir // does not equal nil and initial Websocket upgrade diff --git a/station/device_manager.go b/station/device_manager.go index fb81806..5c9beb2 100644 --- a/station/device_manager.go +++ b/station/device_manager.go @@ -11,7 +11,7 @@ var ( type DeviceManager struct { // Internal generic device store for tests and loose coupling - devices map[string]any + devices map[string]*ManagedDevice Metrics *DeviceMetrics mu sync.RWMutex `json:"-"` } @@ -26,7 +26,7 @@ type DeviceMetrics struct { func NewDeviceManager() *DeviceManager { return &DeviceManager{ - devices: make(map[string]any), + devices: make(map[string]*ManagedDevice), Metrics: &DeviceMetrics{}, } } @@ -78,10 +78,22 @@ func (dm *DeviceManager) Remove(id string) { } // GetDevice returns the device (anythig supporting the Name (Name()) interface) -func (dm *DeviceManager) Get(name string) any { +func (dm *DeviceManager) Get(name string) *ManagedDevice { dm.mu.RLock() defer dm.mu.RUnlock() - return dm.devices[name] + md, ok := dm.devices[name] + if !ok { + return nil + } + return md +} + +func (dm *DeviceManager) GetDevice(name string) any { + md := dm.Get(name) + if md == nil { + return nil + } + return md.Device } // UpdateDeviceMetrics updates device-related metrics diff --git a/station/station.go b/station/station.go index 5ebb934..c462520 100644 --- a/station/station.go +++ b/station/station.go @@ -28,7 +28,7 @@ type Station struct { messanger.Messanger `json:"-"` errq chan error - errors []error `json:"errors"` + errors []error `json:"-"` time.Duration `json:"duration"` ticker *time.Ticker `json:"-"` diff --git a/station/station_test.go b/station/station_test.go index 1ce7858..542a462 100644 --- a/station/station_test.go +++ b/station/station_test.go @@ -122,8 +122,8 @@ func TestStationTicker(t *testing.T) { // Verify announcements were sent metrics := station.Metrics.GetMetrics() - assert.Equal(t, uint64(3), metrics.AnnouncementsSent) - assert.True(t, metrics.AnnouncementsSent > 0, "Should have sent some announcements") + assert.True(t, metrics.AnnouncementsSent >= 3, "Should have sent some announcements") + assert.True(t, metrics.AnnouncementsSent <= 4, "Should have sent some announcements") } func TestStationHealthCheck(t *testing.T) {