Skip to content

Commit

Permalink
Wrap up protocol responses in correct messages
Browse files Browse the repository at this point in the history
  • Loading branch information
trustmaster committed Feb 1, 2015
1 parent 314e431 commit 1f3565f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 21 deletions.
18 changes: 15 additions & 3 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package flow
type Message struct {
// Protocol is NoFlo protocol identifier:
// "runtime", "component", "graph" or "network"
Protocol string
Protocol string `json:"protocol"`
// Command is a command to be executed within the protocol
Command string
Command string `json:"command"`
// Payload is JSON-encoded body of the message
Payload interface{}
Payload interface{} `json:"payload"`
}

// runtimeInfo message contains response to runtime.getruntime request
Expand All @@ -19,6 +19,12 @@ type runtimeInfo struct {
Id string `json:"id"`
}

type runtimeMessage struct {
Protocol string `json:"protocol"`
Command string `json:"command"`
Payload runtimeInfo `json:"payload"`
}

// clearGraph message is sent by client to create a new empty graph
type clearGraph struct {
Id string
Expand Down Expand Up @@ -169,3 +175,9 @@ type ComponentInfo struct {
InPorts []PortInfo `json:"inPorts"`
OutPorts []PortInfo `json:"outPorts"`
}

type componentMessage struct {
Protocol string `json:"protocol"`
Command string `json:"command"`
Payload ComponentInfo `json:"payload"`
}
68 changes: 52 additions & 16 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ func (r *Runtime) Init(name string) {
r.ready = make(chan struct{})
r.handlers = make(map[string]protocolHandler)
r.handlers["runtime.getruntime"] = func(ws *websocket.Conn, payload interface{}) {
websocket.JSON.Send(ws, runtimeInfo{name,
"0.4",
[]string{"protocol:runtime",
"protocol:graph",
"protocol:component",
"protocol:network",
"component:getsource"},
r.id,
websocket.JSON.Send(ws, runtimeMessage{
Protocol: "runtime",
Command: "runtime",
Payload: runtimeInfo{Type: name,
Version: "0.4",
Capabilities: []string{"protocol:runtime",
"protocol:graph",
"protocol:component",
"protocol:network",
"component:getsource"},
Id: r.id,
},
})
}
r.handlers["graph.clear"] = func(ws *websocket.Conn, payload interface{}) {
Expand All @@ -70,7 +74,11 @@ func (r *Runtime) Init(name string) {
})
UpdateComponentInfo(msg.Id)
entry, _ := ComponentRegistry[msg.Id]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["graph.addnode"] = func(ws *websocket.Conn, payload interface{}) {
msg := payload.(addNode)
Expand Down Expand Up @@ -111,52 +119,80 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].MapInPort(msg.Public, msg.Node, msg.Port)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["graph.removeinport"] = func(ws *websocket.Conn, payload interface{}) {
msg := payload.(removePort)
r.graphs[msg.Graph].UnsetInPort(msg.Public)
r.graphs[msg.Graph].UnmapInPort(msg.Public)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["graph.renameinport"] = func(ws *websocket.Conn, payload interface{}) {
msg := payload.(renamePort)
r.graphs[msg.Graph].RenameInPort(msg.From, msg.To)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["graph.addoutport"] = func(ws *websocket.Conn, payload interface{}) {
msg := payload.(addPort)
r.graphs[msg.Graph].MapOutPort(msg.Public, msg.Node, msg.Port)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["graph.removeoutport"] = func(ws *websocket.Conn, payload interface{}) {
msg := payload.(removePort)
r.graphs[msg.Graph].UnsetOutPort(msg.Public)
r.graphs[msg.Graph].UnmapOutPort(msg.Public)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["graph.renameoutport"] = func(ws *websocket.Conn, payload interface{}) {
msg := payload.(renamePort)
r.graphs[msg.Graph].RenameOutPort(msg.From, msg.To)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
r.handlers["component.list"] = func(ws *websocket.Conn, payload interface{}) {
for key, entry := range ComponentRegistry {
if len(entry.Info.InPorts) == 0 && len(entry.Info.OutPorts) == 0 {
// Need to obtain ports annotation for the first time
UpdateComponentInfo(key)
}
websocket.JSON.Send(ws, entry)
websocket.JSON.Send(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
})
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ func TestRuntimeGetRuntime(t *testing.T) {
if err = websocket.JSON.Send(ws, &Message{"runtime", "getruntime", nil}); err != nil {
t.Error(err.Error())
}
var res runtimeInfo
if err = websocket.JSON.Receive(ws, &res); err != nil {
var msg runtimeMessage
if err = websocket.JSON.Receive(ws, &msg); err != nil {
t.Error(err.Error())
return
}
if msg.Protocol != "runtime" || msg.Command != "runtime" {
t.Errorf("Invalid protocol (%s) or command (%s)", msg.Protocol, msg.Command)
return
}
res := msg.Payload
if res.Type != "goflow" {
t.Errorf("Invalid protocol type: %s\n", res.Type)
}
Expand Down

0 comments on commit 1f3565f

Please sign in to comment.