Skip to content

Commit

Permalink
Event types (#433)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed May 28, 2018
1 parent e4360c6 commit 68bcaf6
Show file tree
Hide file tree
Showing 33 changed files with 1,277 additions and 364 deletions.
3 changes: 2 additions & 1 deletion cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func main() {

// Implementation of function and subscription services
service := &eventgateway.Service{
EventTypeStore: intstore.NewPrefixed("/serverless-event-gateway/eventtypes", kvstore),
FunctionStore: intstore.NewPrefixed("/serverless-event-gateway/functions", kvstore),
SubscriptionStore: intstore.NewPrefixed("/serverless-event-gateway/subscriptions", kvstore),
Log: log,
Expand All @@ -113,7 +114,7 @@ func main() {
ShutdownGuard: shutdownGuard,
})

httpapi.StartConfigAPI(service, service, httpapi.ServerConfig{
httpapi.StartConfigAPI(service, service, service, httpapi.ServerConfig{
TLSCrt: configTLSCrt,
TLSKey: configTLSKey,
Port: *configPort,
Expand Down
11 changes: 11 additions & 0 deletions docs/prometheus-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@ Both Events and Configuration API exposes Prometheus metrics. The metrics are ac
| `gateway_events_backlog` | Gauge of asynchronous events count waiting to be processed. | Gauge | |
| `gateway_events_custom_processing_seconds` | Bucketed histogram of processing duration of an event. From receiving the asynchronous custom event to calling a function. | Histogram | |

### Labels

- `space` - space name
- `type` - event type name

## Configuration API Metrics

| Metric Name | Description | Type | Labels |
| ----------------------------------------- | ------------------------------------------------------------ | --------- | --------------------------------- |
| `gateway_eventtypes_total` | Gauge of registered event types count. | Gauge | `space` |
| `gateway_functions_total` | Gauge of registered functions count. | Gauge | `space` |
| `gateway_subscriptions_total` | Gauge of created subscriptions count. | Gauge | `space` |
| `gateway_config_requests_total` | Total of Config API requests. | Counter | `space`, `resource`, `operation` |
| `gateway_config_request_duration_seconds` | Bucketed histogram of request duration of Config API requests. | Histogram | |
### Labels

- `space` - space name
- `resource` - Configuration API resource, possible values: `eventtype`, `function` or `subscription`
- `operation` - Configuration API operation, possible values: `create`, `get`, `delete`, `list`, `update`
34 changes: 34 additions & 0 deletions event/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,40 @@ package event

import "fmt"

// ErrEventTypeNotFound occurs when event type cannot be found.
type ErrEventTypeNotFound struct {
Name TypeName
}

func (e ErrEventTypeNotFound) Error() string {
return fmt.Sprintf("Event Type %q not found.", e.Name)
}

// ErrEventTypeAlreadyExists occurs when event type with specified name already exists.
type ErrEventTypeAlreadyExists struct {
Name TypeName
}

func (e ErrEventTypeAlreadyExists) Error() string {
return fmt.Sprintf("Event Type %q already exists.", e.Name)
}

// ErrEventTypeValidation occurs when event type payload doesn't validate.
type ErrEventTypeValidation struct {
Message string
}

func (e ErrEventTypeValidation) Error() string {
return fmt.Sprintf("Event Type doesn't validate. Validation error: %s", e.Message)
}

// ErrEventTypeHasSubscriptionsError occurs when there are subscription for the event type.
type ErrEventTypeHasSubscriptionsError struct{}

func (e ErrEventTypeHasSubscriptionsError) Error() string {
return fmt.Sprintf("Event type cannot be deleted because there are subscriptions using it.")
}

// ErrParsingCloudEvent occurs when payload is not valid CloudEvent.
type ErrParsingCloudEvent struct {
Message string
Expand Down
16 changes: 5 additions & 11 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@ import (
"gopkg.in/go-playground/validator.v9"
)

// Type uniquely identifies an event type.
type Type string

const (
// TypeHTTPRequest is a special type of event for sync http subscriptions.
TypeHTTPRequest = Type("http.request")

// TransformationVersion is indicative of the revision of how Event Gateway transforms a request into CloudEvents format.
TransformationVersion = "0.1"

Expand All @@ -35,7 +29,7 @@ const (
// Event is a default event structure. All data that passes through the Event Gateway
// is formatted to a format defined CloudEvents v0.1 spec.
type Event struct {
EventType Type `json:"eventType" validate:"required"`
EventType TypeName `json:"eventType" validate:"required"`
EventTypeVersion string `json:"eventTypeVersion,omitempty"`
CloudEventsVersion string `json:"cloudEventsVersion" validate:"required"`
Source string `json:"source" validate:"uri,required"`
Expand All @@ -48,7 +42,7 @@ type Event struct {
}

// New return new instance of Event.
func New(eventType Type, mimeType string, payload interface{}) *Event {
func New(eventType TypeName, mimeType string, payload interface{}) *Event {
event := &Event{
EventType: eventType,
CloudEventsVersion: CloudEventsVersion,
Expand Down Expand Up @@ -99,12 +93,12 @@ func FromRequest(r *http.Request) (*Event, error) {
if mimeType == mimeJSON { // CloudEvent in Legacy Mode
event, err = parseAsCloudEvent(mimeType, body)
if err != nil {
return New(Type(r.Header.Get("event")), mimeType, body), nil
return New(TypeName(r.Header.Get("event")), mimeType, body), nil
}
return event, err
}

return New(Type(r.Header.Get("event")), mimeType, body), nil
return New(TypeName(r.Header.Get("event")), mimeType, body), nil
}

return New(TypeHTTPRequest, mimeCloudEventsJSON, NewHTTPRequestData(r, body)), nil
Expand Down Expand Up @@ -172,7 +166,7 @@ func isCloudEventsBinaryContentMode(headers http.Header) bool {

func parseAsCloudEventBinary(headers http.Header, payload interface{}) (*Event, error) {
event := &Event{
EventType: Type(headers.Get("CE-EventType")),
EventType: TypeName(headers.Get("CE-EventType")),
EventTypeVersion: headers.Get("CE-EventTypeVersion"),
CloudEventsVersion: headers.Get("CE-CloudEventsVersion"),
Source: headers.Get("CE-Source"),
Expand Down
22 changes: 11 additions & 11 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestNew(t *testing.T) {

func TestNew_Encoding(t *testing.T) {
for _, testCase := range encodingTests {
result := eventpkg.New(eventpkg.Type("test.event"), testCase.contentType, testCase.body)
result := eventpkg.New(eventpkg.TypeName("test.event"), testCase.contentType, testCase.body)

assert.Equal(t, testCase.expectedBody, result.Data)
}
Expand Down Expand Up @@ -62,18 +62,18 @@ func TestFromRequest(t *testing.T) {

var newTests = []struct {
name string
eventType eventpkg.Type
eventType eventpkg.TypeName
mime string
payload interface{}
expectedEvent eventpkg.Event
}{
{
name: "not CloudEvent",
eventType: eventpkg.Type("user.created"),
eventType: eventpkg.TypeName("user.created"),
mime: "application/json",
payload: []byte("test"),
expectedEvent: eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/json",
Expand All @@ -88,11 +88,11 @@ var newTests = []struct {
},
{
name: "system event",
eventType: eventpkg.Type("user.created"),
eventType: eventpkg.TypeName("user.created"),
mime: "application/json",
payload: eventpkg.SystemEventReceivedData{},
expectedEvent: eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/json",
Expand Down Expand Up @@ -158,7 +158,7 @@ var fromRequestTests = []struct {
"data": "test"
}`),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "http://example.com",
ContentType: "text/plain",
Expand Down Expand Up @@ -188,7 +188,7 @@ var fromRequestTests = []struct {
},
requestBody: []byte("hey there"),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("myevent"),
EventType: eventpkg.TypeName("myevent"),
CloudEventsVersion: "0.1",
Source: "https://example.com",
ContentType: "text/plain",
Expand Down Expand Up @@ -217,7 +217,7 @@ var fromRequestTests = []struct {
},
requestBody: []byte("hey there"),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("myevent"),
EventType: eventpkg.TypeName("myevent"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/octet-stream",
Expand All @@ -241,7 +241,7 @@ var fromRequestTests = []struct {
"data": "test"
}`),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "http://example.com",
ContentType: "text/plain",
Expand All @@ -258,7 +258,7 @@ var fromRequestTests = []struct {
"eventType": "user.created"
}`),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/json",
Expand Down
9 changes: 9 additions & 0 deletions event/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package event

// Service represents service for managing event types.
type Service interface {
CreateEventType(eventType *Type) (*Type, error)
GetEventType(space string, name TypeName) (*Type, error)
GetEventTypes(space string) (Types, error)
DeleteEventType(space string, name TypeName) error
}
8 changes: 4 additions & 4 deletions event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// SystemEventReceivedType is a system event emitted when the Event Gateway receives an event.
const SystemEventReceivedType = Type("gateway.event.received")
const SystemEventReceivedType = TypeName("gateway.event.received")

// SystemEventReceivedData struct.
type SystemEventReceivedData struct {
Expand All @@ -15,7 +15,7 @@ type SystemEventReceivedData struct {
}

// SystemFunctionInvokingType is a system event emitted before invoking a function.
const SystemFunctionInvokingType = Type("gateway.function.invoking")
const SystemFunctionInvokingType = TypeName("gateway.function.invoking")

// SystemFunctionInvokingData struct.
type SystemFunctionInvokingData struct {
Expand All @@ -25,7 +25,7 @@ type SystemFunctionInvokingData struct {
}

// SystemFunctionInvokedType is a system event emitted after successful function invocation.
const SystemFunctionInvokedType = Type("gateway.function.invoked")
const SystemFunctionInvokedType = TypeName("gateway.function.invoked")

// SystemFunctionInvokedData struct.
type SystemFunctionInvokedData struct {
Expand All @@ -36,7 +36,7 @@ type SystemFunctionInvokedData struct {
}

// SystemFunctionInvocationFailedType is a system event emitted after successful function invocation.
const SystemFunctionInvocationFailedType = Type("gateway.function.invocationFailed")
const SystemFunctionInvocationFailedType = TypeName("gateway.function.invocationFailed")

// SystemFunctionInvocationFailedData struct.
type SystemFunctionInvocationFailedData struct {
Expand Down
28 changes: 28 additions & 0 deletions event/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package event

import "go.uber.org/zap/zapcore"

const (
// TypeHTTPRequest is a special type of event HTTP requests that are not CloudEvents.
TypeHTTPRequest = TypeName("http.request")
)

// TypeName uniquely identifies an event type.
type TypeName string

// Type is a registered event type.
type Type struct {
Space string `json:"space" validate:"required,min=3,space"`
Name TypeName `json:"name" validate:"required"`
}

// Types is an array of subscriptions.
type Types []*Type

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface
func (t Type) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("space", string(t.Space))
enc.AddString("name", string(t.Name))

return nil
}
4 changes: 3 additions & 1 deletion httpapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"time"

"github.com/julienschmidt/httprouter"
"github.com/serverless/event-gateway/event"
"github.com/serverless/event-gateway/function"
"github.com/serverless/event-gateway/subscription"
)

// StartConfigAPI creates a new configuration API server and listens for requests.
func StartConfigAPI(functions function.Service, subscriptions subscription.Service, config ServerConfig) {
func StartConfigAPI(eventtypes event.Service, functions function.Service, subscriptions subscription.Service, config ServerConfig) {
router := httprouter.New()
api := &HTTPAPI{
EventTypes: eventtypes,
Functions: functions,
Subscriptions: subscriptions,
}
Expand Down
Loading

0 comments on commit 68bcaf6

Please sign in to comment.