Skip to content

Commit

Permalink
remove topics HTTP API. Closes #120 (#130)
Browse files Browse the repository at this point in the history
* remove publisher concept

* remove async

* add endpoint for emitting events

* remove API docs

* remove topics. Closes #120

* revert functions change

* fix tests
  • Loading branch information
mthenw authored and spacejam committed Jul 18, 2017
1 parent e201970 commit 806651f
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 371 deletions.
72 changes: 6 additions & 66 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,38 +553,14 @@ Response:

### Pub/Sub

#### Create topic
#### Create subscription

`POST /v0/gateway/api/topic`
`POST /v0/gateway/api/subscriptions`

Request:

- `topicId` - `string` - name of topic

Response:

- `topicId` - `string` - name of topic

#### Delete topic

`DELETE /v0/gateway/api/topic/<topic id>`

#### Get topics

`GET /v0/gateway/api/topic`

Response:

- `topics` - `array` of `object` - topics
- `topicId` - `string` - topic name

#### Add subscription

`POST /v0/gateway/api/topic/<topic id>/subscription`

Request:

- `functionId` - ID of function or function group to receive events from the topic
- `topicId` - `string` - ID of topic
- `functionId` - `string` - ID of function or function group to receive events

Response:

Expand All @@ -594,11 +570,11 @@ Response:

#### Delete subscription

`DELETE /v0/gateway/api/topic/<topic id>/subscription/<subscription id>`
`DELETE /v0/gateway/api/subscriptions/<subscription id>`

#### Get subscriptions

`GET /v0/gateway/api/topic/<topic id>/subscription`
`GET /v0/gateway/api/subscriptions`

Response:

Expand All @@ -607,42 +583,6 @@ Response:
- `topicId` - `string` - ID of topic
- `functionId` - ID of function or function group

#### Add publisher

`POST /v0/gateway/api/topic/<topic id>/publisher`

Request:

- `functionId` - ID of function or function group to publish events to the topic
- `type` - either `input` or `output`

Response:

- `publisherId` - `string` - publisher ID, which is topic + function ID, e.g. `newusers-/userCreateGroup`
- `functionId` - ID of function or function group to publish events to the topic
- `type` - either `input` or `output`

#### Delete publisher

`DELETE /v0/gateway/api/topic/<topic id>/publisher/<publisher id>`

#### Get Publishers

`GET /v0/gateway/api/topic/<topic id>/publisher`

Response:

- `publishers` - `array` of `object` - backing functions
- `publisherId` - `string` - publisher ID
- `functionId` - ID of function or function group
- `type` - either `input` or `output`

#### Publish message to the topic

`POST /v0/gateway/api/topic/<topic id>/publish`

Request: arbitrary payload

## Plugins

Plugins are available for extending the behavior of the Event Gateway core. Examples include authentication, integration with external identity management systems,
Expand Down
13 changes: 5 additions & 8 deletions integration_tests/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,18 @@ func TestFunctionPubSub(t *testing.T) {
})

// set up pub/sub
topicName := "smileys"
post(testAPIServer.URL+"/v0/gateway/api/topic",
pubsub.Topic{
ID: pubsub.TopicID(topicName),
})
eventName := "smileys"

post(testAPIServer.URL+"/v0/gateway/api/topic/"+topicName+"/subscription",
post(testAPIServer.URL+"/v0/gateway/api/subscriptions",
pubsub.Subscription{
FunctionID: subscriberFnID,
TopicID: pubsub.TopicID(eventName),
})

wait5Seconds(router.WaitForSubscriber(pubsub.TopicID(topicName)),
wait5Seconds(router.WaitForSubscriber(pubsub.TopicID(eventName)),
"timed out waiting for subscriber to be configured!")

emit(testRouterServer.URL, topicName, []byte(expected))
emit(testRouterServer.URL, eventName, []byte(expected))

wait5Seconds(smileyReceived,
"timed out waiting to receive pub/sub event in subscriber!")
Expand Down
69 changes: 5 additions & 64 deletions pubsub/httpapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,66 +15,9 @@ type HTTPAPI struct {

// RegisterRoutes register HTTP API routes
func (h HTTPAPI) RegisterRoutes(router *httprouter.Router) {
router.POST("/v0/gateway/api/topic", h.createTopic)
router.DELETE("/v0/gateway/api/topic/:topicID", h.deleteTopic)
router.GET("/v0/gateway/api/topic", h.getTopics)

router.POST("/v0/gateway/api/topic/:topicID/subscription", h.createSubscription)
router.DELETE("/v0/gateway/api/topic/:topicID/subscription/:subscriptionID", h.deleteSubscription)
router.GET("/v0/gateway/api/topic/:topicID/subscription", h.getSubscriptions)
}

func (h HTTPAPI) createTopic(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)

t := &Topic{}
dec := json.NewDecoder(r.Body)
dec.Decode(t)

output, err := h.PubSub.CreateTopic(t)
if err != nil {
if _, ok := err.(*ErrorAlreadyExists); ok {
w.WriteHeader(http.StatusBadRequest)
} else if _, ok := err.(*ErrorValidation); ok {
w.WriteHeader(http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusInternalServerError)
}

encoder.Encode(&httpapi.Error{Error: err.Error()})
} else {
encoder.Encode(output)
}
}

func (h HTTPAPI) deleteTopic(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)

err := h.PubSub.DeleteTopic(TopicID(params.ByName("topicID")))
if err != nil {
if _, ok := err.(*ErrorNotFound); ok {
w.WriteHeader(http.StatusNotFound)
} else {
w.WriteHeader(http.StatusInternalServerError)
}

encoder.Encode(&httpapi.Error{Error: err.Error()})
}
}

func (h HTTPAPI) getTopics(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)

tps, err := h.PubSub.GetAllTopics()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
encoder.Encode(&httpapi.Error{Error: err.Error()})
} else {
encoder.Encode(&topics{tps})
}
router.POST("/v0/gateway/api/subscriptions", h.createSubscription)
router.DELETE("/v0/gateway/api/subscriptions/:subscriptionID", h.deleteSubscription)
router.GET("/v0/gateway/api/subscriptions", h.getSubscriptions)
}

func (h HTTPAPI) createSubscription(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
Expand All @@ -85,12 +28,10 @@ func (h HTTPAPI) createSubscription(w http.ResponseWriter, r *http.Request, para
dec := json.NewDecoder(r.Body)
dec.Decode(s)

output, err := h.PubSub.CreateSubscription(TopicID(params.ByName("topicID")), s)
output, err := h.PubSub.CreateSubscription(s)
if err != nil {
if _, ok := err.(*ErrorSubscriptionAlreadyExists); ok {
w.WriteHeader(http.StatusBadRequest)
} else if _, ok := err.(*ErrorNotFound); ok {
w.WriteHeader(http.StatusBadRequest)
} else if _, ok := err.(*ErrorFunctionNotFound); ok {
w.WriteHeader(http.StatusBadRequest)
} else if _, ok := err.(*ErrorSubscriptionValidation); ok {
Expand Down Expand Up @@ -124,7 +65,7 @@ func (h HTTPAPI) getSubscriptions(w http.ResponseWriter, r *http.Request, params
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)

subs, err := h.PubSub.GetAllSubscriptions(TopicID(params.ByName("topicID")))
subs, err := h.PubSub.GetAllSubscriptions()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
encoder.Encode(&httpapi.Error{Error: err.Error()})
Expand Down
Loading

0 comments on commit 806651f

Please sign in to comment.