This repository has been archived by the owner on Nov 16, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 58
/
handlers.go
181 lines (153 loc) · 6.17 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
///////////////////////////////////////////////////////////////////////
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
///////////////////////////////////////////////////////////////////////
package eventmanager
import (
"fmt"
"net/http"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
log "github.com/sirupsen/logrus"
"github.com/vmware/dispatch/pkg/api/v1"
"github.com/vmware/dispatch/pkg/client"
"github.com/vmware/dispatch/pkg/controller"
"github.com/vmware/dispatch/pkg/entity-store"
"github.com/vmware/dispatch/pkg/event-manager/drivers"
"github.com/vmware/dispatch/pkg/event-manager/drivers/entities"
"github.com/vmware/dispatch/pkg/event-manager/gen/restapi/operations"
eventsapi "github.com/vmware/dispatch/pkg/event-manager/gen/restapi/operations/events"
"github.com/vmware/dispatch/pkg/event-manager/helpers"
"github.com/vmware/dispatch/pkg/event-manager/subscriptions"
"github.com/vmware/dispatch/pkg/events"
"github.com/vmware/dispatch/pkg/events/validator"
"github.com/vmware/dispatch/pkg/trace"
)
// Handlers is a base struct for event manager API handlers.
type Handlers struct {
Store entitystore.EntityStore
Transport events.Transport
Watcher controller.Watcher
SecretsClient client.SecretsClient
subscriptions *subscriptions.Handlers
drivers *drivers.Handlers
}
// ConfigureHandlers registers the function manager handlers to the API
func (h *Handlers) ConfigureHandlers(api middleware.RoutableAPI) {
a, ok := api.(*operations.EventManagerAPI)
if !ok {
panic("Cannot configure api")
}
a.CookieAuth = func(token string) (interface{}, error) {
// TODO: be able to retrieve user information from the cookie
// currently just return the cookie
return token, nil
}
a.BearerAuth = func(token string) (interface{}, error) {
// TODO: Once IAM issues signed tokens, validate them here.
return token, nil
}
a.Logger = log.Printf
h.subscriptions = subscriptions.NewHandlers(h.Store, h.Watcher)
h.subscriptions.ConfigureHandlers(api)
h.drivers = drivers.NewHandlers(h.Store, h.Watcher, h.SecretsClient)
h.drivers.ConfigureHandlers(api)
a.EventsEmitEventHandler = eventsapi.EmitEventHandlerFunc(h.emitEvent)
a.EventsIngestEventHandler = eventsapi.IngestEventHandlerFunc(h.ingestEvent)
}
func (h *Handlers) emitEvent(params eventsapi.EmitEventParams, principal interface{}) middleware.Responder {
span, ctx := trace.Trace(params.HTTPRequest.Context(), "emitEvent")
defer span.Finish()
if err := params.Body.Validate(strfmt.Default); err != nil {
errMsg := fmt.Sprintf("Error validating event: %s", err)
span.LogKV("validation_error", errMsg)
return eventsapi.NewEmitEventBadRequest().WithPayload(&v1.Error{
Code: http.StatusBadRequest,
Message: swag.String(errMsg),
})
}
ev := helpers.CloudEventFromAPI(¶ms.Body.CloudEvent)
if err := validator.Validate(ev); err != nil {
errMsg := fmt.Sprintf("Error validating event: %s", err)
span.LogKV("validation_error", errMsg)
return eventsapi.NewEmitEventBadRequest().WithPayload(&v1.Error{
Code: http.StatusBadRequest,
Message: swag.String(errMsg),
})
}
err := h.Transport.Publish(ctx, ev, ev.DefaultTopic(), params.XDispatchOrg)
if err != nil {
errMsg := fmt.Sprintf("error when publishing a message to MQ: %+v", err)
log.Error(errMsg)
span.LogKV("error", errMsg)
return eventsapi.NewEmitEventDefault(500).WithPayload(&v1.Error{
Code: http.StatusInternalServerError,
Message: swag.String("internal server error when emitting an event"),
})
}
// TODO: Store emission in time series database
return eventsapi.NewEmitEventOK().WithPayload(params.Body)
}
func (h *Handlers) ingestEvent(params eventsapi.IngestEventParams, principal interface{}) middleware.Responder {
span, ctx := trace.Trace(params.HTTPRequest.Context(), "ingestEvent")
defer span.Finish()
if err := params.Body.Validate(strfmt.Default); err != nil {
errMsg := fmt.Sprintf("Error validating event: %s", err)
span.LogKV("validation_error", errMsg)
return eventsapi.NewEmitEventBadRequest().WithPayload(&v1.Error{
Code: http.StatusBadRequest,
Message: swag.String(errMsg),
})
}
// auth token is expected to match event driver UUID
driverID := params.AuthToken
var driverEntities []*entities.Driver
filter := entitystore.FilterEverything()
filter.Add(entitystore.FilterStat{
Scope: entitystore.FilterScopeField,
Subject: "ID",
Verb: entitystore.FilterVerbEqual,
Object: driverID,
})
opts := entitystore.Options{Filter: filter}
// TODO(karols): every event causes DB query, only to retrieve organization ID. This asks for caching.
err := h.Store.ListGlobal(ctx, opts, &driverEntities)
if err != nil {
log.Errorf("error retrieving driverEntities for ID %s: %+v", driverID, err)
return eventsapi.NewIngestEventDefault(http.StatusInternalServerError).WithPayload(
&v1.Error{
Code: http.StatusInternalServerError,
Message: swag.String("internal server error when processing request"),
})
}
if len(driverEntities) != 1 {
log.Errorf("did not find driver for token %s: %+v", driverID, err)
return eventsapi.NewIngestEventUnauthorized().WithPayload(
&v1.Error{
Code: http.StatusUnauthorized,
Message: swag.String("token not recognized"),
})
}
ev := helpers.CloudEventFromAPI(¶ms.Body.CloudEvent)
if err := validator.Validate(ev); err != nil {
errMsg := fmt.Sprintf("Error validating event: %s", err)
span.LogKV("validation_error", errMsg)
return eventsapi.NewEmitEventBadRequest().WithPayload(&v1.Error{
Code: http.StatusBadRequest,
Message: swag.String(errMsg),
})
}
// We found driver matching the auth token. We will use it to send event.
err = h.Transport.Publish(ctx, ev, ev.DefaultTopic(), driverEntities[0].OrganizationID)
if err != nil {
errMsg := fmt.Sprintf("error when publishing a message to MQ: %+v", err)
log.Error(errMsg)
span.LogKV("error", errMsg)
return eventsapi.NewEmitEventDefault(500).WithPayload(&v1.Error{
Code: http.StatusInternalServerError,
Message: swag.String("internal server error when emitting an event"),
})
}
return eventsapi.NewIngestEventOK().WithPayload(params.Body)
}