diff --git a/pkg/service/adapter/service.go b/pkg/service/adapter/service.go index 8c0a564..34cf8f8 100644 --- a/pkg/service/adapter/service.go +++ b/pkg/service/adapter/service.go @@ -22,6 +22,8 @@ const ( DefaultReconnectDelay = "30s" MqttDeviceName = "mqtt" + + defaultMessageProcessTick = 1 * time.Second ) // Service component of the provider @@ -101,7 +103,7 @@ func (s *Service) Stop() { } func (s *Service) mqttMessageProcessor() { - ticker := time.NewTicker(10 * time.Microsecond) + ticker := time.NewTicker(defaultMessageProcessTick) defer ticker.Stop() for { @@ -109,13 +111,19 @@ func (s *Service) mqttMessageProcessor() { case <-s.terminateMqttChan.CH: return case <-ticker.C: - if s.statusMqtt.Status == model.StatusUP { - if message := s.mqttMessageQueue.Get(); message != nil { + for { + if s.statusMqtt.Status == model.StatusUP { + message := s.mqttMessageQueue.Get() + if message == nil { + break + } message.Others.Set(model.KeyMqttQoS, int(s.adapterConfig.MQTT.GetInt64(model.KeyMqttQoS)), nil) err := s.mqttDevice.Write(message) if err != nil { zap.L().Error("error on writing a message to mqtt", zap.Error(err), zap.String("adapterName", s.adapterConfig.Name)) } + } else { + break } } } @@ -123,7 +131,7 @@ func (s *Service) mqttMessageProcessor() { } func (s *Service) sourceMessageProcessor() { - ticker := time.NewTicker(10 * time.Microsecond) + ticker := time.NewTicker(defaultMessageProcessTick) defer ticker.Stop() for { @@ -131,13 +139,19 @@ func (s *Service) sourceMessageProcessor() { case <-s.terminateSourceChan.CH: return case <-ticker.C: - if s.statusSource.Status == model.StatusUP { - if message := s.sourceMessageQueue.Get(); message != nil { + for { + if s.statusSource.Status == model.StatusUP { + message := s.sourceMessageQueue.Get() + if message == nil { + break + } zap.L().Debug("posting a message to source device", zap.String("message", message.ToString())) err := s.sourceDevice.Write(message) if err != nil { zap.L().Error("error on writing a message to source device", zap.Error(err), zap.String("adapterName", s.adapterConfig.Name)) } + } else { + break } } } diff --git a/pkg/service/start/start.go b/pkg/service/start/start.go index 7cbf091..acf4de9 100644 --- a/pkg/service/start/start.go +++ b/pkg/service/start/start.go @@ -9,7 +9,6 @@ import ( adapterSVC "github.com/mycontroller-org/2mqtt/pkg/service/adapter" cfg "github.com/mycontroller-org/2mqtt/pkg/service/configuration" sch "github.com/mycontroller-org/backend/v2/pkg/service/core_scheduler" - "github.com/pkg/profile" "go.uber.org/zap" ) @@ -34,15 +33,10 @@ func handleShutdownSignal() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - // CPU profiling by default - stop := profile.Start(profile.NoShutdownHook, profile.ProfilePath("/tmp/2mqtt.pprof")) - // waiting for signal sig := <-sigs close(sigs) - stop.Stop() - zap.L().Info("shutdown initiated..", zap.Any("signal", sig)) triggerShutdown() }