Skip to content

Commit

Permalink
Publish system events on space prefixed path for hosted version (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed Jul 24, 2018
1 parent 8babc3c commit 8e58739
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 34 deletions.
7 changes: 6 additions & 1 deletion codecov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,9 @@ for d in $(go list ./... | grep -v vendor); do
cat profile.out >> coverage.txt
rm profile.out
fi
done
done

# include coverage for hosted EG
go test -race -coverprofile=profile.out -covermode=atomic -tags=hosted ./router
cat profile.out >> coverage.txt
rm profile.out
15 changes: 15 additions & 0 deletions router/path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// +build !hosted

package router

func extractPath(host, path string) string {
return path
}

func systemPathFromSpace(space string) string {
return basePath
}

func systemPathFromPath(path string) string {
return basePath
}
22 changes: 22 additions & 0 deletions router/path_hosted.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// +build hosted

package router

import (
"strings"
)

// extractPath extracts path from hosted EG host name (<space>.eventgateway([a-z-]*)?.io|slsgateway.com)
func extractPath(host, path string) string {
subdomain := strings.Split(host, ".")[0]
return basePath + subdomain + path
}

func systemPathFromSpace(space string) string {
return basePath + space + "/"
}

// systemPathFromPath constructs path from path on which event was emitted. Helpful for "event.received" system event.
func systemPathFromPath(path string) string {
return basePath + strings.Split(path, "/")[1] + "/"
}
25 changes: 6 additions & 19 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"encoding/json"
"errors"
"net/http"
"regexp"
"strings"
"sync"

"github.com/jinzhu/copier"
Expand All @@ -22,6 +20,7 @@ import (

const (
mimeJSON = "application/json"
basePath = "/"
)

// Router calls a target function when an endpoint is hit, and handles pubsub message delivery.
Expand Down Expand Up @@ -163,8 +162,6 @@ func (router *Router) Drain() {
router.Unlock()
}

const hostedDomain = "(eventgateway([a-z-]*)?.io|slsgateway.com)"

var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)
Expand Down Expand Up @@ -242,7 +239,7 @@ func (router *Router) httpRequestHandler(space string, backingFunction function.
// handleAsyncSubscriptions fetched events subscribers, runs authorization and enqueues event in the queue
func (router *Router) handleAsyncSubscriptions(method, path string, event eventpkg.Event, r *http.Request) {
if event.IsSystem() {
router.log.Debug("System event received.", zap.Object("event", event))
router.log.Debug("System event received.", zap.String("path", path), zap.Object("event", event))
}

subscribers := router.targetCache.AsyncSubscribers(method, path, event.EventType)
Expand Down Expand Up @@ -458,7 +455,7 @@ func (router *Router) emitSystemEventReceived(path string, event eventpkg.Event,
mimeJSON,
eventpkg.SystemEventReceivedData{Path: path, Event: event, Headers: ihttp.FlattenHeader(header)},
)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromPath(path), *system, nil)
return router.plugins.React(system)
}

Expand All @@ -468,7 +465,7 @@ func (router *Router) emitSystemFunctionInvoking(space string, functionID functi
mimeJSON,
eventpkg.SystemFunctionInvokingData{Space: space, FunctionID: functionID, Event: event},
)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromSpace(space), *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokingType)).Inc()

Expand All @@ -480,7 +477,7 @@ func (router *Router) emitSystemFunctionInvoked(space string, functionID functio
eventpkg.SystemFunctionInvokedType,
mimeJSON,
eventpkg.SystemFunctionInvokedData{Space: space, FunctionID: functionID, Event: event, Result: result})
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromSpace(space), *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokedType)).Inc()

Expand All @@ -493,7 +490,7 @@ func (router *Router) emitSystemFunctionInvocationFailed(space string, functionI
eventpkg.SystemFunctionInvocationFailedType,
mimeJSON,
eventpkg.SystemFunctionInvocationFailedData{Space: space, FunctionID: functionID, Event: event, Error: err})
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromSpace(space), *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvocationFailedType)).Inc()
}
Expand Down Expand Up @@ -532,16 +529,6 @@ func determineErrorMessage(err error) string {
return message
}

func extractPath(host, path string) string {
extracted := path
rxp, _ := regexp.Compile(hostedDomain)
if rxp.MatchString(host) {
subdomain := strings.Split(host, ".")[0]
extracted = "/" + subdomain + path
}
return extracted
}

type backlogEvent struct {
space string
functionID function.ID
Expand Down
56 changes: 56 additions & 0 deletions router/router_hosted_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// +build hosted

package router_test

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/golang/mock/gomock"
"github.com/serverless/event-gateway/event"
"github.com/serverless/event-gateway/plugin"
"github.com/serverless/event-gateway/router"
"github.com/serverless/event-gateway/router/mock"
"go.uber.org/zap"
)

func TestHostedRouterServeHTTP(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
target := mock.NewMockTargeter(ctrl)

t.Run("emit system event 'event received' on path prefixed with space", func(t *testing.T) {
target.EXPECT().CORS(gomock.Any(), gomock.Any()).Return(nil)
target.EXPECT().SyncSubscriber(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
target.EXPECT().AsyncSubscribers(gomock.Any(), gomock.Any(), event.TypeName("http.request")).Return([]router.AsyncSubscriber{})

target.EXPECT().AsyncSubscribers(http.MethodPost, "/custom/", event.SystemEventReceivedType).Return([]router.AsyncSubscriber{})

router := setupTestRouter(target)
req, _ := http.NewRequest(http.MethodGet, "https://custom.slsgateway.com/foo/bar", nil)
recorder := httptest.NewRecorder()
router.ServeHTTP(recorder, req)
})

t.Run("extract path from hosted domain", func(t *testing.T) {
target.EXPECT().CORS(gomock.Any(), gomock.Any()).Return(nil)
target.EXPECT().AsyncSubscribers(gomock.Any(), gomock.Any(), event.SystemEventReceivedType).Return([]router.AsyncSubscriber{})

target.EXPECT().SyncSubscriber(http.MethodGet, "/custom/test", event.TypeName("http.request")).Return(nil)
target.EXPECT().AsyncSubscribers(http.MethodGet, "/custom/test", event.TypeName("http.request")).Return([]router.AsyncSubscriber{})

router := setupTestRouter(target)
req, _ := http.NewRequest(http.MethodGet, "https://custom.slsgateway.com/test", nil)
recorder := httptest.NewRecorder()
router.ServeHTTP(recorder, req)
})
}

func setupTestRouter(target router.Targeter) *router.Router {
log := zap.NewNop()
plugins := plugin.NewManager([]string{}, log)
router := router.New(10, 10, target, plugins, log)
router.StartWorkers()
return router
}
16 changes: 2 additions & 14 deletions router/router_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !hosted

package router_test

import (
Expand Down Expand Up @@ -61,20 +63,6 @@ func TestRouterServeHTTP(t *testing.T) {
assert.Equal(t, "http://example.com", recorder.Header().Get("Access-Control-Allow-Origin"))
})

t.Run("extract path from hosted domain", func(t *testing.T) {
target.EXPECT().CORS(gomock.Any(), gomock.Any()).Return(nil)
target.EXPECT().SyncSubscriber(http.MethodGet, "/custom/test", event.TypeName("http.request")).Return(nil).MaxTimes(1)
target.EXPECT().AsyncSubscribers(http.MethodGet, "/custom/test", event.TypeName("http.request")).Return([]router.AsyncSubscriber{}).MaxTimes(1)
target.EXPECT().AsyncSubscribers(http.MethodPost, "/", event.SystemEventReceivedType).Return([]router.AsyncSubscriber{}).MaxTimes(1)
router := setupTestRouter(target)

req, _ := http.NewRequest(http.MethodGet, "https://custom.slsgateway.com/test", nil)
recorder := httptest.NewRecorder()
router.ServeHTTP(recorder, req)

assert.Equal(t, http.StatusAccepted, recorder.Code)
})

t.Run("reject if system event", func(t *testing.T) {
target.EXPECT().CORS(gomock.Any(), gomock.Any()).Return(nil)
router := setupTestRouter(target)
Expand Down

0 comments on commit 8e58739

Please sign in to comment.