Skip to content

Commit

Permalink
edgehub dispatch message by type
Browse files Browse the repository at this point in the history
  • Loading branch information
lvchenggang committed Sep 2, 2020
1 parent 4ad7727 commit 935121a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
5 changes: 5 additions & 0 deletions edge/pkg/common/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ const (
ResourceTypeNodeConnection = "node/connection"
OperationNodeConnection = "publish"
SourceNodeConnection = "edgehub"

ResourceGroupName = "resource"
TwinGroupName = "twin"
FuncGroupName = "func"
UserGroupName = "user"
)

//BuildMsg returns message object with router and content details
Expand Down
25 changes: 17 additions & 8 deletions edge/pkg/edgehub/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
connect "github.com/kubeedge/kubeedge/edge/pkg/common/cloudconnection"
"github.com/kubeedge/kubeedge/edge/pkg/common/message"
messagepkg "github.com/kubeedge/kubeedge/edge/pkg/common/message"
"github.com/kubeedge/kubeedge/edge/pkg/common/modules"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/clients"
"github.com/kubeedge/kubeedge/edge/pkg/edgehub/config"
Expand Down Expand Up @@ -81,11 +81,20 @@ func (eh *EdgeHub) sendToKeepChannel(message model.Message) error {
}

func (eh *EdgeHub) dispatch(message model.Message) error {
// TODO: dispatch message by the message type
md, ok := groupMap[message.GetGroup()]
if !ok {
klog.Warningf("msg_group not found")
return fmt.Errorf("msg_group not found")
group := message.GetGroup()
md := ""
switch group {
case messagepkg.ResourceGroupName:
md = modules.MetaGroup
case messagepkg.TwinGroupName:
md = modules.TwinGroup
case messagepkg.FuncGroupName:
md = modules.MetaGroup
case messagepkg.UserGroupName:
md = modules.BusGroup
default:
klog.Warningf("msg_group: %s not found", group)
return fmt.Errorf("msg_group: %s not found", group)
}

isResponse := eh.isSyncResponse(message.GetParentID())
Expand Down Expand Up @@ -206,8 +215,8 @@ func (eh *EdgeHub) pubConnectInfo(isConnected bool) {
}

for _, group := range groupMap {
message := model.NewMessage("").BuildRouter(message.SourceNodeConnection, group,
message.ResourceTypeNodeConnection, message.OperationNodeConnection).FillBody(content)
message := model.NewMessage("").BuildRouter(messagepkg.SourceNodeConnection, group,
messagepkg.ResourceTypeNodeConnection, messagepkg.OperationNodeConnection).FillBody(content)
beehiveContext.SendToGroup(group, *message)
}
}
Expand Down

0 comments on commit 935121a

Please sign in to comment.