From 8666d3b6b295f0d2d554dc9dcc221db41f1eb6fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 5 Jun 2020 09:58:06 +0200 Subject: [PATCH 1/4] GH-161: Added count field to Unsubscribe request in RES Client Protocol. --- docs/CHANGELOG.md | 1 + docs/res-client-protocol.md | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9a8101e..104202d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -5,6 +5,7 @@ All changes to the RES Protocol will be documented in this file. ## Unreleased * #157 Soft resource references. +* #161 Unsubscribe count field. ## v1.2.0 - [Resgate v1.4.0](compare/v1.3.0...v1.4.0) - 2019-11-20 diff --git a/docs/res-client-protocol.md b/docs/res-client-protocol.md index 7c10e98..41041fe 100644 --- a/docs/res-client-protocol.md +++ b/docs/res-client-protocol.md @@ -40,14 +40,14 @@ The RES-Client protocol is used in communication between the client and the gate A core concept in the RES-Client protocol is the subscriptions. A client may subscribe to resources by making [subscribe requests](#subscribe-request) with the unique [resource ID](res-protocol.md#resource-ids), or by getting a resource response on a [call request](#call-request) or [auth request](#auth-request). -A resource may be subscribed to [directly](#direct-subscription) or [indirectly](#indirect-subscription). Any reference to *subscription*, or a resource being *subscribed* to, in this document should be interpreted as both *direct* and *indirect* subscriptions, unless specified. +A resource may be subscribed to [directly](#direct-subscription) or [indirectly](#indirect-subscription). Any reference in this document to *subscription* or a resource being *subscribed* to, should be interpreted as both *direct* and *indirect* subscriptions, unless specified. The client will receive [events](#events) on anything that happens on a subscribed resource. A subscription lasts as long as the resource has direct or indirect subscriptions, or when the connection to the gateway is closed. ## Direct subscription The resource that is subscribed to with a [subscribe request](#subscribe-request), or returned as a resource response to a [call request](#call-request) or [auth request](#auth-request) will be considered *directly subscribed*. -It is possible to make multiple direct subscriptions on a resource. It will be considered directly subscribed until an equal number of [unsubscribe requests](#unsubscribe-request) has been made. +It is possible to have multiple direct subscriptions on a resource. It will be considered directly subscribed until the same number of subscriptions are matched using one ore more [unsubscribe requests](#unsubscribe-request). ## Indirect subscription A resource that is referred to with a non-soft [resource reference](res-protocol.md#values) by a [directly subscribed](#direct-subscription) resource, or by an indirectly subscribed resource, will be considered *indirectly subscribed*. Cyclic references where none of the resources are directly subscribed will not be considered subscribed. @@ -235,22 +235,29 @@ Any [resource reference](res-protocol.md#values) that fails will not lead to an ## Unsubscribe request -Unsubscribe requests are sent by the client to unsubscribe to a previous made [direct subscription](#direct-subscription). +Unsubscribe requests are sent by the client to unsubscribe to previous [direct subscriptions](#direct-subscription). The resource will only be considered unsubscribed when there are no more [direct](#direct-subscription) or [indirect](#indirect-subscription) subscriptions. +If the **count** property is omitted in the request, the value of 1 is assumed. + **method** `unsubscribe.` ### Parameters -The request has no parameters. +The request parameters are optional. +It not omitted, the parameters object SHOULD have the following property: + +**count** +The number of direct subscriptions to unsubscribe to. +MUST be a number greater than 0. ### Result The result has no payload. ### Error -An error response with code `system.noSubscription` will be sent if the resource has no direct subscription. +An error response with code `system.noSubscription` will be sent if the resource has no direct subscription, or if *count* exceeds the number of direct subscriptions. If so, the number of direct subscriptions will be unaffected. ## Get request From 950f16615017ca2cc78815b7e3eaa53f89cb6186 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 5 Jun 2020 10:50:31 +0200 Subject: [PATCH 2/4] GH-161: Added handling of unsubscribe count field. --- server/rpc/rpc.go | 27 ++++++++++++++++++++++++--- server/wsConn.go | 10 +++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 496d7c7..b7eac67 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -15,7 +15,7 @@ type Requester interface { Reply(data []byte) GetResource(rid string, callback func(data *Resources, err error)) SubscribeResource(rid string, callback func(data *Resources, err error)) - UnsubscribeResource(rid string, callback func(ok bool)) + UnsubscribeResource(rid string, count int, callback func(ok bool)) CallResource(rid, action string, params interface{}, callback func(result interface{}, err error)) AuthResource(rid, action string, params interface{}, callback func(result interface{}, err error)) NewResource(rid string, params interface{}, callback func(result interface{}, err error)) @@ -99,6 +99,11 @@ type CallResourceResult struct { *Resources } +// UnsubscribeRequest represents the params of an unsubscribe request +type UnsubscribeRequest struct { + Count *int `json:"count"` +} + var ( errMissingID = errors.New("Request is missing id property") ) @@ -121,7 +126,7 @@ func HandleRequest(data []byte, req Requester) error { if idx < 0 { if r.Method == "version" { var vr VersionRequest - if data != nil && !bytes.Equal(r.Params, nullBytes) { + if len(r.Params) > 0 && !bytes.Equal(r.Params, nullBytes) { err := json.Unmarshal(r.Params, &vr) if err != nil { req.Reply(r.ErrorResponse(reserr.ErrInvalidParams)) @@ -181,7 +186,23 @@ func HandleRequest(data []byte, req Requester) error { } }) case "unsubscribe": - req.UnsubscribeResource(rid, func(ok bool) { + count := 1 + if len(r.Params) > 0 && !bytes.Equal(r.Params, nullBytes) { + var ur UnsubscribeRequest + err := json.Unmarshal(r.Params, &ur) + if err != nil { + req.Reply(r.ErrorResponse(reserr.ErrInvalidParams)) + return nil + } + if ur.Count != nil { + count = *ur.Count + if count <= 0 { + req.Reply(r.ErrorResponse(reserr.ErrInvalidParams)) + return nil + } + } + } + req.UnsubscribeResource(rid, count, func(ok bool) { if ok { req.Reply(r.SuccessResponse(nil)) } else { diff --git a/server/wsConn.go b/server/wsConn.go index e2307f5..7937d23 100644 --- a/server/wsConn.go +++ b/server/wsConn.go @@ -467,8 +467,8 @@ func (c *wsConn) handleResourceResult(refRID string, cb func(result interface{}, }) } -func (c *wsConn) UnsubscribeResource(rid string, cb func(ok bool)) { - cb(c.UnsubscribeByRID(rid)) +func (c *wsConn) UnsubscribeResource(rid string, count int, cb func(ok bool)) { + cb(c.UnsubscribeByRID(rid, count)) } func (c *wsConn) subscribe(rid string, direct bool) (*Subscription, error) { @@ -507,17 +507,17 @@ func (c *wsConn) Unsubscribe(sub *Subscription, direct bool, count int, tryDelet c.removeCount(sub, direct, count, tryDelete) } -func (c *wsConn) UnsubscribeByRID(rid string) bool { +func (c *wsConn) UnsubscribeByRID(rid string, count int) bool { if c.disposing { return false } sub, ok := c.subs[rid] - if !ok || sub.direct == 0 { + if !ok || sub.direct < count { return false } - c.removeCount(sub, true, 1, true) + c.removeCount(sub, true, count, true) return true } From 47a6ffc4bf04e494404aea87a31efbe54ff8d60a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 5 Jun 2020 11:16:22 +0200 Subject: [PATCH 3/4] GH-161: Added unsubscribe count field tests. --- test/03unsubscribe_test.go | 143 +++++++++++++++++++++++++------------ 1 file changed, 98 insertions(+), 45 deletions(-) diff --git a/test/03unsubscribe_test.go b/test/03unsubscribe_test.go index 2e12ab0..158c5d5 100644 --- a/test/03unsubscribe_test.go +++ b/test/03unsubscribe_test.go @@ -2,6 +2,7 @@ package test import ( "encoding/json" + "fmt" "testing" "github.com/resgateio/resgate/server/reserr" @@ -161,55 +162,107 @@ func TestUnsubscribeOnOverlappingLinkedCollection(t *testing.T) { } func TestUnsubscribe_FollowedByResourceResponse_IncludesResource(t *testing.T) { - runTest(t, func(s *Session) { - c := s.Connect() - model := resourceData("test.model") + for useCount := true; useCount; useCount = false { + runNamedTest(t, fmt.Sprintf("with useCount set to %+v", useCount), func(s *Session) { + c := s.Connect() + model := resourceData("test.model") + + // Send subscribe request + creq := c.Request("subscribe.test.model", nil) + // Handle model get and access request + mreqs := s.GetParallelRequests(t, 2) + mreqs.GetRequest(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`)) + req := mreqs.GetRequest(t, "access.test.model") + req.RespondSuccess(json.RawMessage(`{"get":true}`)) + + // Validate client response and validate + creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`)) + + // Send client request + creq = c.Request("call.test.getModel", nil) + req = s.GetRequest(t) + req.AssertSubject(t, "access.test") + req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`)) + // Get call request + req = s.GetRequest(t) + req.AssertSubject(t, "call.test.getModel") + req.RespondResource("test.model") + // Validate client response + cresp := creq.GetResponse(t) + cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model"}`)) + + // Call unsubscribe + if useCount { + c.Request("unsubscribe.test.model", json.RawMessage(`{"count":1}`)).GetResponse(t) + } else { + c.Request("unsubscribe.test.model", json.RawMessage(`{}`)).GetResponse(t) + c.Request("unsubscribe.test.model", nil).GetResponse(t) + } + + // Send client request + creq = c.Request("call.test.getModel", nil) + req = s.GetRequest(t) + req.AssertSubject(t, "access.test") + req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`)) + // Get call request + req = s.GetRequest(t) + req.AssertSubject(t, "call.test.getModel") + req.RespondResource("test.model") + // Access request + req = s.GetRequest(t) + req.AssertSubject(t, "access.test.model") + req.RespondSuccess(json.RawMessage(`{"get":true}`)) + // Validate client response + cresp = creq.GetResponse(t) + cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model","models":{"test.model":`+model+`}}`)) + }) + } +} - // Send subscribe request - creq := c.Request("subscribe.test.model", nil) - // Handle model get and access request - mreqs := s.GetParallelRequests(t, 2) - mreqs.GetRequest(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`)) - req := mreqs.GetRequest(t, "access.test.model") - req.RespondSuccess(json.RawMessage(`{"get":true}`)) - - // Validate client response and validate - creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`)) - - // Send client request - creq = c.Request("call.test.getModel", nil) - req = s.GetRequest(t) - req.AssertSubject(t, "access.test") - req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`)) - // Get call request - req = s.GetRequest(t) - req.AssertSubject(t, "call.test.getModel") - req.RespondResource("test.model") - // Validate client response - cresp := creq.GetResponse(t) - cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model"}`)) +func TestUnsubscribe_WithCount_UnsubscribesModel(t *testing.T) { + runTest(t, func(s *Session) { + event := json.RawMessage(`{"foo":"bar"}`) - // Call unsubscribe - c.Request("unsubscribe.test.model", nil).GetResponse(t) + c := s.Connect() + subscribeToTestModel(t, s, c) // Call unsubscribe - c.Request("unsubscribe.test.model", nil).GetResponse(t) + c.Request("unsubscribe.test.model", json.RawMessage(`{"count":1}`)).GetResponse(t) - // Send client request - creq = c.Request("call.test.getModel", nil) - req = s.GetRequest(t) - req.AssertSubject(t, "access.test") - req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`)) - // Get call request - req = s.GetRequest(t) - req.AssertSubject(t, "call.test.getModel") - req.RespondResource("test.model") - // Access request - req = s.GetRequest(t) - req.AssertSubject(t, "access.test.model") - req.RespondSuccess(json.RawMessage(`{"get":true}`)) - // Validate client response - cresp = creq.GetResponse(t) - cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model","models":{"test.model":`+model+`}}`)) + // Send event on model and validate no event was sent to client + s.ResourceEvent("test.model", "custom", event) + c.AssertNoEvent(t, "test.model") }) } + +func TestUnsubscribe_WithInvalidPayload_DoesNotUnsubscribesModel(t *testing.T) { + tbl := []struct { + Payload interface{} + ErrorCode string + }{ + {json.RawMessage(`[]`), "system.invalidParams"}, + {json.RawMessage(`{"count":"foo"}`), "system.invalidParams"}, + {json.RawMessage(`{"count":true}`), "system.invalidParams"}, + {json.RawMessage(`{"count":0}`), "system.invalidParams"}, + {json.RawMessage(`{"count":-1}`), "system.invalidParams"}, + {json.RawMessage(`{"count":2}`), "system.noSubscription"}, + } + + event := json.RawMessage(`{"foo":"bar"}`) + + for i, l := range tbl { + runNamedTest(t, fmt.Sprintf("#%d", i+1), func(s *Session) { + c := s.Connect() + subscribeToTestModel(t, s, c) + + // Call unsubscribe + c.Request("unsubscribe.test.model", l.Payload). + GetResponse(t). + AssertErrorCode(t, l.ErrorCode) + + // Send event on model and validate it is still subscribed + s.ResourceEvent("test.model", "custom", event) + c.GetEvent(t).AssertEventName(t, "test.model.custom") + }) + } +} From 2240fdbcf57abb06ba32f38da19fe48579a74740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Jir=C3=A9nius?= Date: Fri, 5 Jun 2020 11:21:51 +0200 Subject: [PATCH 4/4] GH-161: Fixed broken test. --- test/03unsubscribe_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/03unsubscribe_test.go b/test/03unsubscribe_test.go index 158c5d5..e8fba66 100644 --- a/test/03unsubscribe_test.go +++ b/test/03unsubscribe_test.go @@ -193,7 +193,7 @@ func TestUnsubscribe_FollowedByResourceResponse_IncludesResource(t *testing.T) { // Call unsubscribe if useCount { - c.Request("unsubscribe.test.model", json.RawMessage(`{"count":1}`)).GetResponse(t) + c.Request("unsubscribe.test.model", json.RawMessage(`{"count":2}`)).GetResponse(t) } else { c.Request("unsubscribe.test.model", json.RawMessage(`{}`)).GetResponse(t) c.Request("unsubscribe.test.model", nil).GetResponse(t)