Skip to content

Commit

Permalink
add Path parameter support for async subscriptions (#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw authored Sep 26, 2017
1 parent 46b4e31 commit 311b2bf
Show file tree
Hide file tree
Showing 16 changed files with 366 additions and 251 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ script:
- gometalinter --install --force
- gometalinter --vendor --fast --disable=gotype --disable=vetshadow --skip=mock ./...
- go get github.com/mattn/goveralls
- goveralls -race -service=travis-ci
- goveralls -race -service=travis-ci -ignore=**/mock/*
after_success:
- test -n "$TRAVIS_TAG" && curl -sL https://git.io/goreleaser | bash
notifications:
Expand Down
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ teams, eliminates effort spent redeploying functions, and allows you to easily s
HTTP services, even different cloud providers. Functions may be registered as subscribers to a custom event.
When an event occurs, all subscribers are called asynchronously with the event as its argument.

Creating a subscription requires providing ID of registered function, an event type and a path (`/` by default). The
path property indicated URL path which Events API will be listening on.

#### Example: Subscribe to an Event

##### curl example
Expand All @@ -150,7 +153,8 @@ curl --request POST \
--header 'content-type: application/json' \
--data '{
"functionId": "sendEmail",
"event": "user.created"
"event": "user.created",
"path": "/myteam"
}'
```

Expand All @@ -160,10 +164,13 @@ curl --request POST \
const eventGateway = fdk.eventGateway({ url: 'http://localhost' })
eventGateway.subscribe({
event: "user.created",
functionId: "sendEmail"
functionId: "sendEmail",
path: "/myteam"
})
```

`sendEmail` function will be invoked for every `user.created` event to `<Events API>/myteam` endpoint.

#### Example: Emit an Event

##### curl example
Expand All @@ -188,8 +195,9 @@ eventGateway.emit({

#### Sync subscriptions via HTTP event

Custom event subscriptions are asynchronous. There is a special `http` event type for creating synchronous subscriptions.
`http` event is a HTTP request received on specified path and for specified HTTP method.
Custom event subscriptions are asynchronous. There is a special `http` event type for creating synchronous
subscriptions. `http` event is an HTTP request received to specified path and for specified HTTP method. There can be
only one `http` subscription for the same `method` and `path` pair.

#### Example: Subscribe to an "http" Event

Expand Down Expand Up @@ -219,6 +227,8 @@ eventGateway.subscribe({
})
```

`listUsers` function will be invoked for every HTTP GET request to `<Events API>/users` endpoint.

## Events API

The Event Gateway exposes an API for emitting events. Events API can be used for emitting custom event, HTTP events and
Expand Down Expand Up @@ -280,9 +290,11 @@ If you are looking for more system events, please comment [the corresponding iss

### Emit a Custom Event

Creating a subscription requires `path` property (by default it's "/"). `path` indicates path under which you can push.

**Endpoint**

`POST <Events API URL>/`
`POST <Events API URL>/<Subscription Path>`

**Request Headers**

Expand Down
162 changes: 0 additions & 162 deletions internal/cache/cache.go

This file was deleted.

44 changes: 44 additions & 0 deletions internal/cache/function_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package cache

import (
"bytes"
"encoding/json"
"sync"

"go.uber.org/zap"

"github.com/serverless/event-gateway/functions"
)

type functionCache struct {
sync.RWMutex
cache map[functions.FunctionID]*functions.Function
log *zap.Logger
}

func newFunctionCache(log *zap.Logger) *functionCache {
return &functionCache{
cache: map[functions.FunctionID]*functions.Function{},
log: log,
}
}

func (c *functionCache) Modified(k string, v []byte) {
c.log.Debug("Function local cache received value update.", zap.String("key", k), zap.String("value", string(v)))

f := &functions.Function{}
err := json.NewDecoder(bytes.NewReader(v)).Decode(f)
if err != nil {
c.log.Error("Could not deserialize Function state.", zap.Error(err), zap.String("key", k), zap.String("value", string(v)))
} else {
c.Lock()
defer c.Unlock()
c.cache[functions.FunctionID(k)] = f
}
}

func (c *functionCache) Deleted(k string, v []byte) {
c.Lock()
defer c.Unlock()
delete(c.cache, functions.FunctionID(k))
}
41 changes: 41 additions & 0 deletions internal/cache/function_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cache

import (
"testing"

"github.com/serverless/event-gateway/functions"
"github.com/stretchr/testify/assert"

"go.uber.org/zap"
)

func TestFunctionCacheModified(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())

fcache.Modified("testfunc1", []byte(`{"functionId":"testfunc1"}`))
fcache.Modified("testfunc2", []byte(`{"functionId":"testfunc2"}`))

id1 := functions.FunctionID("testfunc1")
id2 := functions.FunctionID("testfunc2")
assert.Equal(t, &functions.Function{ID: id1}, fcache.cache[id1])
assert.Equal(t, &functions.Function{ID: id2}, fcache.cache[id2])
}

func TestFunctionCacheModified_WrongPayload(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())

fcache.Modified("testfunc1", []byte(`not json`))

assert.Equal(t, map[functions.FunctionID]*functions.Function{}, fcache.cache)
}

func TestFunctionCacheModifiedDeleted(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())

fcache.Modified("testfunc1", []byte(`{"functionId":"testfunc1"}`))
fcache.Modified("testfunc2", []byte(`{"functionId":"testfunc2"}`))
fcache.Deleted("testfunc2", []byte(`{"functionId":"testfunc2"}`))

id1 := functions.FunctionID("testfunc1")
assert.Equal(t, map[functions.FunctionID]*functions.Function{id1: &functions.Function{ID: id1}}, fcache.cache)
}
Loading

0 comments on commit 311b2bf

Please sign in to comment.