Skip to content

Commit

Permalink
Send token and CID on all requests
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelpereira committed May 16, 2020
1 parent 2d5c9ca commit 2aa2e38
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -16,3 +16,4 @@

# Editor specifics
.vscode
.idea
6 changes: 3 additions & 3 deletions go.mod
@@ -1,13 +1,13 @@
module github.com/resgateio/resgate

go 1.13
go 1.14

require (
github.com/gorilla/websocket v1.4.2
github.com/jirenius/timerqueue v1.0.0
github.com/nats-io/nats-server/v2 v2.1.4 // indirect
github.com/nats-io/nats.go v1.9.1
github.com/nats-io/nats.go v1.10.0
github.com/posener/wstest v1.2.0
github.com/rs/xid v1.2.1
golang.org/x/crypto v0.0.0-20200317142112-1b76d66859c6 // indirect
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Expand Up @@ -15,10 +15,14 @@ github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8p
github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -35,6 +39,10 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49N
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200317142112-1b76d66859c6 h1:TjszyFsQsyZNHwdVdZ5m7bjmreu0znc2kRYsEml9/Ww=
golang.org/x/crypto v0.0.0-20200317142112-1b76d66859c6/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 h1:cg5LA/zNPRzIXIWSCxQW10Rvpy94aQh3LT/ShoCpkHw=
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
10 changes: 2 additions & 8 deletions server/codec/codec.go
Expand Up @@ -50,12 +50,6 @@ type AccessResult struct {
Call string `json:"call"`
}

// GetRequest represents a RES-service get request
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#get-request
type GetRequest struct {
Query string `json:"query,omitempty"`
}

// GetResponse represents the response of a RES-service get request
type GetResponse struct {
Result *GetResult `json:"result"`
Expand Down Expand Up @@ -275,11 +269,11 @@ func CreateRequest(params interface{}, r Requester, query string, token interfac
}

// CreateGetRequest creates a JSON encoded RES-service get request
func CreateGetRequest(query string) []byte {
func CreateGetRequest(query string, token interface{}, cid string) []byte {
if query == "" {
return noQueryGetRequest
}
out, _ := json.Marshal(GetRequest{Query: query})
out, _ := json.Marshal(Request{Query: query, Token: token, CID: cid})
return out
}

Expand Down
14 changes: 7 additions & 7 deletions server/rescache/eventSubscription.go
Expand Up @@ -39,17 +39,17 @@ type EventSubscription struct {
locks []func()
}

func (e *EventSubscription) getResourceSubscription(q string) (rs *ResourceSubscription) {
func (e *EventSubscription) getResourceSubscription(q string, t interface{}, c string) (rs *ResourceSubscription) {
if q == "" {
rs = e.base
if rs == nil {
rs = newResourceSubscription(e, "")
rs = newResourceSubscription(e, "", t, c)
e.base = rs
}
} else {
if e.queries == nil {
e.queries = make(map[string]*ResourceSubscription)
rs = newResourceSubscription(e, q)
rs = newResourceSubscription(e, q, t, c)
e.queries[q] = rs
} else {
rs = e.queries[q]
Expand All @@ -58,19 +58,19 @@ func (e *EventSubscription) getResourceSubscription(q string) (rs *ResourceSubsc
}

if rs == nil {
rs = newResourceSubscription(e, q)
rs = newResourceSubscription(e, q, t, c)
e.queries[q] = rs
}
}
}
return
}

func (e *EventSubscription) addSubscriber(sub Subscriber) {
func (e *EventSubscription) addSubscriber(sub Subscriber, token interface{}) {
e.Enqueue(func() {
var rs *ResourceSubscription
q := sub.ResourceQuery()
rs = e.getResourceSubscription(q)
rs = e.getResourceSubscription(q, token, sub.CID())

if rs.state != stateError {
rs.subs[sub] = struct{}{}
Expand All @@ -84,7 +84,7 @@ func (e *EventSubscription) addSubscriber(sub Subscriber) {
rs.state = stateRequested
// Create request
subj := "get." + e.ResourceName
payload := codec.CreateGetRequest(q)
payload := codec.CreateGetRequest(q, token, sub.CID())
e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.enqueueGetResponse(data, err)
})
Expand Down
4 changes: 2 additions & 2 deletions server/rescache/rescache.go
Expand Up @@ -113,14 +113,14 @@ func (c *Cache) Errorf(format string, v ...interface{}) {

// Subscribe fetches a resource from the cache, and if it is
// not cached, starts subscribing to the resource and sends a get request
func (c *Cache) Subscribe(sub Subscriber) {
func (c *Cache) Subscribe(sub Subscriber, token interface{}) {
eventSub, err := c.getSubscription(sub.ResourceName(), true)
if err != nil {
sub.Loaded(nil, err)
return
}

eventSub.addSubscriber(sub)
eventSub.addSubscriber(sub, token)
}

// Access sends an access request
Expand Down
10 changes: 7 additions & 3 deletions server/rescache/resourceSubscription.go
Expand Up @@ -63,16 +63,20 @@ type ResourceSubscription struct {
subs map[Subscriber]struct{}
resetting bool
links []string
cid string
token interface{}
// Three types of values stored
model *Model
collection *Collection
err error
}

func newResourceSubscription(e *EventSubscription, query string) *ResourceSubscription {
func newResourceSubscription(e *EventSubscription, query string, token interface{}, cid string) *ResourceSubscription {
return &ResourceSubscription{
e: e,
query: query,
token: token,
cid: cid,
subs: make(map[Subscriber]struct{}),
}
}
Expand Down Expand Up @@ -372,7 +376,7 @@ func (rs *ResourceSubscription) processGetResponse(payload []byte, err error) (n
// one requested by the Subscriber?
// Then we should create a link to the normalized query
if result.Query != rs.query {
nrs = rs.e.getResourceSubscription(result.Query)
nrs = rs.e.getResourceSubscription(result.Query, rs.token, rs.cid)
if rs.query == "" {
rs.e.base = nrs
} else {
Expand Down Expand Up @@ -432,7 +436,7 @@ func (rs *ResourceSubscription) handleResetResource() {

// Create request
subj := "get." + rs.e.ResourceName
payload := codec.CreateGetRequest(rs.query)
payload := codec.CreateGetRequest(rs.query, rs.token, rs.cid)
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.e.Enqueue(func() {
rs.resetting = false
Expand Down
2 changes: 1 addition & 1 deletion server/wsConn.go
Expand Up @@ -481,7 +481,7 @@ func (c *wsConn) subscribe(rid string, direct bool) (*Subscription, error) {

sub = NewSubscription(c, rid)
_ = c.addCount(sub, direct)
c.serv.cache.Subscribe(sub)
c.serv.cache.Subscribe(sub, c.token)

c.subs[rid] = sub
return sub, nil
Expand Down

0 comments on commit 2aa2e38

Please sign in to comment.