Skip to content

Commit

Permalink
Merge f516efb into 0480426
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Nov 13, 2019
2 parents 0480426 + f516efb commit 0cfb983
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 49 deletions.
63 changes: 49 additions & 14 deletions docs/res-client-protocol.md
Expand Up @@ -15,6 +15,7 @@
- [Requests](#requests)
* [Request method](#request-method)
- [Request types](#request-types)
* [Version request](#version-request)
* [Subscribe request](#subscribe-request)
* [Unsubscribe request](#unsubscribe-request)
* [Get request](#get-request)
Expand Down Expand Up @@ -133,17 +134,18 @@ It can be used to hold values for replacing placeholders in the message.

There are a number of predefined errors.

Code | Message | Meaning
----------------------- | ------------------ | ----------------------------------------
`system.notFound` | Not found | The resource was not found
`system.invalidParams` | Invalid parameters | Invalid parameters in method call
`system.invalidQuery` | Invalid query | Invalid query or query parameters
`system.internalError` | Internal error | Internal error
`system.methodNotFound` | Method not found | Resource method not found
`system.accessDenied` | Access denied | Access to a resource or method is denied
`system.timeout` | Request timeout | Request timed out
`system.noSubscription` | No subscription | The resource has no direct subscription
`system.invalidRequest` | Invalid request | Invalid request
Code | Message | Meaning
--- | --- | ---
`system.notFound` | Not found | The resource was not found
`system.invalidParams` | Invalid parameters | Invalid parameters in method call
`system.invalidQuery` | Invalid query | Invalid query or query parameters
`system.internalError` | Internal error | Internal error
`system.methodNotFound` | Method not found | Resource method not found
`system.accessDenied` | Access denied | Access to a resource or method is denied
`system.timeout` | Request timeout | Request timed out
`system.noSubscription` | No subscription | The resource has no direct subscription
`system.invalidRequest` | Invalid request | Invalid request
`system.unsupportedProtocol` | Unsupported protocol | RES protocol version is not supported


# Requests
Expand All @@ -157,12 +159,15 @@ A request method has the following structure:

`<type>.<resourceID>.<resourceMethod>`

* type - the request type. May be either `subscribe`, `unsubscribe`, `get`, `call`, `auth`, or `new`.
* resourceID - the [resource ID](res-protocol.md#resource-ids).
* resourceMethod - the resource method. Only used for `call` or `auth` type requests. If not included, the separating dot (`.`) must also not be included.
* type - the request type. May be either `version`, `subscribe`, `unsubscribe`, `get`, `call`, `auth`, or `new`.
* resourceID - the [resource ID](res-protocol.md#resource-ids). Not used for `version` type requests.
* resourceMethod - the resource method. Only used for `call` or `auth` type requests.

Trailing separating dots (`.`) must not be included.

**Examples**

* `version` - Version request
* `subscribe.userService.users` - Subscribe request of a collection of users
* `call.userService.user.42.set` - Call request to set properties on a user
* `new.userService.users` - New request to create a new user
Expand All @@ -171,6 +176,36 @@ A request method has the following structure:

# Request types

## Version request

**method**
`version`

Version requests are sent by the client to tell which RES protocol version it supports, and to get information on what protocol version the gateway supports.

The request SHOULD be the first request sent by the client after an established connection.

If not sent, or if the **protocol** property is omitted in the request, the gateway SHOULD assume version v1.1.x.

### Parameters
The request parameters are optional.
It not omitted, the parameters object SHOULD have the following property:

**protocol**
The RES protocol version supported by the client.
MUST be a string in the format `"[MAJOR].[MINOR].[PATCH]"`. Eg. `"1.2.3"`.

### Result

**protocol**
The RES protocol version supported by the gateway.
MUST be a string in the format `"[MAJOR].[MINOR].[PATCH]"`. Eg. `"1.2.3"`.

### Error

A `system.unsupportedProtocol` error response will be sent if the gateway cannot support the client protocol version.
A `system.invalidRequest` error response will be sent if the gateway only supports RES Protocol v1.1.1 or below, prior to the introduction of the [version request](#version-request).

## Subscribe request

**method**
Expand Down
40 changes: 21 additions & 19 deletions server/reserr/reserr.go
Expand Up @@ -36,15 +36,16 @@ func IsError(err error, code string) bool {

// Pre-defined RES error codes
const (
CodeAccessDenied = "system.accessDenied"
CodeInternalError = "system.internalError"
CodeInvalidParams = "system.invalidParams"
CodeInvalidQuery = "system.invalidQuery"
CodeMethodNotFound = "system.methodNotFound"
CodeNoSubscription = "system.noSubscription"
CodeNotFound = "system.notFound"
CodeTimeout = "system.timeout"
CodeInvalidRequest = "system.invalidRequest"
CodeAccessDenied = "system.accessDenied"
CodeInternalError = "system.internalError"
CodeInvalidParams = "system.invalidParams"
CodeInvalidQuery = "system.invalidQuery"
CodeMethodNotFound = "system.methodNotFound"
CodeNoSubscription = "system.noSubscription"
CodeNotFound = "system.notFound"
CodeTimeout = "system.timeout"
CodeInvalidRequest = "system.invalidRequest"
CodeUnsupportedProtocol = "system.unsupportedProtocol"
// HTTP only error codes
CodeBadRequest = "system.badRequest"
CodeMethodNotAllowed = "system.methodNotAllowed"
Expand All @@ -55,16 +56,17 @@ const (
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#pre-defined-errors
// https://github.com/resgateio/resgate/blob/master/docs/res-client-protocol.md#pre-defined-errors
var (
ErrAccessDenied = &Error{Code: CodeAccessDenied, Message: "Access denied"}
ErrDisposing = &Error{Code: CodeInternalError, Message: "Internal error: disposing connection"}
ErrInternalError = &Error{Code: CodeInternalError, Message: "Internal error"}
ErrInvalidParams = &Error{Code: CodeInvalidParams, Message: "Invalid parameters"}
ErrInvalidQuery = &Error{Code: CodeInvalidQuery, Message: "Invalid query"}
ErrMethodNotFound = &Error{Code: CodeMethodNotFound, Message: "Method not found"}
ErrNoSubscription = &Error{Code: CodeNoSubscription, Message: "No subscription"}
ErrNotFound = &Error{Code: CodeNotFound, Message: "Not found"}
ErrTimeout = &Error{Code: CodeTimeout, Message: "Request timeout"}
ErrInvalidRequest = &Error{Code: CodeInvalidRequest, Message: "Invalid request"}
ErrAccessDenied = &Error{Code: CodeAccessDenied, Message: "Access denied"}
ErrDisposing = &Error{Code: CodeInternalError, Message: "Internal error: disposing connection"}
ErrInternalError = &Error{Code: CodeInternalError, Message: "Internal error"}
ErrInvalidParams = &Error{Code: CodeInvalidParams, Message: "Invalid parameters"}
ErrInvalidQuery = &Error{Code: CodeInvalidQuery, Message: "Invalid query"}
ErrMethodNotFound = &Error{Code: CodeMethodNotFound, Message: "Method not found"}
ErrNoSubscription = &Error{Code: CodeNoSubscription, Message: "No subscription"}
ErrNotFound = &Error{Code: CodeNotFound, Message: "Not found"}
ErrTimeout = &Error{Code: CodeTimeout, Message: "Request timeout"}
ErrInvalidRequest = &Error{Code: CodeInvalidRequest, Message: "Invalid request"}
ErrUnsupportedProtocol = &Error{Code: CodeUnsupportedProtocol, Message: "Unsupported protocol"}
// HTTP only errors
ErrBadRequest = &Error{Code: CodeBadRequest, Message: "Bad request"}
ErrMethodNotAllowed = &Error{Code: CodeMethodNotAllowed, Message: "Method not allowed"}
Expand Down
32 changes: 32 additions & 0 deletions server/rpc/rpc.go
@@ -1,6 +1,7 @@
package rpc

import (
"bytes"
"encoding/json"
"errors"
"strings"
Expand All @@ -18,6 +19,8 @@ type Requester interface {
CallResource(rid, action string, params interface{}, callback func(result json.RawMessage, err error))
AuthResource(rid, action string, params interface{}, callback func(result json.RawMessage, err error))
NewResource(rid string, params interface{}, callback func(data *NewResult, err error))
SetVersion(protocol string) (string, error)
ProtocolVersion() int
}

// Request represent a RES-client request
Expand Down Expand Up @@ -54,6 +57,16 @@ type Resources struct {
Errors map[string]*reserr.Error `json:"errors,omitempty"`
}

// VersionRequest represents the params of a version request
type VersionRequest struct {
Protocol string `json:"protocol"`
}

// VersionResult represents the results of a version request
type VersionResult struct {
Protocol string `json:"protocol"`
}

// AddEvent represents a RES-client collection add event
// https://github.com/resgateio/resgate/blob/master/docs/res-client-protocol.md#collection-add-event
type AddEvent struct {
Expand Down Expand Up @@ -85,6 +98,8 @@ var (
errMissingID = errors.New("Request is missing id property")
)

var nullBytes = []byte("null")

// HandleRequest unmarshals a request byte array and dispatches the request to the requester
func HandleRequest(data []byte, req Requester) error {
r := &Request{}
Expand All @@ -99,6 +114,23 @@ func HandleRequest(data []byte, req Requester) error {

idx := strings.IndexByte(r.Method, '.')
if idx < 0 {
if r.Method == "version" {
var vr VersionRequest
if data != nil && !bytes.Equal(r.Params, nullBytes) {
err := json.Unmarshal(r.Params, &vr)
if err != nil {
req.Reply(r.ErrorResponse(reserr.ErrInvalidParams))
return nil
}
}
p, err := req.SetVersion(vr.Protocol)
if err != nil {
req.Reply(r.ErrorResponse(err))
return nil
}
req.Reply(r.SuccessResponse(VersionResult{Protocol: p}))
return nil
}
req.Reply(r.ErrorResponse(reserr.ErrInvalidRequest))
return nil
}
Expand Down
71 changes: 55 additions & 16 deletions server/wsConn.go
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"sync"

Expand All @@ -17,22 +18,25 @@ import (
)

type wsConn struct {
cid string
ws *websocket.Conn
request *http.Request
token json.RawMessage
serv *Service
subs map[string]*Subscription
disposing bool
mqSub mq.Unsubscriber
connStr string
cid string
ws *websocket.Conn
request *http.Request
token json.RawMessage
serv *Service
subs map[string]*Subscription
disposing bool
mqSub mq.Unsubscriber
connStr string
protocolVer int

queue []func()
work chan struct{}

mu sync.Mutex
}

const legacyProtocol = 1001001 // MAJOR * 1000000 + MINOR * 1000 + PATCH

func (s *Service) newWSConn(ws *websocket.Conn, request *http.Request) *wsConn {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -45,13 +49,14 @@ func (s *Service) newWSConn(ws *websocket.Conn, request *http.Request) *wsConn {
cid := xid.New()

conn := &wsConn{
cid: cid.String(),
ws: ws,
request: request,
serv: s,
subs: make(map[string]*Subscription),
queue: make([]func(), 0, WSConnWorkerQueueSize),
work: make(chan struct{}, 1),
cid: cid.String(),
ws: ws,
request: request,
serv: s,
subs: make(map[string]*Subscription),
queue: make([]func(), 0, WSConnWorkerQueueSize),
work: make(chan struct{}, 1),
protocolVer: legacyProtocol,
}
conn.connStr = "[" + conn.cid + "]"

Expand Down Expand Up @@ -79,6 +84,10 @@ func (c *wsConn) HTTPRequest() *http.Request {
return c.request
}

func (c *wsConn) ProtocolVersion() int {
return c.protocolVer
}

func (c *wsConn) listen() {
var in []byte
var err error
Expand Down Expand Up @@ -239,6 +248,36 @@ func (c *wsConn) GetResource(rid string, cb func(data *rpc.Resources, err error)
})
}

func (c *wsConn) SetVersion(protocol string) (string, error) {
// Quick exit on empty protocol
if protocol == "" {
return ProtocolVersion, nil
}

parts := strings.Split(protocol, ".")
if len(parts) != 3 {
return "", reserr.ErrInvalidParams
}

v := 0
for i := 0; i < 3; i++ {
p, err := strconv.Atoi(parts[i])
if err != nil || p >= 1000 {
return "", reserr.ErrInvalidParams
}
v *= 1000
v += p
}

if v < 1000000 || v >= 2000000 {
return "", reserr.ErrUnsupportedProtocol
}

c.protocolVer = v

return ProtocolVersion, nil
}

func (c *wsConn) GetSubscription(rid string, cb func(sub *Subscription, err error)) {
sub, err := c.Subscribe(rid, true)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions test/20version_test.go
@@ -0,0 +1,64 @@
package test

import (
"encoding/json"
"fmt"
"testing"

"github.com/resgateio/resgate/server"
"github.com/resgateio/resgate/server/reserr"
)

func TestVersion_Request_ReturnsExpectedResponse(t *testing.T) {

versionResult := json.RawMessage(fmt.Sprintf(`{"protocol":"%s"}`, server.ProtocolVersion))

tbl := []struct {
Params json.RawMessage
Expected interface{}
}{
// Valid requests
{nil, versionResult},
{json.RawMessage(`{}`), versionResult},
{json.RawMessage(`{"foo":"bar"}`), versionResult},
{json.RawMessage(`{"protocol":""}`), versionResult},
{json.RawMessage(`{"protocol":null}`), versionResult},
{json.RawMessage(`{"protocol":"1.0.0"}`), versionResult},
{json.RawMessage(`{"protocol":"1.1.0"}`), versionResult},
{json.RawMessage(`{"protocol":"1.0.1"}`), versionResult},
{json.RawMessage(`{"protocol":"1.999.999"}`), versionResult},
// Invalid params
{json.RawMessage(`""`), reserr.ErrInvalidParams},
{json.RawMessage(`"1.0.0"`), reserr.ErrInvalidParams},
{json.RawMessage(`["1.0.0"]`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":1.0}`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":"1.0"}`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":"1.2.3.4"}`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":"v1.0.0"}`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":"1.0.1000"}`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":"1.1000.0"}`), reserr.ErrInvalidParams},
{json.RawMessage(`{"protocol":"v1.0.0"}`), reserr.ErrInvalidParams},
// Unsupported protocol
{json.RawMessage(`{"protocol":"0.0.0"}`), reserr.ErrUnsupportedProtocol},
{json.RawMessage(`{"protocol":"2.0.0"}`), reserr.ErrUnsupportedProtocol},
{json.RawMessage(`{"protocol":"0.999.999"}`), reserr.ErrUnsupportedProtocol},
{json.RawMessage(`{"protocol":"3.2.1"}`), reserr.ErrUnsupportedProtocol},
}

for i, l := range tbl {
runNamedTest(t, fmt.Sprintf("#%d", i+1), func(s *Session) {
c := s.Connect()

// Send client call request
creq := c.Request("version", l.Params)

// Validate client response
cresp := creq.GetResponse(t)
if err, ok := l.Expected.(*reserr.Error); ok {
cresp.AssertError(t, err)
} else {
cresp.AssertResult(t, l.Expected)
}
})
}
}

0 comments on commit 0cfb983

Please sign in to comment.