diff --git a/README.md b/README.md index 75fba88..8a03831 100644 --- a/README.md +++ b/README.md @@ -42,13 +42,7 @@ Folgende Leitlinien sind bei der Entwicklung des CCU-Jacks maßgebend: Nach der Implementierung von MQTT sind zukünftig erst einmal kleinere Erweiterungen geplant, um den CCU-Jack für die V1.0 abzurunden: -* Erweiterungen für MQTT - * Zugriffsberechtigungen -* Erweiterungen VEAP-API - * Zugriffsberechtigungen -* Erweiterungen der Web-UI - * Setzen von Datenpunkten im _Navigator_ und der _Überwachung_ - * Benutzer- und Rechteverwaltung +* Web-Socket-Unterstützung für MQTT Langfristig sind dann bereits folgende Erweiterungen geplant: * Erweiterungen für MQTT @@ -158,7 +152,7 @@ Mit dem [Kommandozeilenwerkzeug CURL](https://curl.haxx.se), das praktisch für ## Beschreibung der MQTT-Schnittstelle -Der CCU-Jack enthält einen vollwertigen und leistungsfähigen MQTT-Server (V3.1.1). Dieser kann von beliebigen Fremdapplikationen genutzt werden. Zudem werden die Wertänderungen aller Gerätedatenpunkte der CCU und ausgewählter Systemvariablen automatisch an den MQTT-Server gesendet und stehen daraufhin allen MQTT-Clients zur Verfügung. +Der CCU-Jack enthält einen vollwertigen und leistungsfähigen MQTT-Server (V3.1.1). Dieser kann von beliebigen Fremdapplikationen genutzt werden. Zudem werden die Wertänderungen aller Gerätedatenpunkte der CCU und ausgewählter Systemvariablen automatisch an den MQTT-Server gesendet und stehen daraufhin allen MQTT-Clients zur Verfügung. Die Netzwerk-Ports können mit den Optionen `MQTT.Port` und `MQTT.PortTLS` eingestellt werden. Ein Zugriff über Web-Sockets ist über den Pfad `/ws-mqtt` des HTTP(S)-Servers möglich. Um das MQTT-Protokoll hat sich ein großes Ökosystem gebildet. Eine Übersicht ist in dieser [Link-Sammlung](https://github.com/hobbyquaker/awesome-mqtt) zu finden. diff --git a/build/main.go b/build/main.go index b0f7f1c..a2f6104 100644 --- a/build/main.go +++ b/build/main.go @@ -12,7 +12,7 @@ import ( const ( logLevel = logging.InfoLevel appName = "ccu-jack" - appVersion = "0.10.1" + appVersion = "0.10.2" appPkg = "github.com/mdzio/ccu-jack" ldFlags = "-s -w -X main.appVersion=" + appVersion buildDir = ".." @@ -23,7 +23,7 @@ var ( targetSystems = []string{ "ccu2", "rm-rp0+1", - "ccu3-rm-rp2+3", + "ccu3-rm-rp2+3+4", "vccu-x86", "win", "linux", @@ -35,13 +35,13 @@ var ( addon bool goSpec releng.GoSpec }{ - "ccu2": {true, releng.GoSpec{OS: "linux", Arch: "arm", Arm: "5", LDFlags: ldFlags}}, - "rm-rp0+1": {true, releng.GoSpec{OS: "linux", Arch: "arm", Arm: "6", LDFlags: ldFlags}}, - "ccu3-rm-rp2+3": {true, releng.GoSpec{OS: "linux", Arch: "arm", Arm: "7", LDFlags: ldFlags}}, - "vccu-x86": {true, releng.GoSpec{OS: "linux", Arch: "386", LDFlags: ldFlags}}, - "win": {false, releng.GoSpec{OS: "windows", Arch: "amd64", LDFlags: ldFlags}}, - "linux": {false, releng.GoSpec{OS: "linux", Arch: "amd64", LDFlags: ldFlags}}, - "darwin": {false, releng.GoSpec{OS: "darwin", Arch: "amd64", LDFlags: ldFlags}}, + "ccu2": {true, releng.GoSpec{OS: "linux", Arch: "arm", Arm: "5", LDFlags: ldFlags}}, + "rm-rp0+1": {true, releng.GoSpec{OS: "linux", Arch: "arm", Arm: "6", LDFlags: ldFlags}}, + "ccu3-rm-rp2+3+4": {true, releng.GoSpec{OS: "linux", Arch: "arm", Arm: "7", LDFlags: ldFlags}}, + "vccu-x86": {true, releng.GoSpec{OS: "linux", Arch: "386", LDFlags: ldFlags}}, + "win": {false, releng.GoSpec{OS: "windows", Arch: "amd64", LDFlags: ldFlags}}, + "linux": {false, releng.GoSpec{OS: "linux", Arch: "amd64", LDFlags: ldFlags}}, + "darwin": {false, releng.GoSpec{OS: "darwin", Arch: "amd64", LDFlags: ldFlags}}, } // files for non ccu target systems diff --git a/main.go b/main.go index 5532929..2892fc0 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "time" "github.com/gorilla/handlers" - "github.com/mdzio/ccu-jack/mqtt" "github.com/mdzio/ccu-jack/rtcfg" "github.com/mdzio/ccu-jack/vmodel" @@ -20,6 +19,7 @@ import ( "github.com/mdzio/go-lib/httputil" "github.com/mdzio/go-logging" "github.com/mdzio/go-mqtt/auth" + "github.com/mdzio/go-mqtt/service" "github.com/mdzio/go-veap" "github.com/mdzio/go-veap/model" ) @@ -28,7 +28,7 @@ const ( appDisplayName = "CCU-Jack" appName = "ccu-jack" appDescription = "REST/MQTT-Server for the HomeMatic CCU" - appCopyright = "(C)2020" + appCopyright = "(C)2020-2021" appVendor = "info@ccu-historian.de" webUIDir = "webui" @@ -37,16 +37,23 @@ const ( caKeyFile = "cacert.key" serverCertFile = "svrcert.pem" serverKeyFile = "svrcert.key" + + // MQTT websocket path + mqttWsPath = "/ws-mqtt" ) var ( appVersion = "-dev-" // overwritten during build process - log = logging.Get("main") - logFile *os.File - store = rtcfg.Store{FileName: configFile} + // base services + log = logging.Get("main") + logFile *os.File + store = rtcfg.Store{FileName: configFile} + httpServer *httputil.Server + modelRoot *model.Root + modelService *model.Service - httpServer *httputil.Server + // application services sysVarCol *vmodel.SysVarCol prgCol *vmodel.ProgramCol mqttServer *mqtt.Broker @@ -64,21 +71,23 @@ func configure() error { return err } - // configuration may be updated - return store.View(func(cfg *rtcfg.Config) error { - // set log options - logging.SetLevel(cfg.Logging.Level) - if cfg.Logging.FilePath != "" { - var err error - logFile, err = os.OpenFile(cfg.Logging.FilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return fmt.Errorf("Opening log file failed: %w", err) - } - // switch to file log - logging.SetWriter(logFile) + // lock config for reading + store.RLock() + defer store.RUnlock() + logCfg := &store.Config.Logging + + // configure logging + logging.SetLevel(logCfg.Level) + if logCfg.FilePath != "" { + var err error + logFile, err = os.OpenFile(logCfg.FilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("Opening log file failed: %w", err) } - return nil - }) + // switch to file log + logging.SetWriter(logFile) + } + return nil } func message() { @@ -86,23 +95,25 @@ func message() { log.Info(appDisplayName, " V", appVersion) log.Info(appCopyright, " ", appVendor) + // lock config for reading + store.RLock() + defer store.RUnlock() + cfg := store.Config + // log configuration - store.View(func(cfg *rtcfg.Config) error { - log.Info("Configuration:") - log.Info(" Log level: ", cfg.Logging.Level.String()) - log.Info(" Log file: ", cfg.Logging.FilePath) - log.Info(" Server host name: ", cfg.Host.Name) - log.Info(" Server address: ", cfg.Host.Address) - log.Info(" HTTP port: ", cfg.HTTP.Port) - log.Info(" HTTPS port: ", cfg.HTTP.PortTLS) - log.Info(" CORS origins: ", strings.Join(cfg.HTTP.CORSOrigins, ",")) - log.Info(" MQTT port: ", cfg.MQTT.Port) - log.Info(" Secure MQTT port: ", cfg.MQTT.PortTLS) - log.Info(" CCU address: ", cfg.CCU.Address) - log.Info(" Interfaces: ", cfg.CCU.Interfaces.String()) - log.Info(" Init ID: ", cfg.CCU.InitID) - return nil - }) + log.Info("Configuration:") + log.Info(" Log level: ", cfg.Logging.Level.String()) + log.Info(" Log file: ", cfg.Logging.FilePath) + log.Info(" Server host name: ", cfg.Host.Name) + log.Info(" Server address: ", cfg.Host.Address) + log.Info(" HTTP port: ", cfg.HTTP.Port) + log.Info(" HTTPS port: ", cfg.HTTP.PortTLS) + log.Info(" CORS origins: ", strings.Join(cfg.HTTP.CORSOrigins, ",")) + log.Info(" MQTT port: ", cfg.MQTT.Port) + log.Info(" Secure MQTT port: ", cfg.MQTT.PortTLS) + log.Info(" CCU address: ", cfg.CCU.Address) + log.Info(" Interfaces: ", cfg.CCU.Interfaces.String()) + log.Info(" Init ID: ", cfg.CCU.InitID) } func certificates() error { @@ -113,26 +124,29 @@ func certificates() error { return nil } + // lock config for reading + store.RLock() + defer store.RUnlock() + cfg := store.Config + // generate certificates - return store.View(func(cfg *rtcfg.Config) error { - log.Info("Generating certificates") - now := time.Now() - gen := &httputil.CertGenerator{ - Hosts: []string{cfg.Host.Name}, - Organization: appDisplayName, - NotBefore: now, - NotAfter: now.Add(10 * 365 * 24 * time.Hour), - CACertFile: caCertFile, - CAKeyFile: caKeyFile, - ServerCertFile: serverCertFile, - ServerKeyFile: serverKeyFile, - } - if err := gen.Generate(); err != nil { - return err - } - log.Debugf("Created certificate files: %s, %s, %s, %s", caCertFile, caKeyFile, serverCertFile, serverKeyFile) - return nil - }) + log.Info("Generating certificates") + now := time.Now() + gen := &httputil.CertGenerator{ + Hosts: []string{cfg.Host.Name}, + Organization: appDisplayName, + NotBefore: now, + NotAfter: now.Add(10 * 365 * 24 * time.Hour), + CACertFile: caCertFile, + CAKeyFile: caKeyFile, + ServerCertFile: serverCertFile, + ServerKeyFile: serverKeyFile, + } + if err := gen.Generate(); err != nil { + return err + } + log.Debugf("Created certificate files: %s, %s, %s, %s", caCertFile, caKeyFile, serverCertFile, serverKeyFile) + return nil } func newRoot(handlerStats *veap.HandlerStats) *model.Root { @@ -156,134 +170,148 @@ func newRoot(handlerStats *veap.HandlerStats) *model.Root { return r } -func startup(serveErr chan<- error) { - // read config - store.View(func(cfg *rtcfg.Config) error { - // file handler for static files - http.Handle("/ui/", http.StripPrefix("/ui", http.FileServer(http.Dir(webUIDir)))) - - // setup and start http(s) server - httpServer = &httputil.Server{ - Addr: ":" + strconv.Itoa(cfg.HTTP.Port), - AddrTLS: ":" + strconv.Itoa(cfg.HTTP.PortTLS), - CertFile: serverCertFile, - KeyFile: serverKeyFile, - ServeErr: serveErr, - } - httpServer.Startup() +func startupBase(serveErr chan<- error) { + // lock config for reading + store.RLock() + defer store.RUnlock() + cfg := store.Config + + // file handler for static files + http.Handle("/ui/", http.StripPrefix("/ui", http.FileServer(http.Dir(webUIDir)))) + + // setup and start http(s) server + httpServer = &httputil.Server{ + Addr: ":" + strconv.Itoa(cfg.HTTP.Port), + AddrTLS: ":" + strconv.Itoa(cfg.HTTP.PortTLS), + CertFile: serverCertFile, + KeyFile: serverKeyFile, + ServeErr: serveErr, + } + httpServer.Startup() + + // veap handler and model + veapHandler := &veap.Handler{} + modelRoot = newRoot(&veapHandler.Stats) + modelService = &model.Service{Root: modelRoot} + veapHandler.Service = modelService + + // authentication for VEAP + var handler http.Handler + handler = &HTTPAuthHandler{ + Handler: veapHandler, + Store: &store, + Realm: "CCU-Jack VEAP-Server", + } - // veap handler and model - veapHandler := &veap.Handler{} - root := newRoot(&veapHandler.Stats) - modelService := &model.Service{Root: root} - veapHandler.Service = modelService + // CORS handler for VEAP + allowedMethods := handlers.AllowedMethods([]string{http.MethodGet, http.MethodPut}) + if len(cfg.HTTP.CORSOrigins) == 0 { + handler = handlers.CORS(allowedMethods)(handler) + } else { + allowedOrigins := handlers.AllowedOrigins(cfg.HTTP.CORSOrigins) + // only if origin is specified, credentials are allowed (CORS spec) + allowCredentials := handlers.AllowCredentials() + handler = handlers.CORS(allowedMethods, allowedOrigins, allowCredentials)(handler) + } - // create device collection - deviceCol = vmodel.NewDeviceCol(root) + // register VEAP handler + http.Handle(veapHandler.URLPrefix+"/", handler) +} - // configure HM script client - scriptClient := &script.Client{ - Addr: cfg.CCU.Address, - } +func shutdownBase() { + httpServer.Shutdown() +} - // create system variable collection - sysVarCol = vmodel.NewSysVarCol(root) - sysVarCol.ScriptClient = scriptClient - sysVarCol.Start() - - // create programs collection - prgCol = vmodel.NewProgramCol(root) - prgCol.ScriptClient = scriptClient - prgCol.Start() - - // MQTT authentication handler - mqttAuth := "configAuthHandler" - auth.Register(mqttAuth, &mqtt.AuthHandler{Store: &store}) - - // setup and start MQTT server - mqttServer = &mqtt.Broker{ - Addr: "tcp://:" + strconv.Itoa(cfg.MQTT.Port), - AddrTLS: "tcp://:" + strconv.Itoa(cfg.MQTT.PortTLS), - CertFile: serverCertFile, - KeyFile: serverKeyFile, - Authenticator: mqttAuth, - ServeErr: serveErr, - Service: modelService, - } - mqttServer.Start() +func startupApp(serveErr chan<- error) { + // lock config for reading + store.RLock() + defer store.RUnlock() + cfg := store.Config - // event receiver for MQTT - mqttReceiver := &mqtt.EventReceiver{ - Broker: mqttServer, - // forward events - Next: deviceCol, - } + // create device collection + deviceCol = vmodel.NewDeviceCol(modelRoot) - // configure interconnector - intercon = &itf.Interconnector{ - CCUAddr: cfg.CCU.Address, - Types: cfg.CCU.Interfaces, - IDPrefix: cfg.CCU.InitID + "-", - Receiver: mqttReceiver, - // full URL of the DefaultServeMux for callbacks - ServerURL: "http://" + cfg.Host.Address + ":" + strconv.Itoa(cfg.HTTP.Port), - } + // configure HM script client + scriptClient := &script.Client{ + Addr: cfg.CCU.Address, + } - // start ReGa DOM explorer - reGaDOM = script.NewReGaDOM(scriptClient) - reGaDOM.Start() - - // create room and function collections - vmodel.NewRoomCol(root, reGaDOM, modelService) - vmodel.NewFunctionCol(root, reGaDOM, modelService) - - // startup device domain (starts handling of events) - deviceCol.Interconnector = intercon - deviceCol.ReGaDOM = reGaDOM - deviceCol.ModelService = modelService - deviceCol.Start() - - // startup interconnector - // (an additional handler for XMLRPC is registered at the DefaultServeMux.) - intercon.Start() - - // authentication for VEAP - var handler http.Handler - handler = &HTTPAuthHandler{ - Handler: veapHandler, - Store: &store, - Realm: "CCU-Jack VEAP-Server", - } + // create system variable collection + sysVarCol = vmodel.NewSysVarCol(modelRoot) + sysVarCol.ScriptClient = scriptClient + sysVarCol.Start() + + // create programs collection + prgCol = vmodel.NewProgramCol(modelRoot) + prgCol.ScriptClient = scriptClient + prgCol.Start() + + // MQTT authentication handler + mqttAuth := "configAuthHandler" + auth.Register(mqttAuth, &mqtt.AuthHandler{Store: &store}) + + // setup and start MQTT server + mqttServer = &mqtt.Broker{ + Addr: "tcp://:" + strconv.Itoa(cfg.MQTT.Port), + AddrTLS: "tcp://:" + strconv.Itoa(cfg.MQTT.PortTLS), + CertFile: serverCertFile, + KeyFile: serverKeyFile, + Authenticator: mqttAuth, + ServeErr: serveErr, + Service: modelService, + } + mqttServer.Start() - // CORS handler for VEAP - allowedMethods := handlers.AllowedMethods([]string{http.MethodGet, http.MethodPut}) - if len(cfg.HTTP.CORSOrigins) == 0 { - handler = handlers.CORS(allowedMethods)(handler) - } else { - allowedOrigins := handlers.AllowedOrigins(cfg.HTTP.CORSOrigins) - // only if origin is specified, credentials are allowed (CORS spec) - allowCredentials := handlers.AllowCredentials() - handler = handlers.CORS(allowedMethods, allowedOrigins, allowCredentials)(handler) - } + // register websocket proxy for MQTT + log.Infof("MQTT websocket path: " + mqttWsPath) + mqttWs := &service.WebsocketHandler{ + Addr: ":" + strconv.Itoa(cfg.MQTT.Port), + } + http.Handle(mqttWsPath, mqttWs) - // register VEAP handler - http.Handle(veapHandler.URLPrefix+"/", handler) - return nil - }) + // event receiver for MQTT + mqttReceiver := &mqtt.EventReceiver{ + Broker: mqttServer, + // forward events + Next: deviceCol, + } - // wait for start up to complete (do not call Close() on the servers before - // the start up is finished) - time.Sleep(1 * time.Second) + // configure interconnector + intercon = &itf.Interconnector{ + CCUAddr: cfg.CCU.Address, + Types: cfg.CCU.Interfaces, + IDPrefix: cfg.CCU.InitID + "-", + Receiver: mqttReceiver, + // full URL of the DefaultServeMux for callbacks + ServerURL: "http://" + cfg.Host.Address + ":" + strconv.Itoa(cfg.HTTP.Port), + } + + // start ReGa DOM explorer + reGaDOM = script.NewReGaDOM(scriptClient) + reGaDOM.Start() + + // create room and function collections + vmodel.NewRoomCol(modelRoot, reGaDOM, modelService) + vmodel.NewFunctionCol(modelRoot, reGaDOM, modelService) + + // startup device domain (starts handling of events) + deviceCol.Interconnector = intercon + deviceCol.ReGaDOM = reGaDOM + deviceCol.ModelService = modelService + deviceCol.Start() + + // startup interconnector + // (an additional handler for XMLRPC is registered at the DefaultServeMux.) + intercon.Start() } -func shutdown() { +func shutdownApp() { intercon.Stop() deviceCol.Stop() reGaDOM.Stop() mqttServer.Stop() prgCol.Stop() sysVarCol.Stop() - httpServer.Shutdown() } func run() error { @@ -318,9 +346,17 @@ func run() error { // react on fatal serve errors serveErr := make(chan error) - // startup components - startup(serveErr) - defer shutdown() + // startup base services + startupBase(serveErr) + defer shutdownBase() + + // startup application services + startupApp(serveErr) + defer shutdownApp() + + // wait for start up to complete (do not call Close() on the servers before + // the start up is finished) + time.Sleep(1 * time.Second) // wait for shutdown or error select {