Skip to content

Commit

Permalink
add subscription support for invoke event (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed Jan 30, 2018
1 parent 34492df commit b970e42
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 31 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ the data block is base64 encoded.

#### Invoke Event

`invoke` is a built-in type of event allowing to call functions synchronously.
`invoke` is a built-in event type allowing synchronous invocations. Function will react to this event only if there is a
subscription created beforehand.

### Emit a Custom Event

Expand Down
77 changes: 55 additions & 22 deletions internal/cache/subscription_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ type subscriptionCache struct {
eventToFunctions map[string]map[event.Type][]functions.FunctionID
// endpoints maps HTTP method to internal/pathtree. Tree struct which is used for resolving HTTP requests paths.
endpoints map[string]*pathtree.Node
// invokable stores functions that have invoke subscription
invokable map[string]map[functions.FunctionID]struct{}
log *zap.Logger
}

func newSubscriptionCache(log *zap.Logger) *subscriptionCache {
return &subscriptionCache{
eventToFunctions: map[string]map[event.Type][]functions.FunctionID{},
endpoints: map[string]*pathtree.Node{},
invokable: map[string]map[functions.FunctionID]struct{}{},
log: log,
}
}
Expand All @@ -50,6 +53,15 @@ func (c *subscriptionCache) Modified(k string, v []byte) {
if err != nil {
c.log.Error("Could not add path to the tree.", zap.Error(err), zap.String("path", s.Path), zap.String("method", s.Method))
}
} else if s.Event == event.TypeInvoke {
fnSet, exists := c.invokable[s.Path]
if exists {
fnSet[s.FunctionID] = struct{}{}
} else {
fnSet := map[functions.FunctionID]struct{}{}
fnSet[s.FunctionID] = struct{}{}
c.invokable[s.Path] = fnSet
}
} else {
c.createPath(s.Path)
ids, exists := c.eventToFunctions[s.Path][s.Event]
Expand All @@ -75,29 +87,11 @@ func (c *subscriptionCache) Deleted(k string, v []byte) {
}

if oldSub.Event == event.TypeHTTP {
root := c.endpoints[oldSub.Method]
if root == nil {
return
}
err := root.DeleteRoute(oldSub.Path)
if err != nil {
c.log.Error("Could not delete path from the tree.", zap.Error(err), zap.String("path", oldSub.Path), zap.String("method", oldSub.Method))
}
c.deleteEndpoint(oldSub)
} else if oldSub.Event == event.TypeInvoke {
c.deleteInvokable(oldSub)
} else {
ids, exists := c.eventToFunctions[oldSub.Path][oldSub.Event]
if exists {
for i, id := range ids {
if id == oldSub.FunctionID {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
c.eventToFunctions[oldSub.Path][oldSub.Event] = ids

if len(ids) == 0 {
delete(c.eventToFunctions[oldSub.Path], oldSub.Event)
}
}
c.deleteSubscription(oldSub)
}
}

Expand All @@ -107,3 +101,42 @@ func (c *subscriptionCache) createPath(path string) {
c.eventToFunctions[path] = map[event.Type][]functions.FunctionID{}
}
}

func (c *subscriptionCache) deleteEndpoint(sub subscriptions.Subscription) {
root := c.endpoints[sub.Method]
if root == nil {
return
}
err := root.DeleteRoute(sub.Path)
if err != nil {
c.log.Error("Could not delete path from the tree.", zap.Error(err), zap.String("path", sub.Path), zap.String("method", sub.Method))
}
}

func (c *subscriptionCache) deleteInvokable(sub subscriptions.Subscription) {
fnSet, exists := c.invokable[sub.Path]
if exists {
delete(fnSet, sub.FunctionID)

if len(fnSet) == 0 {
delete(c.invokable, sub.Path)
}
}
}

func (c *subscriptionCache) deleteSubscription(sub subscriptions.Subscription) {
ids, exists := c.eventToFunctions[sub.Path][sub.Event]
if exists {
for i, id := range ids {
if id == sub.FunctionID {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
c.eventToFunctions[sub.Path][sub.Event] = ids

if len(ids) == 0 {
delete(c.eventToFunctions[sub.Path], sub.Event)
}
}
}
32 changes: 27 additions & 5 deletions internal/cache/subscription_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
)

func TestSubscriptionCacheModified(t *testing.T) {
func TestSubscriptionCacheModifiedEvents(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "test.event", "functionId": "testfunc1", "path": "/"}`))
Expand Down Expand Up @@ -51,15 +51,15 @@ func TestSubscriptionCacheModifiedCORSConfiguration(t *testing.T) {
assert.Equal(t, &cors.CORS{Origins: []string{"http://example.com"}}, corsConfig)
}

func TestSubscriptionCacheModified_WrongPayload(t *testing.T) {
func TestSubscriptionCacheModifiedEventsWrongPayload(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub", []byte(`not json`))

assert.Equal(t, []functions.FunctionID(nil), scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedDeleted(t *testing.T) {
func TestSubscriptionCacheModifiedEventsDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "test.event", "functionId": "testfunc1", "path": "/"}`))
Expand All @@ -69,7 +69,7 @@ func TestSubscriptionCacheModifiedDeleted(t *testing.T) {
assert.Equal(t, []functions.FunctionID{functions.FunctionID("testfunc2")}, scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedDeletedHTTPSubscription(t *testing.T) {
func TestSubscriptionCacheModifiedHTTPSubscriptionDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "http", "functionId": "testfunc1", "path": "/", "method": "GET"}`))
Expand All @@ -79,11 +79,33 @@ func TestSubscriptionCacheModifiedDeletedHTTPSubscription(t *testing.T) {
assert.Nil(t, id)
}

func TestSubscriptionCacheModifiedDeletedLast(t *testing.T) {
func TestSubscriptionCacheModifiedEventsDeletedLast(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub", []byte(`{"subscriptionId":"testsub", "event": "test.event", "functionId": "testfunc", "path": "/"}`))
scache.Deleted("testsub", []byte(`{"subscriptionId":"testsub", "event": "test.event", "functionId": "testfunc", "path": "/"}`))

assert.Equal(t, []functions.FunctionID(nil), scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedInvokable(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))
scache.Modified("testsub2", []byte(`{"subscriptionId":"testsub2", "event": "invoke", "functionId": "testfunc2", "path": "/"}`))

_, exists := scache.invokable["/"][functions.FunctionID("testfunc1")]
assert.Equal(t, true, exists)
_, exists = scache.invokable["/"][functions.FunctionID("testfunc2")]
assert.Equal(t, true, exists)
}

func TestSubscriptionCacheModifiedInvokableDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))
scache.Deleted("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))

_, exists := scache.invokable["/"][functions.FunctionID("testfunc1")]
assert.Equal(t, false, exists)
}
9 changes: 9 additions & 0 deletions internal/cache/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func (tc *Target) HTTPBackingFunction(method, path string) (*functions.FunctionI
return root.Resolve(path)
}

// InvokableFunction returns function ID for handling invoke sync event.
func (tc *Target) InvokableFunction(path string, functionID functions.FunctionID) bool {
tc.subscriptionCache.RLock()
defer tc.subscriptionCache.RUnlock()

_, exists := tc.subscriptionCache.invokable[path][functionID]
return exists
}

// Function takes a function ID and returns a deserialized instance of that function, if it exists
func (tc *Target) Function(functionID functions.FunctionID) *functions.Function {
tc.functionCache.RLock()
Expand Down
10 changes: 10 additions & 0 deletions router/mock/targetcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ func (_mr *_MockTargeterRecorder) HTTPBackingFunction(arg0, arg1 interface{}) *g
return _mr.mock.ctrl.RecordCall(_mr.mock, "HTTPBackingFunction", arg0, arg1)
}

func (_m *MockTargeter) InvokableFunction(_param0 string, _param1 functions.FunctionID) bool {
ret := _m.ctrl.Call(_m, "InvokableFunction", _param0, _param1)
ret0, _ := ret[0].(bool)
return ret0
}

func (_mr *_MockTargeterRecorder) InvokableFunction(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "InvokableFunction", arg0, arg1)
}

func (_m *MockTargeter) SubscribersOfEvent(_param0 string, _param1 event.Type) []functions.FunctionID {
ret := _m.ctrl.Call(_m, "SubscribersOfEvent", _param0, _param1)
ret0, _ := ret[0].([]functions.FunctionID)
Expand Down
12 changes: 9 additions & 3 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if event.Type == eventpkg.TypeInvoke {
routerEventsSyncReceived.Inc()
router.handleInvokeEvent(event, w, r)
router.handleInvokeEvent(path, event, w, r)
} else if !event.IsSystem() {
router.enqueueWork(path, event)
w.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -306,8 +305,15 @@ func (router *Router) handleHTTPEvent(event *eventpkg.Event, w http.ResponseWrit
}
}

func (router *Router) handleInvokeEvent(event *eventpkg.Event, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleInvokeEvent(path string, event *eventpkg.Event, w http.ResponseWriter, r *http.Request) {
routerEventsSyncReceived.Inc()

functionID := functions.FunctionID(r.Header.Get(headerFunctionID))
if !router.targetCache.InvokableFunction(path, functionID) {
http.Error(w, "function or subscription not found", http.StatusNotFound)
return
}

resp, err := router.callFunction(functionID, *event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
1 change: 1 addition & 0 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestRouterServeHTTP_InvokeEventFunctionNotFound(t *testing.T) {
defer ctrl.Finish()
target := mock.NewMockTargeter(ctrl)
target.EXPECT().Function(functions.FunctionID("testfunc")).Return(nil).MaxTimes(1)
target.EXPECT().InvokableFunction("/", functions.FunctionID("testfunc")).Return(true).MaxTimes(1)
target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]functions.FunctionID{}).MaxTimes(1)
router := testrouter(target)

Expand Down
1 change: 1 addition & 0 deletions router/targeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
// Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions.
type Targeter interface {
HTTPBackingFunction(method, path string) (*functions.FunctionID, pathtree.Params, *cors.CORS)
InvokableFunction(path string, functionID functions.FunctionID) bool
Function(functionID functions.FunctionID) *functions.Function
SubscribersOfEvent(path string, eventType event.Type) []functions.FunctionID
}

0 comments on commit b970e42

Please sign in to comment.