Skip to content

Commit

Permalink
HTTP CloudEvent coercion (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
RaeesBhatti authored and mthenw committed May 24, 2018
1 parent 807d0e9 commit bb87dcb
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 304 deletions.
12 changes: 12 additions & 0 deletions event/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package event

import "fmt"

// ErrParsingCloudEvent occurs when payload is not valid CloudEvent.
type ErrParsingCloudEvent struct {
Message string
}

func (e ErrParsingCloudEvent) Error() string {
return fmt.Sprintf("CloudEvent doesn't validate: %s", e.Message)
}
205 changes: 153 additions & 52 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package event
import (
"encoding/json"
"errors"
"io/ioutil"
"mime"
"net/http"
"strings"
"time"
"unicode"

"go.uber.org/zap/zapcore"

uuid "github.com/satori/go.uuid"
"github.com/satori/go.uuid"
ihttp "github.com/serverless/event-gateway/internal/http"
"github.com/serverless/event-gateway/internal/zap"
validator "gopkg.in/go-playground/validator.v9"
"gopkg.in/go-playground/validator.v9"
)

// Type uniquely identifies an event type.
Expand All @@ -19,19 +24,15 @@ type Type string
const (
// TypeInvoke is a special type of event for sync function invocation.
TypeInvoke = Type("invoke")

// TypeHTTP is a special type of event for sync http subscriptions.
TypeHTTP = Type("http")
)

// TransformationVersion is indicative of the revision of how Event Gateway transforms a request into CloudEvents format.
const (
// TransformationVersion is indicative of the revision of how Event Gateway transforms a request into CloudEvents format.
TransformationVersion = "0.1"
)

const (
mimeJSON = "application/json"
mimeFormMultipart = "multipart/form-data"
mimeFormURLEncoded = "application/x-www-form-urlencoded"
// CloudEventsVersion currently supported by Event Gateway
CloudEventsVersion = "0.1"
)

// Event is a default event structure. All data that passes through the Event Gateway
Expand All @@ -50,44 +51,81 @@ type Event struct {
}

// New return new instance of Event.
func New(eventType Type, mime string, payload interface{}) *Event {
func New(eventType Type, mimeType string, payload interface{}) *Event {
event := &Event{
EventType: eventType,
CloudEventsVersion: "0.1",
CloudEventsVersion: CloudEventsVersion,
Source: "https://serverless.com/event-gateway/#transformationVersion=" + TransformationVersion,
EventID: uuid.NewV4().String(),
EventTime: time.Now(),
ContentType: mime,
ContentType: mimeType,
Data: payload,
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformation-version": TransformationVersion,
},
},
}

// it's a custom event, possibly CloudEvent
if eventType != TypeHTTP && eventType != TypeInvoke {
cloudEvent, err := parseAsCloudEvent(eventType, mime, payload)
if err == nil {
event = cloudEvent
} else {
event.Extensions = zap.MapStringInterface{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformation-version": TransformationVersion,
},
}
event.enhanceEventData()
return event
}

// FromRequest takes an HTTP request and returns an Event along with path. Most of the implementation
// is based on https://github.com/cloudevents/spec/blob/master/http-transport-binding.md.
// This function also supports legacy mode where event type is sent in Event header.
func FromRequest(r *http.Request) (*Event, error) {
contentType := r.Header.Get("Content-Type")
mimeType, _, err := mime.ParseMediaType(contentType)
if err != nil {
if err.Error() != "mime: no media type" {
return nil, err
}
mimeType = "application/octet-stream"
}
// Read request body
body := []byte{}
if r.Body != nil {
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
}

// Because event.Data is []bytes here, it will be base64 encoded by default when being sent to remote function,
// which is why we change the event.Data type to "string" for forms, so that, it is left intact.
if eventBody, ok := event.Data.([]byte); ok && len(eventBody) > 0 {
switch {
case isJSONContent(mime):
json.Unmarshal(eventBody, &event.Data)
case strings.HasPrefix(mime, mimeFormMultipart), mime == mimeFormURLEncoded:
event.Data = string(eventBody)
var event *Event
if mimeType == mimeCloudEventsJSON { // CloudEvents Structured Content Mode
return parseAsCloudEvent(mimeType, body)
} else if isCloudEventsBinaryContentMode(r.Header) { // CloudEvents Binary Content Mode
return parseAsCloudEventBinary(r.Header, body)
} else if isLegacyMode(r.Header) {
if mimeType == mimeJSON { // CloudEvent in Legacy Mode
event, err = parseAsCloudEvent(mimeType, body)
if err != nil {
return New(Type(r.Header.Get("event")), mimeType, body), nil
}
return event, err
}

return New(Type(r.Header.Get("event")), mimeType, body), nil
}

return event
return New(TypeHTTP, mimeCloudEventsJSON, NewHTTPRequestData(r, body)), nil
}

// Validate Event struct
func (e *Event) Validate() error {
validate := validator.New()
err := validate.Struct(e)
if err != nil {
return &ErrParsingCloudEvent{err.Error()}
}
return nil
}

// IsSystem indicates if the event is a system event.
func (e *Event) IsSystem() bool {
return strings.HasPrefix(string(e.EventType), "gateway.")
}

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface
Expand Down Expand Up @@ -116,40 +154,103 @@ func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error {
return nil
}

// IsSystem indicates if the event is a system event.
func (e Event) IsSystem() bool {
return strings.HasPrefix(string(e.EventType), "gateway.")
func isLegacyMode(headers http.Header) bool {
if headers.Get("Event") != "" {
return true
}

return false
}

func parseAsCloudEvent(eventType Type, mime string, payload interface{}) (*Event, error) {
if !isJSONContent(mime) {
return nil, errors.New("content type is not json")
func isCloudEventsBinaryContentMode(headers http.Header) bool {
if headers.Get("CE-EventType") != "" &&
headers.Get("CE-CloudEventsVersion") != "" &&
headers.Get("CE-Source") != "" &&
headers.Get("CE-EventID") != "" {
return true
}

return false
}

func parseAsCloudEventBinary(headers http.Header, payload interface{}) (*Event, error) {
event := &Event{
EventType: Type(headers.Get("CE-EventType")),
EventTypeVersion: headers.Get("CE-EventTypeVersion"),
CloudEventsVersion: headers.Get("CE-CloudEventsVersion"),
Source: headers.Get("CE-Source"),
EventID: headers.Get("CE-EventID"),
ContentType: headers.Get("Content-Type"),
Data: payload,
}

err := event.Validate()
if err != nil {
return nil, err
}

if val, err := time.Parse(time.RFC3339, headers.Get("CE-EventTime")); err == nil {
event.EventTime = val
}

if val := headers.Get("CE-SchemaURL"); len(val) > 0 {
event.SchemaURL = val
}

event.Extensions = map[string]interface{}{}
for key, val := range ihttp.FlattenHeader(headers) {
if strings.HasPrefix(key, "Ce-X-") {
key = strings.TrimLeft(key, "Ce-X-")
// Make first character lowercase
runes := []rune(key)
runes[0] = unicode.ToLower(runes[0])
event.Extensions[string(runes)] = val
}
}

event.enhanceEventData()
return event, nil
}

func parseAsCloudEvent(mime string, payload interface{}) (*Event, error) {
body, ok := payload.([]byte)
if ok {
validate := validator.New()

customEvent := &Event{}
err := json.Unmarshal(body, customEvent)
event := &Event{}
err := json.Unmarshal(body, event)
if err != nil {
return nil, err
}

err = validate.Struct(customEvent)
err = event.Validate()
if err != nil {
return nil, err
}

if eventType != customEvent.EventType {
return nil, errors.New("wrong event type")
}

return customEvent, nil
event.enhanceEventData()
return event, nil
}

return nil, errors.New("couldn't cast to []byte")
}

func isJSONContent(mime string) bool {
return (mime == mimeJSON || strings.HasSuffix(mime, "+json"))
const (
mimeJSON = "application/json"
mimeFormMultipart = "multipart/form-data"
mimeFormURLEncoded = "application/x-www-form-urlencoded"
mimeCloudEventsJSON = "application/cloudevents+json"
)

// Because event.Data is []byte, it will be base64 encoded by default when being sent to remote function,
// which is why we change the event.Data type to "string" for forms or to map[string]interface{} for JSON
// so that, it is left intact.
func (e *Event) enhanceEventData() {
contentType := e.ContentType
if eventBody, ok := e.Data.([]byte); ok && len(eventBody) > 0 {
switch {
case contentType == mimeJSON || strings.HasSuffix(contentType, "+json"):
json.Unmarshal(eventBody, &e.Data)
case strings.HasPrefix(contentType, mimeFormMultipart), contentType == mimeFormURLEncoded:
e.Data = string(eventBody)
}
}
}
Loading

0 comments on commit bb87dcb

Please sign in to comment.