Skip to content

Commit

Permalink
Merge 45204d4 into 41fa084
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Oct 29, 2019
2 parents 41fa084 + 45204d4 commit 08c317a
Show file tree
Hide file tree
Showing 17 changed files with 482 additions and 58 deletions.
24 changes: 24 additions & 0 deletions docs/CHANGELOG.md
@@ -0,0 +1,24 @@
# Changelog for RES Protocol

All changes to the RES Protocol will be documented in this file.

## [Unreleased]

* #127 Resource response on query request.
* #83 Added delete resource event.
* #132 Added create resource event.

## v1.1.1 - [Resgate v1.3.0](compare/v1.2.2...v1.3.0) - 2019-10-02

* #110 Allow query on non-query requests.
* #111 Added *Invalid Query* error.
* #113 Added [RES Protocol Semantic Versioning](blob/v1.3.0/docs/res-protocol-semver.md).

## v1.1.0 - [Resgate v1.2.1](compare/v1.2.0...v1.2.1) - 2019-08-05
See [v1.1 update page](docs/res-protocol-v1.1-update.md) for more info.

* #68 Props field on change event.

## v1.0.0 - [Resgate v1.0.0](tree/v1.0.0) - 2018-09-22

* Initial release.
12 changes: 11 additions & 1 deletion docs/res-client-protocol.md
Expand Up @@ -460,7 +460,8 @@ Zero-based index number of the value being removed.

## Custom event

Custom events are defined by the services, and may have any event name except `change`, `add`, `remove`, `unsubscribe` and `reaccess`.
Custom events are defined by the services, and may have any event name except the following:
`add`, `change`, `create`, `delete`, `patch`, `reset`, `reaccess`, `remove` or `unsubscribe`.
Custom events MUST NOT be used to change the state of the resource.

**event**
Expand Down Expand Up @@ -499,3 +500,12 @@ The unsubscribe event object has the following parameter:
}
}
```

## Delete event

Delete events are sent to the client when the service considers the resource deleted.
The resource is still to be considered subscribed, but the client will not receive any more events on the resource.
The event has no payload.

**event**
`<resourceID>.delete`
22 changes: 20 additions & 2 deletions docs/res-service-protocol.md
Expand Up @@ -427,7 +427,25 @@ MUST be a number that is zero or greater and less than the length of the collect
**Subject**
`event.<resourceName>.reaccess`

Reaccess events are sent when a resource's access permissions has changed. It will invalidate any previous access response received for the resource.
Reaccess events are sent when a resource's access permissions has changed.
It will invalidate any previous access response received for the resource.
The event has no payload.

## Create event

**Subject**
`event.<resourceName>.create`

Create events are sent when the resource is created.
The event has no payload.

## Delete event

**Subject**
`event.<resourceName>.delete`

Delete events are sent when the resource is considered deleted.
It will invalidate any previous get response received for the resource.
The event has no payload.

## Custom event
Expand All @@ -437,7 +455,7 @@ The event has no payload.

Custom events are used to send information that does not affect the state of the resource.
The event name is case-sensitive and MUST be a non-empty alphanumeric string with no embedded whitespace. It MUST NOT be any of the following reserved event names:
`change`, `delete`, `add`, `remove`, `patch`, `reaccess` or `unsubscribe`.
`add`, `change`, `create`, `delete`, `patch`, `reset`, `reaccess`, `remove` or `unsubscribe`.


Payload is defined by the service, and will be passed to the client without alteration.
Expand Down
16 changes: 5 additions & 11 deletions server/rescache/deprecated.go
Expand Up @@ -2,7 +2,6 @@ package rescache

import (
"strings"
"sync"
)

type featureType int
Expand All @@ -12,11 +11,6 @@ const (
deprecatedModelChangeEvent featureType = 1 << iota
)

var (
depMutex sync.Mutex
depLogged = make(map[string]featureType)
)

// deprecated logs a deprecated error for each unique service name and feature
func (c *Cache) deprecated(rid string, typ featureType) {
// Get service name
Expand All @@ -26,10 +20,10 @@ func (c *Cache) deprecated(rid string, typ featureType) {
name = rid[:idx]
}

depMutex.Lock()
defer depMutex.Unlock()
c.depMutex.Lock()
defer c.depMutex.Unlock()

s := depLogged[name]
s := c.depLogged[name]
if (s & typ) != 0 {
// Already logged
return
Expand All @@ -44,6 +38,6 @@ func (c *Cache) deprecated(rid string, typ featureType) {
return
}

depLogged[name] = s | typ
c.Errorf("Deprecated warning for service [%s] - %s", name, msg)
c.depLogged[name] = s | typ
c.Errorf("Deprecation warning for service [%s] - %s", name, msg)
}
18 changes: 15 additions & 3 deletions server/rescache/eventSubscription.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/resgateio/resgate/server/codec"
"github.com/resgateio/resgate/server/mq"
"github.com/resgateio/resgate/server/reserr"
)

// ResourceType is an enum representing a resource type
Expand All @@ -23,15 +24,17 @@ type EventSubscription struct {
ResourceName string
cache *Cache

// Protected by cache mutex
mqSub mq.Unsubscriber
count int64

// Protected by single goroutine
mqSub mq.Unsubscriber
base *ResourceSubscription
queries map[string]*ResourceSubscription
links map[string]*ResourceSubscription

// Mutex protected
mu sync.Mutex
count int64
queue []func()
locks []func()
}
Expand Down Expand Up @@ -201,6 +204,8 @@ func (e *EventSubscription) addCount() {
e.count++
}

// removeCount decreases the subscription count, and puts the event subscription
// in the unsubscribe queue if count reaches zero.
func (e *EventSubscription) removeCount(n int64) {
e.count -= n
if e.count == 0 {
Expand Down Expand Up @@ -275,7 +280,14 @@ func (e *EventSubscription) handleQueryEvent(subj string, payload []byte) {

result, err := codec.DecodeEventQueryResponse(data)
if err != nil {
e.cache.Errorf("Error processing query event for %s?%s: malformed payload %s", e.ResourceName, rs.query, data)
// In case of a system.notFound error,
// a delete event is generated. Otherwise we
// just log the error.
if reserr.IsError(err, reserr.CodeNotFound) {
rs.handleEvent(&ResourceEvent{Event: "delete"})
} else {
e.cache.Errorf("Error processing query event for %s?%s: %s", e.ResourceName, rs.query, err)
}
return
}

Expand Down
13 changes: 9 additions & 4 deletions server/rescache/rescache.go
Expand Up @@ -21,12 +21,16 @@ type Cache struct {
workers int
unsubscribeDelay time.Duration

mu sync.Mutex
started bool
eventSubs map[string]*EventSubscription
inCh chan *EventSubscription
unsubQueue *timerqueue.Queue
resetSub mq.Unsubscriber
mu sync.Mutex

// Deprecated behavior logging
depMutex sync.Mutex
depLogged map[string]featureType
}

// Subscriber interface represents a subscription made on a client connection
Expand Down Expand Up @@ -56,6 +60,7 @@ func NewCache(mq mq.Client, workers int, unsubscribeDelay time.Duration, l logge
logger: l,
workers: workers,
unsubscribeDelay: unsubscribeDelay,
depLogged: make(map[string]featureType),
}
}

Expand Down Expand Up @@ -250,15 +255,15 @@ func (c *Cache) mqUnsubscribe(v interface{}) {
}

func (c *Cache) handleSystemReset(payload []byte) {
c.mu.Lock()
defer c.mu.Unlock()

r, err := codec.DecodeSystemReset(payload)
if err != nil {
c.Errorf("Error decoding system reset: %s", err)
return
}

c.mu.Lock()
defer c.mu.Unlock()

c.forEachMatch(r.Resources, func(e *EventSubscription) {
e.handleResetResource()
})
Expand Down
33 changes: 32 additions & 1 deletion server/rescache/resourceSubscription.go
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"

"github.com/resgateio/resgate/server/codec"
"github.com/resgateio/resgate/server/reserr"
)

type subscriptionState byte
Expand Down Expand Up @@ -118,6 +119,7 @@ func (rs *ResourceSubscription) Unsubscribe(sub Subscriber) {
delete(rs.subs, sub)
}

// Directly unregister unsubscribed queries
if rs.query != "" && len(rs.subs) == 0 {
rs.unregister()
}
Expand Down Expand Up @@ -146,6 +148,11 @@ func (rs *ResourceSubscription) handleEvent(r *ResourceEvent) {
if rs.resetting || !rs.handleEventRemove(r) {
return
}
case "delete":
if !rs.resetting {
rs.handleEventDelete(r)
}
return
}

rs.e.mu.Unlock()
Expand Down Expand Up @@ -279,6 +286,20 @@ func (rs *ResourceSubscription) handleEventRemove(r *ResourceEvent) bool {
return true
}

func (rs *ResourceSubscription) handleEventDelete(r *ResourceEvent) {
subs := rs.subs
c := int64(len(subs))
rs.subs = nil
rs.unregister()
rs.e.removeCount(c)

rs.e.mu.Unlock()
for sub := range subs {
sub.Event(r)
}
rs.e.mu.Lock()
}

func (rs *ResourceSubscription) enqueueGetResponse(data []byte, err error) {
rs.e.Enqueue(func() {
rs, sublist := rs.processGetResponse(data, err)
Expand Down Expand Up @@ -436,7 +457,17 @@ func (rs *ResourceSubscription) processResetGetResponse(payload []byte, err erro

// Get request failed
if err != nil {
rs.e.cache.Errorf("Subscription %s: Reset get error - %s", rs.e.ResourceName, err)
// In case of a system.notFound error,
// a delete event is generated. Otherwise we
// just log the error.
if reserr.IsError(err, reserr.CodeNotFound) {
r := &ResourceEvent{
Event: "delete",
}
rs.handleEvent(r)
} else {
rs.e.cache.Errorf("Subscription %s: Reset get error - %s", rs.e.ResourceName, err)
}
return
}

Expand Down
9 changes: 9 additions & 0 deletions server/reserr/reserr.go
Expand Up @@ -25,6 +25,15 @@ func InternalError(err error) *Error {
return &Error{Code: CodeInternalError, Message: "Internal error: " + err.Error()}
}

// IsError returns true if the error is an Error with the given error code.
func IsError(err error, code string) bool {
rerr, ok := err.(*Error)
if !ok {
return false
}
return rerr.Code == code
}

// Pre-defined RES error codes
const (
CodeAccessDenied = "system.accessDenied"
Expand Down
13 changes: 11 additions & 2 deletions server/subscription.go
Expand Up @@ -75,6 +75,7 @@ const (
stateReady
stateToSend
stateSent
stateDeleted
)

const (
Expand Down Expand Up @@ -570,6 +571,9 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) {
}
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))

case "delete":
s.state = stateDeleted
fallthrough
default:
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
}
Expand Down Expand Up @@ -640,7 +644,9 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
s.unqueueEvents(queueReasonLoading)
})
}

case "delete":
s.state = stateDeleted
fallthrough
default:
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
}
Expand Down Expand Up @@ -683,13 +689,16 @@ func (s *Subscription) Dispose() {
return
}

state := s.state
s.state = stateDisposed
s.readyCallbacks = nil
s.eventQueue = nil

if s.resourceSub != nil {
s.unsubscribeRefs()
s.resourceSub.Unsubscribe(s)
if state != stateDeleted {
s.resourceSub.Unsubscribe(s)
}
s.resourceSub = nil
}
}
Expand Down

0 comments on commit 08c317a

Please sign in to comment.