Skip to content

Commit

Permalink
Merge pull request #162 from resgateio/feature/gh-161-unsubscribe-cou…
Browse files Browse the repository at this point in the history
…nt-field

Feature/gh 161 unsubscribe count field
  • Loading branch information
jirenius committed Jun 5, 2020
2 parents f63e4cb + 2240fdb commit 0b333c8
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 58 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ All changes to the RES Protocol will be documented in this file.
## Unreleased

* #157 Soft resource references.
* #161 Unsubscribe count field.

## v1.2.0 - [Resgate v1.4.0](compare/v1.3.0...v1.4.0) - 2019-11-20

Expand Down
17 changes: 12 additions & 5 deletions docs/res-client-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ The RES-Client protocol is used in communication between the client and the gate

A core concept in the RES-Client protocol is the subscriptions. A client may subscribe to resources by making [subscribe requests](#subscribe-request) with the unique [resource ID](res-protocol.md#resource-ids), or by getting a resource response on a [call request](#call-request) or [auth request](#auth-request).

A resource may be subscribed to [directly](#direct-subscription) or [indirectly](#indirect-subscription). Any reference to *subscription*, or a resource being *subscribed* to, in this document should be interpreted as both *direct* and *indirect* subscriptions, unless specified.
A resource may be subscribed to [directly](#direct-subscription) or [indirectly](#indirect-subscription). Any reference in this document to *subscription* or a resource being *subscribed* to, should be interpreted as both *direct* and *indirect* subscriptions, unless specified.

The client will receive [events](#events) on anything that happens on a subscribed resource. A subscription lasts as long as the resource has direct or indirect subscriptions, or when the connection to the gateway is closed.

## Direct subscription
The resource that is subscribed to with a [subscribe request](#subscribe-request), or returned as a resource response to a [call request](#call-request) or [auth request](#auth-request) will be considered *directly subscribed*.

It is possible to make multiple direct subscriptions on a resource. It will be considered directly subscribed until an equal number of [unsubscribe requests](#unsubscribe-request) has been made.
It is possible to have multiple direct subscriptions on a resource. It will be considered directly subscribed until the same number of subscriptions are matched using one ore more [unsubscribe requests](#unsubscribe-request).

## Indirect subscription
A resource that is referred to with a non-soft [resource reference](res-protocol.md#values) by a [directly subscribed](#direct-subscription) resource, or by an indirectly subscribed resource, will be considered *indirectly subscribed*. Cyclic references where none of the resources are directly subscribed will not be considered subscribed.
Expand Down Expand Up @@ -235,22 +235,29 @@ Any [resource reference](res-protocol.md#values) that fails will not lead to an

## Unsubscribe request

Unsubscribe requests are sent by the client to unsubscribe to a previous made [direct subscription](#direct-subscription).
Unsubscribe requests are sent by the client to unsubscribe to previous [direct subscriptions](#direct-subscription).

The resource will only be considered unsubscribed when there are no more [direct](#direct-subscription) or [indirect](#indirect-subscription) subscriptions.

If the **count** property is omitted in the request, the value of 1 is assumed.

**method**
`unsubscribe.<resourceID>`

### Parameters
The request has no parameters.
The request parameters are optional.
It not omitted, the parameters object SHOULD have the following property:

**count**
The number of direct subscriptions to unsubscribe to.
MUST be a number greater than 0.

### Result
The result has no payload.

### Error

An error response with code `system.noSubscription` will be sent if the resource has no direct subscription.
An error response with code `system.noSubscription` will be sent if the resource has no direct subscription, or if *count* exceeds the number of direct subscriptions. If so, the number of direct subscriptions will be unaffected.


## Get request
Expand Down
27 changes: 24 additions & 3 deletions server/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Requester interface {
Reply(data []byte)
GetResource(rid string, callback func(data *Resources, err error))
SubscribeResource(rid string, callback func(data *Resources, err error))
UnsubscribeResource(rid string, callback func(ok bool))
UnsubscribeResource(rid string, count int, callback func(ok bool))
CallResource(rid, action string, params interface{}, callback func(result interface{}, err error))
AuthResource(rid, action string, params interface{}, callback func(result interface{}, err error))
NewResource(rid string, params interface{}, callback func(result interface{}, err error))
Expand Down Expand Up @@ -99,6 +99,11 @@ type CallResourceResult struct {
*Resources
}

// UnsubscribeRequest represents the params of an unsubscribe request
type UnsubscribeRequest struct {
Count *int `json:"count"`
}

var (
errMissingID = errors.New("Request is missing id property")
)
Expand All @@ -121,7 +126,7 @@ func HandleRequest(data []byte, req Requester) error {
if idx < 0 {
if r.Method == "version" {
var vr VersionRequest
if data != nil && !bytes.Equal(r.Params, nullBytes) {
if len(r.Params) > 0 && !bytes.Equal(r.Params, nullBytes) {
err := json.Unmarshal(r.Params, &vr)
if err != nil {
req.Reply(r.ErrorResponse(reserr.ErrInvalidParams))
Expand Down Expand Up @@ -181,7 +186,23 @@ func HandleRequest(data []byte, req Requester) error {
}
})
case "unsubscribe":
req.UnsubscribeResource(rid, func(ok bool) {
count := 1
if len(r.Params) > 0 && !bytes.Equal(r.Params, nullBytes) {
var ur UnsubscribeRequest
err := json.Unmarshal(r.Params, &ur)
if err != nil {
req.Reply(r.ErrorResponse(reserr.ErrInvalidParams))
return nil
}
if ur.Count != nil {
count = *ur.Count
if count <= 0 {
req.Reply(r.ErrorResponse(reserr.ErrInvalidParams))
return nil
}
}
}
req.UnsubscribeResource(rid, count, func(ok bool) {
if ok {
req.Reply(r.SuccessResponse(nil))
} else {
Expand Down
10 changes: 5 additions & 5 deletions server/wsConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ func (c *wsConn) handleResourceResult(refRID string, cb func(result interface{},
})
}

func (c *wsConn) UnsubscribeResource(rid string, cb func(ok bool)) {
cb(c.UnsubscribeByRID(rid))
func (c *wsConn) UnsubscribeResource(rid string, count int, cb func(ok bool)) {
cb(c.UnsubscribeByRID(rid, count))
}

func (c *wsConn) subscribe(rid string, direct bool) (*Subscription, error) {
Expand Down Expand Up @@ -507,17 +507,17 @@ func (c *wsConn) Unsubscribe(sub *Subscription, direct bool, count int, tryDelet
c.removeCount(sub, direct, count, tryDelete)
}

func (c *wsConn) UnsubscribeByRID(rid string) bool {
func (c *wsConn) UnsubscribeByRID(rid string, count int) bool {
if c.disposing {
return false
}

sub, ok := c.subs[rid]
if !ok || sub.direct == 0 {
if !ok || sub.direct < count {
return false
}

c.removeCount(sub, true, 1, true)
c.removeCount(sub, true, count, true)
return true
}

Expand Down
143 changes: 98 additions & 45 deletions test/03unsubscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"encoding/json"
"fmt"
"testing"

"github.com/resgateio/resgate/server/reserr"
Expand Down Expand Up @@ -161,55 +162,107 @@ func TestUnsubscribeOnOverlappingLinkedCollection(t *testing.T) {
}

func TestUnsubscribe_FollowedByResourceResponse_IncludesResource(t *testing.T) {
runTest(t, func(s *Session) {
c := s.Connect()
model := resourceData("test.model")
for useCount := true; useCount; useCount = false {
runNamedTest(t, fmt.Sprintf("with useCount set to %+v", useCount), func(s *Session) {
c := s.Connect()
model := resourceData("test.model")

// Send subscribe request
creq := c.Request("subscribe.test.model", nil)
// Handle model get and access request
mreqs := s.GetParallelRequests(t, 2)
mreqs.GetRequest(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))
req := mreqs.GetRequest(t, "access.test.model")
req.RespondSuccess(json.RawMessage(`{"get":true}`))

// Validate client response and validate
creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`))

// Send client request
creq = c.Request("call.test.getModel", nil)
req = s.GetRequest(t)
req.AssertSubject(t, "access.test")
req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`))
// Get call request
req = s.GetRequest(t)
req.AssertSubject(t, "call.test.getModel")
req.RespondResource("test.model")
// Validate client response
cresp := creq.GetResponse(t)
cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model"}`))

// Call unsubscribe
if useCount {
c.Request("unsubscribe.test.model", json.RawMessage(`{"count":2}`)).GetResponse(t)
} else {
c.Request("unsubscribe.test.model", json.RawMessage(`{}`)).GetResponse(t)
c.Request("unsubscribe.test.model", nil).GetResponse(t)
}

// Send client request
creq = c.Request("call.test.getModel", nil)
req = s.GetRequest(t)
req.AssertSubject(t, "access.test")
req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`))
// Get call request
req = s.GetRequest(t)
req.AssertSubject(t, "call.test.getModel")
req.RespondResource("test.model")
// Access request
req = s.GetRequest(t)
req.AssertSubject(t, "access.test.model")
req.RespondSuccess(json.RawMessage(`{"get":true}`))
// Validate client response
cresp = creq.GetResponse(t)
cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model","models":{"test.model":`+model+`}}`))
})
}
}

// Send subscribe request
creq := c.Request("subscribe.test.model", nil)
// Handle model get and access request
mreqs := s.GetParallelRequests(t, 2)
mreqs.GetRequest(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))
req := mreqs.GetRequest(t, "access.test.model")
req.RespondSuccess(json.RawMessage(`{"get":true}`))

// Validate client response and validate
creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`))

// Send client request
creq = c.Request("call.test.getModel", nil)
req = s.GetRequest(t)
req.AssertSubject(t, "access.test")
req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`))
// Get call request
req = s.GetRequest(t)
req.AssertSubject(t, "call.test.getModel")
req.RespondResource("test.model")
// Validate client response
cresp := creq.GetResponse(t)
cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model"}`))
func TestUnsubscribe_WithCount_UnsubscribesModel(t *testing.T) {
runTest(t, func(s *Session) {
event := json.RawMessage(`{"foo":"bar"}`)

// Call unsubscribe
c.Request("unsubscribe.test.model", nil).GetResponse(t)
c := s.Connect()
subscribeToTestModel(t, s, c)

// Call unsubscribe
c.Request("unsubscribe.test.model", nil).GetResponse(t)
c.Request("unsubscribe.test.model", json.RawMessage(`{"count":1}`)).GetResponse(t)

// Send client request
creq = c.Request("call.test.getModel", nil)
req = s.GetRequest(t)
req.AssertSubject(t, "access.test")
req.RespondSuccess(json.RawMessage(`{"get":true,"call":"*"}`))
// Get call request
req = s.GetRequest(t)
req.AssertSubject(t, "call.test.getModel")
req.RespondResource("test.model")
// Access request
req = s.GetRequest(t)
req.AssertSubject(t, "access.test.model")
req.RespondSuccess(json.RawMessage(`{"get":true}`))
// Validate client response
cresp = creq.GetResponse(t)
cresp.AssertResult(t, json.RawMessage(`{"rid":"test.model","models":{"test.model":`+model+`}}`))
// Send event on model and validate no event was sent to client
s.ResourceEvent("test.model", "custom", event)
c.AssertNoEvent(t, "test.model")
})
}

func TestUnsubscribe_WithInvalidPayload_DoesNotUnsubscribesModel(t *testing.T) {
tbl := []struct {
Payload interface{}
ErrorCode string
}{
{json.RawMessage(`[]`), "system.invalidParams"},
{json.RawMessage(`{"count":"foo"}`), "system.invalidParams"},
{json.RawMessage(`{"count":true}`), "system.invalidParams"},
{json.RawMessage(`{"count":0}`), "system.invalidParams"},
{json.RawMessage(`{"count":-1}`), "system.invalidParams"},
{json.RawMessage(`{"count":2}`), "system.noSubscription"},
}

event := json.RawMessage(`{"foo":"bar"}`)

for i, l := range tbl {
runNamedTest(t, fmt.Sprintf("#%d", i+1), func(s *Session) {
c := s.Connect()
subscribeToTestModel(t, s, c)

// Call unsubscribe
c.Request("unsubscribe.test.model", l.Payload).
GetResponse(t).
AssertErrorCode(t, l.ErrorCode)

// Send event on model and validate it is still subscribed
s.ResourceEvent("test.model", "custom", event)
c.GetEvent(t).AssertEventName(t, "test.model.custom")
})
}
}

0 comments on commit 0b333c8

Please sign in to comment.