diff --git a/README.md b/README.md index 2874709..0c2b53f 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,3 @@ # Go DDP DDP server and client implemented with go. - -## Server Example - -```go -package main - -import ( - "github.com/meteorhacks/goddp" -) - -func main() { - server := goddp.NewServer() - server.Method("hello", methodHandler) - server.Listen(":1337") -} - -func methodHandler(p []interface{}) (interface{}, error) { - return "result", nil -} -``` diff --git a/goddp.go b/goddp.go deleted file mode 100644 index a704459..0000000 --- a/goddp.go +++ /dev/null @@ -1,14 +0,0 @@ -package goddp - -import ( - "github.com/meteorhacks/goddp/client" - "github.com/meteorhacks/goddp/server" -) - -func NewClient() client.Client { - return client.New() -} - -func NewServer() server.Server { - return server.New() -} diff --git a/server/README.md b/server/README.md index 6d1f04c..b82b7b8 100644 --- a/server/README.md +++ b/server/README.md @@ -12,12 +12,20 @@ import ( ) func main() { - server := server.New() - server.Method("hello", methodHandler) - server.Listen(":1337") + s := server.New() + s.Method("double", handler) + s.Listen(":1337") } -func methodHandler(p []interface{}) (interface{}, error) { - return "result", nil +func handler(ctx server.MethodContext) { + n, ok := ctx.Args[0].(float64) + + if !ok { + ctx.SendError("invalid parameters") + } else { + ctx.SendResult(n * 2) + } + + ctx.SendUpdated() } ``` diff --git a/server/integration/integration_test.go b/server/integration/integration_test.go new file mode 100644 index 0000000..f0a7f9c --- /dev/null +++ b/server/integration/integration_test.go @@ -0,0 +1,217 @@ +package integration + +import ( + "encoding/json" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/meteorhacks/goddp/server" +) + +var ( + URL = "http://localhost:1337/websocket" + ORIGIN = "http://localhost:1337" + ADDR = "localhost:1337" + s server.Server +) + +type MethodError struct { + Error string `json:"error"` +} + +type Message struct { + Msg string `json:"msg"` + Session string `json:"session"` + ID string `json:"id"` + Result float64 `json:"result"` + Error MethodError `json:"error"` +} + +func TestStartServer(t *testing.T) { + s = server.New() + + s.Method("double", func(ctx server.MethodContext) { + n, ok := ctx.Args[0].(float64) + + if !ok { + ctx.SendError("invalid parameters") + } else { + ctx.SendResult(n * 2) + } + + ctx.SendUpdated() + }) + + go s.Listen(":1337") + time.Sleep(100 * time.Millisecond) +} + +func TestConnect(t *testing.T) { + ws, err := newClient() + if err != nil { + t.Error("websocket connection failed") + } + + defer ws.Close() + + writeMessage(ws, `{"msg": "connect", "version": "1", "support": ["1"]}`, t) + msg := readMessage(ws, t) + + if msg.Msg != "connected" { + t.Error("inconnect DDP message type") + } + + if len(msg.Session) != 17 { + t.Error("session field should be have 17 characters") + } +} + +func TestPingWithoutId(t *testing.T) { + ws, err := newClient() + if err != nil { + t.Error("websocket connection failed") + } + + defer ws.Close() + + writeMessage(ws, `{"msg": "connect", "version": "1", "support": ["1"]}`, t) + _ = readMessage(ws, t) // ignore "connected" message + + writeMessage(ws, `{"msg": "ping"}`, t) + msg := readMessage(ws, t) + + if msg.Msg != "pong" { + t.Error("inconnect DDP message type") + } +} + +func TestPingWithId(t *testing.T) { + ws, err := newClient() + if err != nil { + t.Error("websocket connection failed") + } + + defer ws.Close() + + writeMessage(ws, `{"msg": "connect", "version": "1", "support": ["1"]}`, t) + _ = readMessage(ws, t) // ignore "connected" message + + writeMessage(ws, `{"msg": "ping", "id": "test-id"}`, t) + msg := readMessage(ws, t) + + if msg.Msg != "pong" { + t.Error("inconnect DDP message type") + } + + if msg.ID != "test-id" { + t.Error("inconnect random id") + } +} + +func TestMethodResult(t *testing.T) { + ws, err := newClient() + if err != nil { + t.Error("websocket connection failed") + } + + defer ws.Close() + + writeMessage(ws, `{"msg": "connect", "version": "1", "support": ["1"]}`, t) + _ = readMessage(ws, t) // ignore "connected" message + + writeMessage(ws, `{"msg": "method", "id": "test-id", "method": "double", "params": [2]}`, t) + msg := readMessage(ws, t) + + if msg.Msg != "result" { + t.Error("inconnect DDP message type") + } + + if msg.ID != "test-id" { + t.Error("inconnect random id") + } + + if msg.Result != 4 { + t.Error("inconnect method result") + } +} + +func TestMethodError(t *testing.T) { + ws, err := newClient() + if err != nil { + t.Error("websocket connection failed") + } + + defer ws.Close() + + writeMessage(ws, `{"msg": "connect", "version": "1", "support": ["1"]}`, t) + _ = readMessage(ws, t) // ignore "connected" message + + writeMessage(ws, `{"msg": "method", "id": "test-id", "method": "double", "params": ["two"]}`, t) + msg := readMessage(ws, t) + + if msg.Msg != "result" { + t.Error("inconnect DDP message type") + } + + if msg.ID != "test-id" { + t.Error("inconnect random id") + } + + if msg.Error.Error == "" { + t.Error("method error should be set") + } +} + +func newClient() (*websocket.Conn, error) { + u, _ := url.Parse(URL) + conn, err := net.Dial("tcp", ADDR) + + if err != nil { + return nil, err + } + + header := http.Header{"Origin": {ORIGIN}} + ws, _, err := websocket.NewClient(conn, u, header, 1024, 1024) + return ws, err +} + +func writeMessage(c *websocket.Conn, str string, t *testing.T) { + w, err := c.NextWriter(websocket.TextMessage) + + if err != nil { + t.Error("cannot create websocket write") + } + + io.WriteString(w, str) + w.Close() +} + +func readMessage(c *websocket.Conn, t *testing.T) Message { + op, r, err := c.NextReader() + + if op != websocket.TextMessage { + t.Error("expecting a text message") + } + + if err != nil { + t.Error("cannot create reader") + } + + str, err := ioutil.ReadAll(r) + if err != nil { + t.Error("websocket read error") + } + + msg := Message{} + if err := json.Unmarshal(str, &msg); err != nil { + t.Error("cannot parse websocket response") + } + + return msg +} diff --git a/server/method_context.go b/server/method_context.go new file mode 100644 index 0000000..b300d60 --- /dev/null +++ b/server/method_context.go @@ -0,0 +1,64 @@ +package server + +import ( + "errors" +) + +type MethodContext struct { + ID string + Args []interface{} + Res Response + Done bool + Updated bool +} + +func NewMethodContext(m Message, res Response) MethodContext { + ctx := MethodContext{} + ctx.ID = m.ID + ctx.Args = m.Params + ctx.Res = res + return ctx +} + +func (ctx *MethodContext) SendResult(r interface{}) error { + if ctx.Done { + err := errors.New("already sent results for method") + return err + } + + ctx.Done = true + return ctx.Res.WriteJSON(map[string]interface{}{ + "msg": "result", + "id": ctx.ID, + "result": r, + }) +} + +func (ctx *MethodContext) SendError(e string) error { + if ctx.Done { + err := errors.New("already sent results for method") + return err + } + + ctx.Done = true + return ctx.Res.WriteJSON(map[string]interface{}{ + "msg": "result", + "id": ctx.ID, + "error": map[string]string{ + "error": e, + }, + }) +} + +func (ctx *MethodContext) SendUpdated() error { + if ctx.Updated { + err := errors.New("already sent updated for method") + return err + } + + ctx.Updated = true + return ctx.Res.WriteJSON(map[string]interface{}{ + "msg": "updated", + "methods": []string{ctx.ID}, + }) +} diff --git a/server/method_context_test.go b/server/method_context_test.go new file mode 100644 index 0000000..d401772 --- /dev/null +++ b/server/method_context_test.go @@ -0,0 +1,121 @@ +package server + +import ( + "reflect" + "testing" +) + +func TestSendResult(t *testing.T) { + r := &TestResponse{} + ctx := MethodContext{ID: "test-id", Res: r} + err := ctx.SendResult(100) + + expected := map[string]interface{}{ + "msg": "result", + "id": "test-id", + "result": 100, + } + + if err != nil { + t.Error("result should be sent successfully") + } + + if !ctx.Done { + t.Error("context must set that a result is sent") + } + + if !reflect.DeepEqual(r._data, expected) { + t.Error("invalid response for method result") + } +} + +func TestSendResultWhenDone(t *testing.T) { + r := &TestResponse{} + ctx := MethodContext{ID: "test-id", Res: r, Done: true} + err := ctx.SendResult(100) + + if err == nil { + t.Error("result should be sent only once") + } + + if r._data != nil { + t.Error("result should be sent only once") + } +} + +func TestSendError(t *testing.T) { + r := &TestResponse{} + ctx := MethodContext{ID: "test-id", Res: r} + err := ctx.SendError("test-error") + + expected := map[string]interface{}{ + "msg": "result", + "id": "test-id", + "error": map[string]string{ + "error": "test-error", + }, + } + + if err != nil { + t.Error("error should be sent successfully") + } + + if !ctx.Done { + t.Error("context must set that a result is sent") + } + + if !reflect.DeepEqual(r._data, expected) { + t.Error("invalid response for method error") + } +} + +func TestSendErrorWhenDone(t *testing.T) { + r := &TestResponse{} + ctx := MethodContext{ID: "test-id", Res: r, Done: true} + err := ctx.SendError("test-error") + + if err == nil { + t.Error("error should be sent only once") + } + + if r._data != nil { + t.Error("error should be sent only once") + } +} + +func TestSendUpdated(t *testing.T) { + r := &TestResponse{} + ctx := MethodContext{ID: "test-id", Res: r} + err := ctx.SendUpdated() + + expected := map[string]interface{}{ + "msg": "updated", + "methods": []string{"test-id"}, + } + + if err != nil { + t.Error("updated should be sent successfully") + } + + if !ctx.Updated { + t.Error("context must set that updated is sent") + } + + if !reflect.DeepEqual(r._data, expected) { + t.Error("invalid response for method updated") + } +} + +func TestSendUpdatedWhenDone(t *testing.T) { + r := &TestResponse{} + ctx := MethodContext{ID: "test-id", Res: r, Updated: true} + err := ctx.SendUpdated() + + if err == nil { + t.Error("updated message should be sent only once") + } + + if r._data != nil { + t.Error("updated message should be sent only once") + } +} diff --git a/server/server.go b/server/server.go index fca97c6..2d6d007 100644 --- a/server/server.go +++ b/server/server.go @@ -10,36 +10,35 @@ import ( "github.com/meteorhacks/goddp/utils/random" ) -type Server struct { - methods map[string]MethodHandler - upgrader websocket.Upgrader -} - func New() Server { - server := Server{} - server.methods = make(map[string]MethodHandler) - server.upgrader = websocket.Upgrader{ + s := Server{} + s.methods = make(map[string]MethodFn) + s.upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } - return server + return s } -func (s *Server) Method(n string, h MethodHandler) { - s.methods[n] = h +type Server struct { + methods map[string]MethodFn + upgrader websocket.Upgrader } -func (s *Server) Listen(ipPort string) { +func (s *Server) Listen(addr string) { http.HandleFunc("/websocket", s.Handler) - http.ListenAndServe(ipPort, nil) + http.ListenAndServe(addr, nil) +} + +func (s *Server) Method(name string, fn MethodFn) { + s.methods[name] = fn } func (s *Server) Handler(w http.ResponseWriter, r *http.Request) { ws, err := s.upgrader.Upgrade(w, r, nil) - if err != nil { - fmt.Println("Error: could not creating websocket connection") + // TODO => handle non-websocket requests return } @@ -47,94 +46,74 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) { msg, err := readMessage(ws) if err != nil { - ws.Close() break } - switch { - case msg.Msg == "ping": - go s.handlePing(ws, msg) - case msg.Msg == "connect": - go s.handleConnect(ws, msg) - case msg.Msg == "method": - go s.handleMethod(ws, msg) + switch msg.Msg { + case "connect": + handleConnect(s, ws, msg) + case "ping": + handlePing(s, ws, msg) + case "method": + handleMethod(s, ws, msg) default: - fmt.Println("Error: unknown ddp message", *msg) - ws.Close() + // TODO => send "error" ddp message break } } + + ws.Close() } -func (s *Server) handleConnect(c *websocket.Conn, m *Message) { - err := c.WriteJSON(map[string]string{ - "msg": "connected", - "session": random.Id(17), - }) +func readMessage(req Request) (Message, error) { + t, str, err := req.ReadMessage() + msg := Message{} if err != nil { - fmt.Println(err) + return msg, err } -} -func (s *Server) handlePing(c *websocket.Conn, m *Message) { - if m.Id != "" { - err := c.WriteJSON(map[string]string{ - "msg": "pong", - "id": m.Id, - }) - - if err != nil { - fmt.Println(err) - } - } else { - err := c.WriteJSON(map[string]string{ - "msg": "pong", - }) + if t != 1 { + err = errors.New("DDP does not supports binary streams yet") + return msg, err + } - if err != nil { - fmt.Println(err) - } + if err := json.Unmarshal(str, &msg); err != nil { + return msg, err } + + return msg, nil } -func (s *Server) handleMethod(c *websocket.Conn, m *Message) { - res, _ := s.methods[m.Method](m.Params) - err := c.WriteJSON(map[string]interface{}{ - "msg": "result", - "id": m.Id, - "result": res, +func handleConnect(s *Server, res Response, m Message) error { + return res.WriteJSON(map[string]string{ + "msg": "connected", + "session": random.Id(17), }) +} - if err != nil { - fmt.Println(err) +func handleMethod(s *Server, res Response, m Message) error { + fn, ok := s.methods[m.Method] + + if !ok { + err := errors.New(fmt.Sprintf("method %s not found", m.Method)) + return err } - err = c.WriteJSON(map[string]interface{}{ - "msg": "updated", - "methods": []string{m.Id}, - }) + ctx := NewMethodContext(m, res) + go fn(ctx) - if err != nil { - fmt.Println(err) - } + return nil } -func readMessage(ws *websocket.Conn) (*Message, error) { - t, str, err := ws.ReadMessage() - msg := &Message{} - - if err != nil { - // error reading message - return nil, err +func handlePing(s *Server, res Response, m Message) error { + msg := map[string]string{ + "msg": "pong", } - if t != 1 { - // ignore binary data - err = errors.New("Error: DDP does not supports binary streams yet.") - return nil, err + if m.ID != "" { + msg["id"] = m.ID } - err = json.Unmarshal(str, msg) - return msg, nil + return res.WriteJSON(msg) } diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 0000000..db6e8b1 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,135 @@ +package server + +import ( + "errors" + "reflect" + "testing" +) + +func TestAddMethod(t *testing.T) { + s := New() + s.Method("testfn", func(MethodContext) {}) + if _, ok := s.methods["testfn"]; !ok { + t.Error("method functionm ust be stored under methods") + } +} + +func TestReadMessageReadError(t *testing.T) { + req := TestRequest{Error: errors.New("test-error")} + if _, err := readMessage(&req); err == nil { + t.Error("an error must be returned if reading from Request fails") + } +} + +func TestReadMessageBinaryMessage(t *testing.T) { + req := TestRequest{Type: 2} + if _, err := readMessage(&req); err == nil { + t.Error("an error must be returned if type is binary") + } +} + +func TestReadMessageInvalidMessage(t *testing.T) { + str := []byte("invalid-json") + req := TestRequest{Type: 1, Message: str} + if _, err := readMessage(&req); err == nil { + t.Error("an error must be returned if message is not json") + } +} + +func TestReadMessageValidMessage(t *testing.T) { + str := []byte(`{"msg": "ping"}`) + req := TestRequest{Type: 1, Message: str} + msg, err := readMessage(&req) + + if err != nil { + t.Error("message must be read successfully") + } + + if msg.Msg != "ping" { + t.Error("message must have correct message type") + } +} + +func TestHandleConnect(t *testing.T) { + s := &Server{} + m := Message{} + r := &TestResponse{} + + if err := handleConnect(s, r, m); err != nil { + t.Error("connect should be handled successfully") + } + + data := r._data.(map[string]string) + if data["msg"] != "connected" { + t.Error("msg field should be 'connected'") + } + + if len(data["session"]) != 17 { + t.Error("session field should be have 17 characters") + } +} + +func TestUnavailableMethod(t *testing.T) { + s := &Server{} + m := Message{Method: "test"} + r := &TestResponse{} + + if err := handleMethod(s, r, m); err == nil { + t.Error("an error must be returned if method is not available") + } +} + +func TestAvailableMethod(t *testing.T) { + s := &Server{methods: make(map[string]MethodFn)} + m := Message{Method: "test"} + r := &TestResponse{} + c := make(chan bool) + + s.methods["test"] = func(ctx MethodContext) { + c <- true + } + + if err := handleMethod(s, r, m); err != nil { + t.Error("an error must not be returned if method is available") + } + + // block untill method is called + <-c +} + +func TestHandlePingWithoutID(t *testing.T) { + s := &Server{} + m := Message{} + r := &TestResponse{} + + if err := handlePing(s, r, m); err != nil { + t.Error("ping should be handled successfully") + } + + expected := map[string]string{ + "msg": "pong", + } + + if !reflect.DeepEqual(r._data, expected) { + t.Error("message should only have msg field") + } +} + +func TestHandlePingWithID(t *testing.T) { + s := &Server{} + m := Message{ID: "test-id"} + r := &TestResponse{} + + if err := handlePing(s, r, m); err != nil { + t.Error("ping should be handled successfully") + } + + expected := map[string]string{ + "msg": "pong", + "id": "test-id", + } + + if !reflect.DeepEqual(r._data, expected) { + t.Error("message should have msg and ID fields") + } +} diff --git a/server/types.go b/server/types.go index cc01628..a0c9913 100644 --- a/server/types.go +++ b/server/types.go @@ -1,6 +1,15 @@ package server -type MethodHandler func([]interface{}) (interface{}, error) +type Request interface { + ReadMessage() (int, []byte, error) +} + +type Response interface { + WriteJSON(interface{}) error +} + +type MethodFn func(MethodContext) +type Handler func(*Server, Response, Message) error // This has the all the possible fields a DDP message can have type Message struct { @@ -8,7 +17,7 @@ type Message struct { Session string `json:"session"` Version string `json:"version"` Support []string `json:"support"` - Id string `json:"id"` + ID string `json:"id"` Method string `json:"method"` Params []interface{} `json:"params"` Result string `json:"result"` diff --git a/server/types_test.go b/server/types_test.go new file mode 100644 index 0000000..ae42b3b --- /dev/null +++ b/server/types_test.go @@ -0,0 +1,20 @@ +package server + +type TestResponse struct { + _data interface{} +} + +func (t *TestResponse) WriteJSON(d interface{}) error { + t._data = d + return nil +} + +type TestRequest struct { + Type int + Message []byte + Error error +} + +func (t *TestRequest) ReadMessage() (int, []byte, error) { + return t.Type, t.Message, t.Error +}