Skip to content

Commit

Permalink
Merge branch 'develop' into feature/gh-222-nats-no-responders-support
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Jan 6, 2022
2 parents f5ff52e + c772248 commit e46be59
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 12 deletions.
2 changes: 2 additions & 0 deletions server/reserr/reserr.go
Expand Up @@ -47,6 +47,7 @@ const (
CodeInvalidRequest = "system.invalidRequest"
CodeUnsupportedProtocol = "system.unsupportedProtocol"
CodeSubjectTooLong = "system.subjectTooLong"
CodeDeleted = "system.deleted"
// HTTP only error codes
CodeBadRequest = "system.badRequest"
CodeMethodNotAllowed = "system.methodNotAllowed"
Expand All @@ -70,6 +71,7 @@ var (
ErrInvalidRequest = &Error{Code: CodeInvalidRequest, Message: "Invalid request"}
ErrUnsupportedProtocol = &Error{Code: CodeUnsupportedProtocol, Message: "Unsupported protocol"}
ErrSubjectTooLong = &Error{Code: CodeSubjectTooLong, Message: "Subject too long"}
ErrDeleted = &Error{Code: CodeDeleted, Message: "Deleted"}
// HTTP only errors
ErrBadRequest = &Error{Code: CodeBadRequest, Message: "Bad request"}
ErrMethodNotAllowed = &Error{Code: CodeMethodNotAllowed, Message: "Method not allowed"}
Expand Down
16 changes: 13 additions & 3 deletions server/subscription.go
Expand Up @@ -641,7 +641,8 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) {

case "delete":
s.state = stateDeleted
fallthrough
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
s.unsubscribeDirect(reserr.ErrDeleted)
default:
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
}
Expand Down Expand Up @@ -728,7 +729,8 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
}
case "delete":
s.state = stateDeleted
fallthrough
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
s.unsubscribeDirect(reserr.ErrDeleted)
default:
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
}
Expand Down Expand Up @@ -759,8 +761,16 @@ func (s *Subscription) handleReaccess(t *rescache.Throttle) {
func (s *Subscription) validateAccess(a *rescache.Access) {
err := a.CanGet()
if err != nil {
s.unsubscribeDirect(reserr.RESError(err))
}
}

// unsubscribeDirect removes any direct subscription of the resource and sends
// an unsubscribe event if any direct subscriptions existed.
func (s *Subscription) unsubscribeDirect(reason *reserr.Error) {
if s.direct > 0 {
s.c.Unsubscribe(s, true, s.direct, true)
s.c.Send(rpc.NewEvent(s.rid, "unsubscribe", rpc.UnsubscribeEvent{Reason: reserr.RESError(err)}))
s.c.Send(rpc.NewEvent(s.rid, "unsubscribe", rpc.UnsubscribeEvent{Reason: reason}))
}
}

Expand Down
3 changes: 1 addition & 2 deletions test/10reaccess_event_test.go
Expand Up @@ -58,7 +58,6 @@ func TestReaccessEventTriggersAccessCallOnSubscribedResources(t *testing.T) {
func TestReaccessEventTriggersUnsubscribeOnDeniedAccessCall(t *testing.T) {
runTest(t, func(s *Session) {
event := json.RawMessage(`{"foo":"bar"}`)
reasonAccessDenied := json.RawMessage(`{"reason":{"code":"system.accessDenied","message":"Access denied"}}`)

c := s.Connect()

Expand All @@ -72,7 +71,7 @@ func TestReaccessEventTriggersUnsubscribeOnDeniedAccessCall(t *testing.T) {
s.GetRequest(t).AssertSubject(t, "access.test.model.parent").RespondSuccess(json.RawMessage(`{"get":false}`))

// Validate unsubscribe events are sent to client
c.GetEvent(t).AssertEventName(t, "test.model.parent.unsubscribe").AssertData(t, reasonAccessDenied)
c.GetEvent(t).AssertEventName(t, "test.model.parent.unsubscribe").AssertData(t, mock.UnsubscribeReasonAccessDenied)

// Send event on model and validate client event
s.ResourceEvent("test.model", "custom", event)
Expand Down
6 changes: 4 additions & 2 deletions test/11system_event_test.go
Expand Up @@ -238,7 +238,8 @@ func TestSystemReset_NotFoundResponseOnModel_GeneratesDeleteEvent(t *testing.T)
// Respond to get request with system.notFound error
s.GetRequest(t).AssertSubject(t, "get.test.model").RespondError(reserr.ErrNotFound)
// Validate delete event is sent to client
c.GetEvent(t).AssertEventName(t, "test.model.delete").AssertData(t, nil)
c.GetEvent(t).Equals(t, "test.model.delete", nil)
c.GetEvent(t).Equals(t, "test.model.unsubscribe", mock.UnsubscribeReasonDeleted)
// Validate subsequent events are not sent to client
s.ResourceEvent("test.model", "custom", common.CustomEvent())
c.AssertNoEvent(t, "test.model")
Expand All @@ -255,7 +256,8 @@ func TestSystemReset_NotFoundResponseOnCollection_GeneratesDeleteEvent(t *testin
// Respond to get request with system.notFound error
s.GetRequest(t).AssertSubject(t, "get.test.collection").RespondError(reserr.ErrNotFound)
// Validate delete event is sent to client
c.GetEvent(t).AssertEventName(t, "test.collection.delete").AssertData(t, nil)
c.GetEvent(t).Equals(t, "test.collection.delete", nil)
c.GetEvent(t).Equals(t, "test.collection.unsubscribe", mock.UnsubscribeReasonDeleted)
// Validate subsequent events are not sent to client
s.ResourceEvent("test.collection", "custom", common.CustomEvent())
c.AssertNoEvent(t, "test.collection")
Expand Down
6 changes: 4 additions & 2 deletions test/13query_event_test.go
Expand Up @@ -696,7 +696,8 @@ func TestQueryEvent_DeleteEventOnModel_DeletesFromCache(t *testing.T) {
// Respond to query request with an error
s.GetRequest(t).RespondSuccess(json.RawMessage(`{"events":[{"event":"delete"},{"event":"change","data":{"values":{"string":"bar","int":-12}}}]}`))
// Validate only delete event is sent to client
c.GetEvent(t).AssertEventName(t, "test.model?q=foo&f=bar.delete").AssertData(t, nil)
c.GetEvent(t).Equals(t, "test.model?q=foo&f=bar.delete", nil)
c.GetEvent(t).Equals(t, "test.model?q=foo&f=bar.unsubscribe", mock.UnsubscribeReasonDeleted)
c.AssertNoEvent(t, "test.model")
// Validate subsequent query events does not send request
s.ResourceEvent("test.model", "query", json.RawMessage(`{"subject":"_EVENT_02_"}`))
Expand All @@ -713,7 +714,8 @@ func TestQueryEvent_DeleteEventOnCollection_DeletesFromCache(t *testing.T) {
// Respond to query request with an error
s.GetRequest(t).RespondSuccess(json.RawMessage(`{"events":[{"event":"delete"},{"event":"add","data":{"idx":1,"value":"bar"}}]}`))
// Validate only delete event is sent to client
c.GetEvent(t).AssertEventName(t, "test.collection?q=foo&f=bar.delete").AssertData(t, nil)
c.GetEvent(t).Equals(t, "test.collection?q=foo&f=bar.delete", nil)
c.GetEvent(t).Equals(t, "test.collection?q=foo&f=bar.unsubscribe", mock.UnsubscribeReasonDeleted)
c.AssertNoEvent(t, "test.collection")
// Validate subsequent query events does not send request
s.ResourceEvent("test.collection", "query", json.RawMessage(`{"subject":"_EVENT_02_"}`))
Expand Down
10 changes: 7 additions & 3 deletions test/29delete_event_test.go
Expand Up @@ -13,8 +13,9 @@ func TestDeleteEvent_OnModel_SentToClient(t *testing.T) {
// Send delete event
s.ResourceEvent("test.model", "delete", nil)

// Validate the delete event is sent to client
// Validate the delete and unsubscribe event is sent to client
c.GetEvent(t).Equals(t, "test.model.delete", nil)
c.GetEvent(t).Equals(t, "test.model.unsubscribe", mock.UnsubscribeReasonDeleted)
})
}

Expand All @@ -28,6 +29,7 @@ func TestDeleteEvent_OnCollection_SentToClient(t *testing.T) {

// Validate the delete event is sent to client
c.GetEvent(t).Equals(t, "test.collection.delete", nil)
c.GetEvent(t).Equals(t, "test.collection.unsubscribe", mock.UnsubscribeReasonDeleted)
})
}

Expand All @@ -38,6 +40,7 @@ func TestDeleteEvent_AndCustomEventOnModel_CustomEventNotSentToClient(t *testing
// Send delete event
s.ResourceEvent("test.model", "delete", nil)
c.GetEvent(t).Equals(t, "test.model.delete", nil)
c.GetEvent(t).Equals(t, "test.model.unsubscribe", mock.UnsubscribeReasonDeleted)
// Send custom event on model and validate no event
s.ResourceEvent("test.model", "custom", common.CustomEvent())
c.AssertNoEvent(t, "test.model")
Expand All @@ -51,6 +54,7 @@ func TestDeleteEvent_AndCustomEventOnCollection_CustomEventNotSentToClient(t *te
// Send delete event
s.ResourceEvent("test.collection", "delete", nil)
c.GetEvent(t).Equals(t, "test.collection.delete", nil)
c.GetEvent(t).Equals(t, "test.collection.unsubscribe", mock.UnsubscribeReasonDeleted)
// Send custom event on collection and validate no event
s.ResourceEvent("test.collection", "custom", common.CustomEvent())
c.AssertNoEvent(t, "test.collection")
Expand Down Expand Up @@ -91,6 +95,7 @@ func TestDeleteEvent_FollowedBySubscribe_IsNotCached(t *testing.T) {
s.ResourceEvent("test.model", "delete", nil)
// Validate the delete event is sent to client
c1.GetEvent(t).Equals(t, "test.model.delete", nil)
c1.GetEvent(t).Equals(t, "test.model.unsubscribe", mock.UnsubscribeReasonDeleted)

// Subscribe with second client
subscribeToTestModel(t, s, c2)
Expand All @@ -111,12 +116,11 @@ func TestDeleteEvent_FollowedByResubscribe_IsNotCached(t *testing.T) {
s.ResourceEvent("test.model", "delete", nil)
// Validate the delete event is sent to client
c.GetEvent(t).Equals(t, "test.model.delete", nil)
c.GetEvent(t).Equals(t, "test.model.unsubscribe", mock.UnsubscribeReasonDeleted)
// Send custom event and assert event not sent to client
s.ResourceEvent("test.model", "custom", common.CustomEvent())
c.AssertNoEvent(t, "test.model")
// Resubscribe
creq := c.Request("unsubscribe.test.model", nil)
creq.GetResponse(t)
subscribeToTestModel(t, s, c)
// Send custom event and assert event is sent to client
s.ResourceEvent("test.model", "custom", common.CustomEvent())
Expand Down
10 changes: 10 additions & 0 deletions test/resources.go
Expand Up @@ -6,6 +6,16 @@ import (
"github.com/resgateio/resgate/server/reserr"
)

type mockData struct {
UnsubscribeReasonAccessDenied json.RawMessage
UnsubscribeReasonDeleted json.RawMessage
}

var mock = mockData{
UnsubscribeReasonAccessDenied: json.RawMessage(`{"reason":{"code":"system.accessDenied","message":"Access denied"}}`),
UnsubscribeReasonDeleted: json.RawMessage(`{"reason":{"code":"system.deleted","message":"Deleted"}}`),
}

// The following cyclic groups exist
// a -> a

Expand Down

0 comments on commit e46be59

Please sign in to comment.