Skip to content

Commit

Permalink
Switch websocket library to gorilla/websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
trustmaster committed Mar 20, 2017
1 parent e1e7fff commit a8a2244
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 40 deletions.
92 changes: 56 additions & 36 deletions runtime.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package flow

import (
"code.google.com/p/go.net/websocket"
"github.com/gorilla/websocket"
"github.com/nu7hatch/gouuid"
"log"
"net"
"net/http"
"encoding/json"
)

type protocolHandler func(*websocket.Conn, interface{})
Expand All @@ -26,6 +26,20 @@ type Runtime struct {
ready chan struct{}
// Websocket server onShutdown signal
done chan struct{}
// Gorilla Webscocket upgrader
upgrader websocket.Upgrader
}

func sendJSON(ws *websocket.Conn, msg interface{}) {
bytes, err := json.Marshal(msg)
if err != nil {
log.Println("JSON encoding error", err)
return
}
err = ws.WriteMessage(websocket.TextMessage, bytes)
if err != nil {
log.Println("Websocket write error", err)
}
}

// Register command handlers
Expand All @@ -39,7 +53,7 @@ func (r *Runtime) Init(name string) {
r.ready = make(chan struct{})
r.handlers = make(map[string]protocolHandler)
r.handlers["runtime.getruntime"] = func(ws *websocket.Conn, payload interface{}) {
websocket.JSON.Send(ws, runtimeMessage{
sendJSON(ws, runtimeMessage{
Protocol: "runtime",
Command: "runtime",
Payload: runtimeInfo{Type: name,
Expand Down Expand Up @@ -74,7 +88,7 @@ func (r *Runtime) Init(name string) {
})
UpdateComponentInfo(msg.Id)
entry, _ := ComponentRegistry[msg.Id]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand Down Expand Up @@ -119,7 +133,7 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].MapInPort(msg.Public, msg.Node, msg.Port)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -131,7 +145,7 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].UnmapInPort(msg.Public)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -142,7 +156,7 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].RenameInPort(msg.From, msg.To)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -153,7 +167,7 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].MapOutPort(msg.Public, msg.Node, msg.Port)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -165,7 +179,7 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].UnmapOutPort(msg.Public)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -176,7 +190,7 @@ func (r *Runtime) Init(name string) {
r.graphs[msg.Graph].RenameOutPort(msg.From, msg.To)
UpdateComponentInfo(msg.Graph)
entry, _ := ComponentRegistry[msg.Graph]
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -188,7 +202,7 @@ func (r *Runtime) Init(name string) {
// Need to obtain ports annotation for the first time
UpdateComponentInfo(key)
}
websocket.JSON.Send(ws, componentMessage{
sendJSON(ws, componentMessage{
Protocol: "component",
Command: "component",
Payload: entry.Info,
Expand All @@ -212,42 +226,48 @@ func (r *Runtime) Stop() {
close(r.done)
}

func (r *Runtime) Handle(ws *websocket.Conn) {
defer func() {
err := ws.Close()
if err != nil {
log.Println(err.Error())
}
}()
var msg Message
if err := websocket.JSON.Receive(ws, &msg); err != nil {
log.Println(err.Error())
func (r *Runtime) Handle(w http.ResponseWriter, req *http.Request) {
ws, err := r.upgrader.Upgrade(w, req, nil)
if err != nil {
log.Println("Websocket upgrader failed", err)
return
}
handler, exists := r.handlers[msg.Protocol+"."+msg.Command]
if !exists {
log.Printf("Unknown command: %s.%s\n", msg.Protocol, msg.Command)
return
defer ws.Close()
for {
msgType, bytes, err := ws.ReadMessage()
if err != nil {
log.Println("Websocket read error:", err)
break
}
if msgType != websocket.TextMessage {
log.Println("Unexpected binary message")
break
}
var msg Message
err = json.Unmarshal(bytes, &msg)
if err != nil {
log.Println("JSON decoding error:", err)
break
}
handler, exists := r.handlers[msg.Protocol+"."+msg.Command]
if !exists {
log.Printf("Unknown command: %s.%s\n", msg.Protocol, msg.Command)
break
}
handler(ws, msg.Payload)
}
handler(ws, msg.Payload)
}

func (r *Runtime) Listen(address string) {
http.Handle("/", websocket.Handler(r.Handle))
listener, err := net.Listen("tcp", address)
if err != nil {
log.Fatalln(err.Error())
}
r.upgrader = websocket.Upgrader{}

http.Handle("/", http.HandlerFunc(r.Handle))

go func() {
err = http.Serve(listener, nil)
if err != nil {
log.Fatalln(err.Error())
}
log.Fatal(http.ListenAndServe(address, nil))
}()
close(r.ready)

// Wait for termination signal
<-r.done
listener.Close()
}
24 changes: 20 additions & 4 deletions runtime_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package flow

import (
"code.google.com/p/go.net/websocket"
"github.com/gorilla/websocket"
"encoding/json"
"testing"
)

Expand All @@ -20,20 +21,35 @@ func ensureRuntimeStarted() {
}
}

func sendJSONE(ws *websocket.Conn, msg interface{}) error {
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
err = ws.WriteMessage(websocket.TextMessage, bytes)
return err
}

// Tests runtime information support
func TestRuntimeGetRuntime(t *testing.T) {
ensureRuntimeStarted()
// Create a WebSocket client
ws, err := websocket.Dial("ws://localhost:13014/", "", "http://localhost/")
ws, _, err := websocket.DefaultDialer.Dial("ws://localhost:13014/", nil)
defer ws.Close()
if err != nil {
t.Error(err.Error())
}
// Send a runtime request and check the response
if err = websocket.JSON.Send(ws, &Message{"runtime", "getruntime", nil}); err != nil {
if err = sendJSONE(ws, &Message{"runtime", "getruntime", nil}); err != nil {
t.Error(err.Error())
}
var msg runtimeMessage
if err = websocket.JSON.Receive(ws, &msg); err != nil {
var bytes []byte
if _, bytes, err = ws.ReadMessage(); err != nil {
t.Error(err.Error())
return
}
if err = json.Unmarshal(bytes, &msg); err != nil {
t.Error(err.Error())
return
}
Expand Down

0 comments on commit a8a2244

Please sign in to comment.