diff --git a/pkg/mqtt/connection.go b/pkg/mqtt/connection.go index 5ab2211..3944ce9 100644 --- a/pkg/mqtt/connection.go +++ b/pkg/mqtt/connection.go @@ -135,6 +135,7 @@ func (c *Connection) RcvMsgFromClient() { } server.Log.Infof("%s, come msg===\n%v\n=====", host, msg) + c.LastHbTime = time.Now().Unix() switch msg := msg.(type) { case *Connect: ret := RetCodeAccepted @@ -246,7 +247,6 @@ func (c *Connection) RcvMsgFromClient() { case *PingReq: server.Log.Infof("%s, ping req comes", host) - c.LastHbTime = time.Now().Unix() pingrsp := &PingResp{} err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId) if err != nil { diff --git a/pkg/mqtt/manager.go b/pkg/mqtt/manager.go index c3d8777..73f54be 100644 --- a/pkg/mqtt/manager.go +++ b/pkg/mqtt/manager.go @@ -1,6 +1,7 @@ package mqtt import ( + "github.com/PandoCloud/pando-cloud/pkg/server" "net" "sync" "time" @@ -81,6 +82,7 @@ func (m *Manager) PublishMessage2Server(deviceid uint64, msg *Publish) error { func (m *Manager) CleanWorker() { for { + server.Log.Infoln("scanning and removing inactive connections...") curTime := time.Now().Unix() for _, con := range m.IdToConn { @@ -88,7 +90,8 @@ func (m *Manager) CleanWorker() { continue } - if uint16(curTime-con.LastHbTime) > uint16(2*con.KeepAlive/2) { + if uint16(curTime-con.LastHbTime) > uint16(3*con.KeepAlive/2) { + server.Log.Infof("connection %v inactive , removing", con) con.Close() } }