Skip to content

Commit

Permalink
Merge pull request #18 from jirenius/feature/gh-17-query-and-group-su…
Browse files Browse the repository at this point in the history
…pport

Feature/gh 17 query and group support
  • Loading branch information
jirenius committed Mar 29, 2019
2 parents f0b516d + 8fafa12 commit 6645c02
Show file tree
Hide file tree
Showing 16 changed files with 1,008 additions and 238 deletions.
17 changes: 17 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,20 @@ type addEvent struct {
type removeEvent struct {
Idx int `json:"idx"`
}

type resQueryEvent struct {
Subject string `json:"subject"`
}

type resQueryRequest struct {
Query string `json:"query"`
}

type resEvent struct {
Event string `json:"event"`
Data interface{} `json:"data"`
}

type queryResponse struct {
Events []resEvent `json:"events"`
}
3 changes: 0 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
CodeInternalError = "system.internalError"
CodeInvalidParams = "system.invalidParams"
CodeMethodNotFound = "system.methodNotFound"
CodeNoSubscription = "system.noSubscription"
CodeNotFound = "system.notFound"
CodeTimeout = "system.timeout"
CodeBadRequest = "system.badRequest"
Expand All @@ -41,11 +40,9 @@ const (
// Predefined errors
var (
ErrAccessDenied = &Error{Code: CodeAccessDenied, Message: "Access denied"}
ErrDisposing = &Error{Code: CodeInternalError, Message: "Internal error: disposing connection"}
ErrInternalError = &Error{Code: CodeInternalError, Message: "Internal error"}
ErrInvalidParams = &Error{Code: CodeInvalidParams, Message: "Invalid parameters"}
ErrMethodNotFound = &Error{Code: CodeMethodNotFound, Message: "Method not found"}
ErrNoSubscription = &Error{Code: CodeNoSubscription, Message: "No subscription"}
ErrNotFound = &Error{Code: CodeNotFound, Message: "Not found"}
ErrTimeout = &Error{Code: CodeTimeout, Message: "Request timeout"}
)
202 changes: 202 additions & 0 deletions queryevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package res

import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

nats "github.com/nats-io/go-nats"
)

const queryEventChannelSize = 10

// QueryRequest has methods for responding to query requests.
type QueryRequest interface {
Resource
NotFound()
Error(err *Error)
Timeout(d time.Duration)
}

type queryRequest struct {
resource
msg *nats.Msg
events []resEvent
err *Error
replied bool // Flag telling if a reply has been made
}

type queryEvent struct {
r resource
sub *nats.Subscription
ch chan *nats.Msg
cb func(r QueryRequest)
}

// ChangeEvent adds a change event to the query response
// If ev is empty, no event is added.
func (qr *queryRequest) ChangeEvent(ev map[string]interface{}) {
if len(ev) == 0 {
return
}
qr.events = append(qr.events, resEvent{Event: "change", Data: changeEvent{Values: ev}})
}

// AddEvent adds an add event to the query response,
// adding the value v at index idx.
func (qr *queryRequest) AddEvent(v interface{}, idx int) {
if idx < 0 {
panic("res: add event idx less than zero")
}
qr.events = append(qr.events, resEvent{Event: "add", Data: addEvent{Value: v, Idx: idx}})
}

// RemoveEvent adds a remove event to the query response,
// removing the value at index idx.
func (qr *queryRequest) RemoveEvent(idx int) {
if idx < 0 {
panic("res: remove event idx less than zero")
}
qr.events = append(qr.events, resEvent{Event: "remove", Data: removeEvent{Idx: idx}})
}

// NotFound sends a system.notFound response for the query request.
func (qr *queryRequest) NotFound() {
qr.reply(responseNotFound)
}

// Error sends a custom error response for the query request.
func (qr *queryRequest) Error(err *Error) {
qr.error(err)
}

// Timeout attempts to set the timeout duration of the query request.
// The call has no effect if the requester has already timed out the request.
func (qr *queryRequest) Timeout(d time.Duration) {
if d < 0 {
panic("res: negative timeout duration")
}
out := []byte(`timeout:"` + strconv.FormatInt(d.Nanoseconds()/1000000, 10) + `"`)
qr.s.rawEvent(qr.msg.Reply, out)
}

// startQueryListener listens for query requests and passes them on to a worker.
func (qe *queryEvent) startQueryListener() {
for m := range qe.ch {
qe.r.s.runWith(qe.r.hs, qe.r.rname, func() {
qe.handleQueryRequest(m)
})
}
}

// handleRequest is called by the query listener on incoming query requests.
func (qe *queryEvent) handleQueryRequest(m *nats.Msg) {
s := qe.r.s
s.Tracef("Q=> %s: %s", qe.r.rname, m.Data)

qr := &queryRequest{
resource: qe.r,
msg: m,
}

var rqr resQueryRequest
err := json.Unmarshal(m.Data, &rqr)
if err != nil {
s.Logf("error unmarshaling incoming query request: %s", err)
qr.error(ToError(err))
return
}

if rqr.Query == "" {
s.Logf("missing query on incoming query request: %s", err)
qr.reply(responseMissingQuery)
return
}

qr.query = rqr.Query

qr.executeCallback(qe.cb)
if qr.replied {
return
}

var data []byte
if len(qr.events) == 0 {
data = responseNoQueryEvents
} else {
data, err = json.Marshal(successResponse{Result: queryResponse{Events: qr.events}})
if err != nil {
data = responseInternalError
}
}
qr.reply(data)
}

func (qr *queryRequest) executeCallback(cb func(QueryRequest)) {
// Recover from panics inside query event callback
defer func() {
v := recover()
if v == nil {
return
}

var str string

switch e := v.(type) {
case *Error:
if !qr.replied {
qr.error(e)
// Return without logging, as panicing with an *Error is considered
// a valid way of sending an error response.
return
}
str = e.Message
case error:
str = e.Error()
if !qr.replied {
qr.error(ToError(e))
}
case string:
str = e
if !qr.replied {
qr.error(ToError(errors.New(e)))
}
default:
str = fmt.Sprintf("%v", e)
if !qr.replied {
qr.error(ToError(errors.New(str)))
}
}

qr.s.Logf("error handling query request %s: %s", qr.rname, str)
}()

cb(qr)
}

// error sends an error response as a reply.
func (qr *queryRequest) error(e *Error) {
data, err := json.Marshal(errorResponse{Error: e})
if err != nil {
data = responseInternalError
}
qr.reply(data)
}

// reply sends an encoded payload to as a reply.
// If a reply is already sent, reply will log an error.
func (qr *queryRequest) reply(payload []byte) {
if qr.replied {
qr.s.Logf("response already sent on query request %s", qr.rname)
return
}
qr.replied = true

qr.s.Tracef("<=Q %s: %s", qr.rname, payload)
err := qr.s.nc.Publish(qr.msg.Reply, payload)
if err != nil {
qr.s.Logf("error sending query reply %s: %s", qr.rname, err)
}
}
2 changes: 2 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ var (
responseMethodNotFound = []byte(`{"error":{"code":"system.methodNotFound","message":"Method not found"}}`)
responseInvalidParams = []byte(`{"error":{"code":"system.invalidParams","message":"Invalid parameters"}}`)
responseMissingResponse = []byte(`{"error":{"code":"system.internalError","message":"Internal error: missing response"}}`)
responseMissingQuery = []byte(`{"error":{"code":"system.internalError","message":"Internal error: missing query"}}`)
responseAccessGranted = []byte(`{"result":{"get":true,"call":"*"}}`)
responseNoQueryEvents = []byte(`{"result":{"events":[]}}`)
)

// Predefined handlers
Expand Down
36 changes: 35 additions & 1 deletion resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package res

import (
"net/url"

nats "github.com/nats-io/go-nats"
)

// Resource represents a resource
Expand Down Expand Up @@ -70,6 +72,8 @@ type Resource interface {
// See the protocol specification for more information:
// https://github.com/jirenius/resgate/blob/master/docs/res-service-protocol.md#reaccess-event
ReaccessEvent()

QueryEvent(func(QueryRequest))
}

// resource is the internal implementation of the Resource interface
Expand Down Expand Up @@ -145,7 +149,7 @@ func isValidPart(p string) bool {

// Event sends a custom event on the resource.
// Will panic if the event is one of the pre-defined or reserved events,
// "change", "delete", "add", "remove", "patch", "reaccess", or "unsubscribe".
// "change", "delete", "add", "remove", "patch", "reaccess", "unsubscribe", or "query".
// For pre-defined events, the matching method, ChangeEvent, AddEvent,
// RemoveEvent, or ReaccessEvent should be used instead.
//
Expand All @@ -167,6 +171,8 @@ func (r *resource) Event(event string, payload interface{}) {
panic("res: use ReaccessEvent to send a reaccess event")
case "unsubscribe":
panic(`res: "unsubscribe" is a reserved event name`)
case "query":
panic(`res: "query" is a reserved event name`)
}

if !isValidPart(event) {
Expand Down Expand Up @@ -217,3 +223,31 @@ func (r *resource) RemoveEvent(idx int) {
func (r *resource) ReaccessEvent() {
r.s.rawEvent("event."+r.rname+".reaccess", nil)
}

// QueryEvent sends a query event on the resource, calling the
// provided callback on any query request.
// The last call to the callback will always be with nil, indicating
// that the query event duration has expired.
func (r *resource) QueryEvent(cb func(QueryRequest)) {
qsubj := nats.NewInbox()
ch := make(chan *nats.Msg, queryEventChannelSize)
sub, err := r.s.nc.ChanSubscribe(qsubj, ch)
if err != nil {
cb(nil)
r.s.Logf("Failed to subscribe to query event: %s", err)
return
}

qe := &queryEvent{
r: *r,
sub: sub,
ch: ch,
cb: cb,
}

r.s.event("event."+r.rname+".query", resQueryEvent{Subject: qsubj})

go qe.startQueryListener()

r.s.queryTQ.Add(qe)
}

0 comments on commit 6645c02

Please sign in to comment.