Skip to content

Commit

Permalink
fix #1 cpu usage
Browse files Browse the repository at this point in the history
Signed-off-by: Jeeva Kandasamy <jkandasa@gmail.com>
  • Loading branch information
jkandasa committed Jul 16, 2021
1 parent a855bc8 commit b0f46c1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
26 changes: 20 additions & 6 deletions pkg/service/adapter/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
DefaultReconnectDelay = "30s"

MqttDeviceName = "mqtt"

defaultMessageProcessTick = 1 * time.Second
)

// Service component of the provider
Expand Down Expand Up @@ -101,43 +103,55 @@ func (s *Service) Stop() {
}

func (s *Service) mqttMessageProcessor() {
ticker := time.NewTicker(10 * time.Microsecond)
ticker := time.NewTicker(defaultMessageProcessTick)
defer ticker.Stop()

for {
select {
case <-s.terminateMqttChan.CH:
return
case <-ticker.C:
if s.statusMqtt.Status == model.StatusUP {
if message := s.mqttMessageQueue.Get(); message != nil {
for {
if s.statusMqtt.Status == model.StatusUP {
message := s.mqttMessageQueue.Get()
if message == nil {
break
}
message.Others.Set(model.KeyMqttQoS, int(s.adapterConfig.MQTT.GetInt64(model.KeyMqttQoS)), nil)
err := s.mqttDevice.Write(message)
if err != nil {
zap.L().Error("error on writing a message to mqtt", zap.Error(err), zap.String("adapterName", s.adapterConfig.Name))
}
} else {
break
}
}
}
}
}

func (s *Service) sourceMessageProcessor() {
ticker := time.NewTicker(10 * time.Microsecond)
ticker := time.NewTicker(defaultMessageProcessTick)
defer ticker.Stop()

for {
select {
case <-s.terminateSourceChan.CH:
return
case <-ticker.C:
if s.statusSource.Status == model.StatusUP {
if message := s.sourceMessageQueue.Get(); message != nil {
for {
if s.statusSource.Status == model.StatusUP {
message := s.sourceMessageQueue.Get()
if message == nil {
break
}
zap.L().Debug("posting a message to source device", zap.String("message", message.ToString()))
err := s.sourceDevice.Write(message)
if err != nil {
zap.L().Error("error on writing a message to source device", zap.Error(err), zap.String("adapterName", s.adapterConfig.Name))
}
} else {
break
}
}
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/service/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
adapterSVC "github.com/mycontroller-org/2mqtt/pkg/service/adapter"
cfg "github.com/mycontroller-org/2mqtt/pkg/service/configuration"
sch "github.com/mycontroller-org/backend/v2/pkg/service/core_scheduler"
"github.com/pkg/profile"
"go.uber.org/zap"
)

Expand All @@ -34,15 +33,10 @@ func handleShutdownSignal() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

// CPU profiling by default
stop := profile.Start(profile.NoShutdownHook, profile.ProfilePath("/tmp/2mqtt.pprof"))

// waiting for signal
sig := <-sigs
close(sigs)

stop.Stop()

zap.L().Info("shutdown initiated..", zap.Any("signal", sig))
triggerShutdown()
}
Expand Down

0 comments on commit b0f46c1

Please sign in to comment.