diff --git a/README.md b/README.md index 14a923c..3ddfddb 100644 --- a/README.md +++ b/README.md @@ -307,6 +307,16 @@ arbitrary payload, subscribed function receives an event in above schema. `data` Status code: - `200 OK` with payload with function response +#### Respond to an HTTP Event + +To respond to an HTTP event a function needs to return object with following fields: + +- `statusCode` - `int` - response status code, default: 200 +- `headers` - `object` - response headers +- `body` - `string` - response body + +Currently, the event gateway supports only string responses. + ### Invoking a Registered Function - Sync Function Invocation **Endpoint** diff --git a/router/errors.go b/router/errors.go new file mode 100644 index 0000000..9567632 --- /dev/null +++ b/router/errors.go @@ -0,0 +1,20 @@ +package router + +import ( + "fmt" + "net/http" +) + +// ErrHTTPResponseObjectMalformed occurs when HTTP response object is not valid JSON. +type ErrHTTPResponseObjectMalformed struct { + StatusCode int +} + +func (e ErrHTTPResponseObjectMalformed) Error() string { + return fmt.Sprintf("HTTP response object returned by function malformed.") +} + +// NewErrHTTPResponseObjectMalformed return ErrHTTPResponseObjectMalformed +func NewErrHTTPResponseObjectMalformed() ErrHTTPResponseObjectMalformed { + return ErrHTTPResponseObjectMalformed{http.StatusInternalServerError} +} diff --git a/router/event.go b/router/event.go index 88e2024..b190ba8 100644 --- a/router/event.go +++ b/router/event.go @@ -39,6 +39,13 @@ type HTTPEvent struct { Method string `json:"method"` } +// HTTPResponse is a response schema returned by subscribed function in case of HTTP event. +type HTTPResponse struct { + StatusCode int `json:"statusCode"` + Headers map[string]string `json:"headers"` + Body string `json:"body"` +} + const ( mimeJSON = "application/json" mimeOctetStrem = "application/octet-stream" diff --git a/router/integration_test.go b/router/integration_test.go index abd3823..e008e22 100644 --- a/router/integration_test.go +++ b/router/integration_test.go @@ -34,7 +34,7 @@ func TestMain(t *testing.T) { etcd.Register() } -func TestIntegration_Subscription(t *testing.T) { +func TestIntegration_AsyncSubscription(t *testing.T) { logCfg := zap.NewDevelopmentConfig() logCfg.DisableStacktrace = true log, _ := logCfg.Build() @@ -99,25 +99,27 @@ func TestIntegration_Subscription(t *testing.T) { shutdownGuard.ShutdownAndWait() } -func TestIntegration_HTTPSubscription(t *testing.T) { +func TestIntegration_HTTPResponse(t *testing.T) { logCfg := zap.NewDevelopmentConfig() logCfg.DisableStacktrace = true log, _ := logCfg.Build() + kv, shutdownGuard := newTestEtcd() testAPIServer := newConfigAPIServer(kv, log) defer testAPIServer.Close() + router, testRouterServer := newTestRouterServer(kv, log) defer testRouterServer.Close() testTargetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "😸") + fmt.Fprintf(w, `{"statusCode":201,"headers":{"content-type":"text/html"},"body":""}`) })) defer testTargetServer.Close() post(testAPIServer.URL+"/v1/functions", functions.Function{ - ID: functions.FunctionID("supersmileyfunction"), + ID: functions.FunctionID("httpresponse"), Provider: &functions.Provider{ Type: functions.HTTPEndpoint, URL: testTargetServer.URL, @@ -125,20 +127,22 @@ func TestIntegration_HTTPSubscription(t *testing.T) { }) post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{ - FunctionID: functions.FunctionID("supersmileyfunction"), + FunctionID: functions.FunctionID("httpresponse"), Event: "http", Method: "GET", - Path: "/smilez", + Path: "/httpresponse", }) select { - case <-router.WaitForEndpoint(subscriptions.NewEndpointID("GET", "/smilez")): + case <-router.WaitForEndpoint(subscriptions.NewEndpointID("GET", "/httpresponse")): case <-time.After(10 * time.Second): panic("timed out waiting for endpoint to be configured!") } - _, _, body := get(testRouterServer.URL + "/smilez") - assert.Equal(t, "😸", body) + statusCode, headers, body := get(testRouterServer.URL + "/httpresponse") + assert.Equal(t, statusCode, 201) + assert.Equal(t, headers.Get("content-type"), "text/html") + assert.Equal(t, body, "") router.Drain() shutdownGuard.ShutdownAndWait() diff --git a/router/router.go b/router/router.go index eede758..3eab9fc 100644 --- a/router/router.go +++ b/router/router.go @@ -207,6 +207,25 @@ func (router *Router) handleSyncEvent(name string, payload []byte, w http.Respon zap.String("functionId", string(functionID)), zap.String("event", string(payload)), zap.String("response", string(resp))) + if name == eventHTTP { + httpResponse := &HTTPResponse{StatusCode: http.StatusOK} + err = json.Unmarshal(resp, httpResponse) + if err != nil { + httperr := NewErrHTTPResponseObjectMalformed() + http.Error(w, httperr.Error(), httperr.StatusCode) + + router.log.Info(httperr.Error(), zap.String("response", string(resp))) + + return + } + + for key, value := range httpResponse.Headers { + w.Header().Set(key, value) + } + w.WriteHeader(httpResponse.StatusCode) + resp = []byte(httpResponse.Body) + } + _, err = w.Write(resp) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError)