Skip to content

Commit

Permalink
Merge fed577d into 0b333c8
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Jun 10, 2020
2 parents 0b333c8 + fed577d commit 4b45ccb
Show file tree
Hide file tree
Showing 18 changed files with 460 additions and 70 deletions.
1 change: 1 addition & 0 deletions docs/res-client-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ Code | Message | Meaning
`system.noSubscription` | No subscription | The resource has no direct subscription
`system.invalidRequest` | Invalid request | Invalid request
`system.unsupportedProtocol` | Unsupported protocol | RES protocol version is not supported
`system.unsupportedFeature` | Unsupported feature | Feature requires a client supporting a higher RES protocol version


# Requests
Expand Down
9 changes: 9 additions & 0 deletions server/apiEncoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ func (e *encoderJSON) encodeSubscription(s *Subscription, wrap bool) error {
}
}
e.b.WriteByte('}')

case rescache.TypeStatic:
if wrap {
e.b.Write([]byte(`,"static":`))
}
e.b.Write(s.Static())
}

// Remove itself from path
Expand Down Expand Up @@ -379,6 +385,9 @@ func (e *encoderJSONFlat) encodeSubscription(s *Subscription) error {
}
}
e.b.WriteByte('}')

case rescache.TypeStatic:
e.b.Write(s.Static())
}

// Remove itself from path
Expand Down
9 changes: 7 additions & 2 deletions server/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type GetResponse struct {
type GetResult struct {
Model map[string]Value `json:"model"`
Collection []Value `json:"collection"`
Static json.RawMessage `json:"static"`
Query string `json:"query"`
}

Expand Down Expand Up @@ -115,6 +116,7 @@ type EventQueryResult struct {
Events []*EventQueryEvent `json:"events"`
Model map[string]Value `json:"model"`
Collection []Value `json:"collection"`
Static json.RawMessage `json:"static"`
}

// EventQueryEvent represents an event in the response of a RES-server query request
Expand Down Expand Up @@ -329,7 +331,7 @@ func DecodeGetResponse(payload []byte) (*GetResult, error) {
// Assert we got either a model or a collection
res := r.Result
if res.Model != nil {
if res.Collection != nil {
if res.Collection != nil || res.Static != nil {
return nil, errInvalidResponse
}
// Assert model only has proper values
Expand All @@ -339,13 +341,16 @@ func DecodeGetResponse(payload []byte) (*GetResult, error) {
}
}
} else if res.Collection != nil {
if res.Static != nil {
return nil, errInvalidResponse
}
// Assert collection only has proper values
for _, v := range res.Collection {
if !v.IsProper() {
return nil, errInvalidResponse
}
}
} else {
} else if res.Static == nil {
return nil, errInvalidResponse
}

Expand Down
12 changes: 10 additions & 2 deletions server/rescache/eventSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ResourceType byte
const (
TypeCollection ResourceType = ResourceType(stateCollection)
TypeModel ResourceType = ResourceType(stateModel)
TypeStatic ResourceType = ResourceType(stateStatic)
TypeError ResourceType = ResourceType(stateError)
)

Expand Down Expand Up @@ -102,7 +103,7 @@ func (e *EventSubscription) addSubscriber(sub Subscriber) {
defer e.mu.Lock()
sub.Loaded(nil, rs.err)

// stateModel or stateCollection
// stateModel, stateCollection, or stateStatic
default:
e.mu.Unlock()
defer e.mu.Lock()
Expand Down Expand Up @@ -307,10 +308,17 @@ func (e *EventSubscription) handleQueryEvent(subj string, payload []byte) {
// Handle collection response
case result.Collection != nil:
if rs.state != stateCollection {
e.cache.Errorf("Error processing query event for %s?%s: non-model payload on model %s", e.ResourceName, rs.query, data)
e.cache.Errorf("Error processing query event for %s?%s: non-collection payload on collection %s", e.ResourceName, rs.query, data)
return
}
rs.processResetCollection(result.Collection)
// Handle static response
case result.Static != nil:
if rs.state != stateStatic {
e.cache.Errorf("Error processing query event for %s?%s: non-static payload on static %s", e.ResourceName, rs.query, data)
return
}
rs.processResetStatic(result.Static)
}
})
})
Expand Down
41 changes: 32 additions & 9 deletions server/rescache/resourceSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
stateRequested
stateCollection
stateModel
stateStatic
)

// Model represents a RES model
Expand Down Expand Up @@ -67,6 +68,7 @@ type ResourceSubscription struct {
// Three types of values stored
model *Model
collection *Collection
static json.RawMessage
err error
}

Expand Down Expand Up @@ -108,6 +110,14 @@ func (rs *ResourceSubscription) GetModel() *Model {
return rs.model
}

// GetStatic will lock the EventSubscription for any changes
// and return the static json.
// The lock must be released by calling Release
func (rs *ResourceSubscription) GetStatic() json.RawMessage {
rs.e.mu.Lock()
return rs.static
}

// Release releases the lock obtained by calling GetCollection or GetModel
func (rs *ResourceSubscription) Release() {
rs.e.mu.Unlock()
Expand Down Expand Up @@ -164,8 +174,8 @@ func (rs *ResourceSubscription) handleEvent(r *ResourceEvent) {
}

func (rs *ResourceSubscription) handleEventChange(r *ResourceEvent) bool {
if rs.state == stateCollection {
rs.e.cache.Errorf("Error processing event %s.%s: change event on collection", rs.e.ResourceName, r.Event)
if rs.state != stateModel {
rs.e.cache.Errorf("Error processing event %s.%s: change event on non-model", rs.e.ResourceName, r.Event)
return false
}

Expand Down Expand Up @@ -220,8 +230,8 @@ func (rs *ResourceSubscription) handleEventChange(r *ResourceEvent) bool {
}

func (rs *ResourceSubscription) handleEventAdd(r *ResourceEvent) bool {
if rs.state == stateModel {
rs.e.cache.Errorf("Error processing event %s.%s: add event on model", rs.e.ResourceName, r.Event)
if rs.state != stateCollection {
rs.e.cache.Errorf("Error processing event %s.%s: add event on non-collection", rs.e.ResourceName, r.Event)
return false
}

Expand Down Expand Up @@ -255,8 +265,8 @@ func (rs *ResourceSubscription) handleEventAdd(r *ResourceEvent) bool {
}

func (rs *ResourceSubscription) handleEventRemove(r *ResourceEvent) bool {
if rs.state == stateModel {
rs.e.cache.Errorf("Error processing event %s.%s: remove event on model", rs.e.ResourceName, r.Event)
if rs.state != stateCollection {
rs.e.cache.Errorf("Error processing event %s.%s: remove event on non-collection", rs.e.ResourceName, r.Event)
return false
}

Expand Down Expand Up @@ -413,12 +423,19 @@ func (rs *ResourceSubscription) processGetResponse(payload []byte, err error) (n
return
}

if result.Model != nil {
switch {
case result.Model != nil:
// Model
nrs.model = &Model{Values: result.Model}
nrs.state = stateModel
} else {
case result.Collection != nil:
// Collection
nrs.collection = &Collection{Values: result.Collection}
nrs.state = stateCollection
default:
// Static
nrs.static = result.Static
nrs.state = stateStatic
}
return
}
Expand Down Expand Up @@ -454,7 +471,7 @@ func (rs *ResourceSubscription) processResetGetResponse(payload []byte, err erro
// or an error in the service's response
if err == nil {
result, err = codec.DecodeGetResponse(payload)
if err == nil && ((rs.state == stateModel && result.Model == nil) || (rs.state == stateCollection && result.Collection == nil)) {
if err == nil && ((rs.state == stateModel && result.Model == nil) || (rs.state == stateCollection && result.Collection == nil) || (rs.state == stateStatic && result.Static == nil)) {
err = errors.New("mismatching resource type")
}
}
Expand All @@ -477,6 +494,8 @@ func (rs *ResourceSubscription) processResetGetResponse(payload []byte, err erro
rs.processResetModel(result.Model)
case stateCollection:
rs.processResetCollection(result.Collection)
case stateStatic:
rs.processResetStatic(result.Static)
}
}

Expand Down Expand Up @@ -517,6 +536,10 @@ func (rs *ResourceSubscription) processResetCollection(collection []codec.Value)
}
}

func (rs *ResourceSubscription) processResetStatic(static json.RawMessage) {
rs.static = static
}

func lcs(a, b []codec.Value) []*ResourceEvent {
var i, j int
// Do a LCS matric calculation
Expand Down
2 changes: 2 additions & 0 deletions server/reserr/reserr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
CodeTimeout = "system.timeout"
CodeInvalidRequest = "system.invalidRequest"
CodeUnsupportedProtocol = "system.unsupportedProtocol"
CodeUnsupportedFeature = "system.unsupportedFeature"
// HTTP only error codes
CodeBadRequest = "system.badRequest"
CodeMethodNotAllowed = "system.methodNotAllowed"
Expand All @@ -68,6 +69,7 @@ var (
ErrTimeout = &Error{Code: CodeTimeout, Message: "Request timeout"}
ErrInvalidRequest = &Error{Code: CodeInvalidRequest, Message: "Invalid request"}
ErrUnsupportedProtocol = &Error{Code: CodeUnsupportedProtocol, Message: "Unsupported protocol"}
ErrUnsupportedFeature = &Error{Code: CodeUnsupportedFeature, Message: "Unsupported feature"}
// HTTP only errors
ErrBadRequest = &Error{Code: CodeBadRequest, Message: "Bad request"}
ErrMethodNotAllowed = &Error{Code: CodeMethodNotAllowed, Message: "Method not allowed"}
Expand Down
1 change: 1 addition & 0 deletions server/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ErrorResponse struct {
type Resources struct {
Models map[string]interface{} `json:"models,omitempty"`
Collections map[string]interface{} `json:"collections,omitempty"`
Statics map[string]interface{} `json:"statics,omitempty"`
Errors map[string]*reserr.Error `json:"errors,omitempty"`
}

Expand Down
41 changes: 39 additions & 2 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Subscription struct {
typ rescache.ResourceType
model *rescache.Model
collection *rescache.Collection
static json.RawMessage
refs map[string]*reference
err error
queueFlag uint8
Expand Down Expand Up @@ -158,18 +159,23 @@ func (s *Subscription) Error() error {
return s.err
}

// ModelValues returns the subscriptions model values.
// ModelValues returns the subscription's model values.
// Panics if the subscription is not a loaded model.
func (s *Subscription) ModelValues() map[string]codec.Value {
return s.model.Values
}

// CollectionValues returns the subscriptions collection values.
// CollectionValues returns the subscription's collection values.
// Panics if the subscription is not a loaded collection.
func (s *Subscription) CollectionValues() []codec.Value {
return s.collection.Values
}

// Static returns the subscription's static data.
func (s *Subscription) Static() json.RawMessage {
return s.static
}

// Ref returns the referenced subscription, or nil if subscription has no such reference.
func (s *Subscription) Ref(rid string) *Subscription {
r := s.refs[rid]
Expand Down Expand Up @@ -201,6 +207,7 @@ func (s *Subscription) Loaded(resourceSub *rescache.ResourceSubscription, err er

s.setResource()
if s.err != nil {
resourceSub.Unsubscribe(s)
s.doneLoading()
return
}
Expand All @@ -225,6 +232,12 @@ func (s *Subscription) setResource() {
s.setCollection()
case rescache.TypeModel:
s.setModel()
case rescache.TypeStatic:
if s.c.ProtocolVersion() <= versionStaticResource {
s.err = reserr.ErrUnsupportedFeature
} else {
s.setStatic()
}
default:
err := fmt.Errorf("subscription %s: unknown resource type", s.rid)
s.c.Errorf("Error loading %s", err)
Expand Down Expand Up @@ -354,6 +367,13 @@ func (s *Subscription) populateResources(r *rpc.Resources) {
r.Models = make(map[string]interface{})
}
r.Models[s.rid] = s.model

case rescache.TypeStatic:
// Create Statics map if needed
if r.Statics == nil {
r.Statics = make(map[string]interface{})
}
r.Statics[s.rid] = s.static
}

s.state = stateToSend
Expand Down Expand Up @@ -396,6 +416,13 @@ func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) {
r.Models = make(map[string]interface{})
}
r.Models[s.rid] = (*rescache.Legacy120Model)(s.model)

case rescache.TypeStatic:
// Create Statics map if needed
if r.Statics == nil {
r.Statics = make(map[string]interface{})
}
r.Statics[s.rid] = s.static
}

s.state = stateToSend
Expand Down Expand Up @@ -431,6 +458,14 @@ func (s *Subscription) setCollection() {
s.collection = c
}

// setStatic gets the static json from the resourceSubscription and stores it.
func (s *Subscription) setStatic() {
st := s.resourceSub.GetStatic()
s.queueEvents(queueReasonLoading)
s.resourceSub.Release()
s.static = st
}

// subscribeRef subscribes to any resource reference value
// and adds it to s.refs.
// If an error is encountered, all subscriptions in s.refs will
Expand Down Expand Up @@ -562,6 +597,8 @@ func (s *Subscription) processEvent(event *rescache.ResourceEvent) {
s.processCollectionEvent(event)
case rescache.TypeModel:
s.processModelEvent(event)
case rescache.TypeStatic:
s.c.Send(rpc.NewEvent(s.rid, event.Event, event.Payload))
default:
s.c.Errorf("Subscription %s: Unknown resource type: %d", s.rid, s.resourceSub.GetResourceType())
}
Expand Down
1 change: 1 addition & 0 deletions server/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package server
const (
versionCallResourceResponse = 1001001
versionSoftResourceReference = 1002000
versionStaticResource = 1002000
)
Loading

0 comments on commit 4b45ccb

Please sign in to comment.