This repository has been archived by the owner on Nov 16, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 58
/
handlers.go
140 lines (121 loc) · 5.88 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
///////////////////////////////////////////////////////////////////////
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
///////////////////////////////////////////////////////////////////////
package eventmanager
import (
"fmt"
"net/http"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
log "github.com/sirupsen/logrus"
"github.com/vmware/dispatch/pkg/client"
"github.com/vmware/dispatch/pkg/api/v1"
"github.com/vmware/dispatch/pkg/controller"
"github.com/vmware/dispatch/pkg/entity-store"
"github.com/vmware/dispatch/pkg/event-manager/drivers"
"github.com/vmware/dispatch/pkg/event-manager/gen/restapi/operations"
eventsapi "github.com/vmware/dispatch/pkg/event-manager/gen/restapi/operations/events"
"github.com/vmware/dispatch/pkg/event-manager/helpers"
"github.com/vmware/dispatch/pkg/event-manager/subscriptions"
"github.com/vmware/dispatch/pkg/events"
"github.com/vmware/dispatch/pkg/events/validator"
"github.com/vmware/dispatch/pkg/trace"
)
// Flags are configuration flags for the event manager
var Flags = struct {
Config string `long:"config" description:"Path to Config file" default:"./config.dev.json"`
DbFile string `long:"db-file" description:"Backend DB URL/Path" default:"./db.bolt"`
DbBackend string `long:"db-backend" description:"Backend DB Name" default:"boltdb"`
DbUser string `long:"db-username" description:"Backend DB Username" default:"dispatch"`
DbPassword string `long:"db-password" description:"Backend DB Password" default:"dispatch"`
DbDatabase string `long:"db-database" description:"Backend DB Name" default:"dispatch"`
FunctionManager string `long:"function-manager" description:"Function manager endpoint" default:"localhost:8001"`
Transport string `long:"transport" description:"Event transport to use" default:"kafka"`
KafkaBrokers []string `long:"kafka-broker" description:"host:port of Kafka broker(s)" default:"localhost:9092"`
RabbitMQURL string `long:"rabbitmq-url" description:"URL to RabbitMQ broker" default:"amqp://guest:guest@localhost:5672/"`
ResyncPeriod int `long:"resync-period" description:"The time period (in seconds) to sync with underlying k8s" default:"60"`
K8sConfig string `long:"kubeconfig" description:"Path to kubernetes config file" default:""`
K8sNamespace string `long:"namespace" description:"Kubernetes namespace" default:"default"`
EventDriverImage string `long:"event-driver-image" description:"Default event driver image"`
EventSidecarImage string `long:"event-sidecar-image" description:"Event sidecar image"`
SecretStore string `long:"secret-store" description:"Secret store endpoint" default:"localhost:8003"`
Tracer string `long:"tracer" description:"Open Tracing Tracer endpoint" default:""`
IngressHost string `long:"ingress-host" description:"Dispatch ingress hostname" default:""`
}{}
// Handlers is a base struct for event manager API handlers.
type Handlers struct {
Store entitystore.EntityStore
Transport events.Transport
Watcher controller.Watcher
SecretsClient client.SecretsClient
subscriptions *subscriptions.Handlers
drivers *drivers.Handlers
}
// ConfigureHandlers registers the function manager handlers to the API
func (h *Handlers) ConfigureHandlers(api middleware.RoutableAPI) {
a, ok := api.(*operations.EventManagerAPI)
if !ok {
panic("Cannot configure api")
}
a.CookieAuth = func(token string) (interface{}, error) {
// TODO: be able to retrieve user information from the cookie
// currently just return the cookie
log.Printf("cookie auth: %s\n", token)
return token, nil
}
a.BearerAuth = func(token string) (interface{}, error) {
// TODO: Once IAM issues signed tokens, validate them here.
log.Printf("bearer auth: %s\n", token)
return token, nil
}
a.Logger = log.Printf
h.subscriptions = subscriptions.NewHandlers(h.Store, h.Watcher)
h.subscriptions.ConfigureHandlers(api)
h.drivers = drivers.NewHandlers(h.Store, h.Watcher, h.SecretsClient, drivers.ConfigOpts{
DriverImage: Flags.EventDriverImage,
SidecarImage: Flags.EventSidecarImage,
TransportType: Flags.Transport,
RabbitMQURL: Flags.RabbitMQURL,
KafkaBrokers: Flags.KafkaBrokers,
Tracer: Flags.Tracer,
K8sConfig: Flags.K8sConfig,
DriverNamespace: Flags.K8sNamespace,
})
h.drivers.ConfigureHandlers(api)
a.EventsEmitEventHandler = eventsapi.EmitEventHandlerFunc(h.emitEvent)
}
func (h *Handlers) emitEvent(params eventsapi.EmitEventParams, principal interface{}) middleware.Responder {
span, ctx := trace.Trace(params.HTTPRequest.Context(), "emitEvent")
defer span.Finish()
if err := params.Body.Validate(strfmt.Default); err != nil {
errMsg := fmt.Sprintf("Error validating event: %s", err)
span.LogKV("validation_error", errMsg)
return eventsapi.NewEmitEventBadRequest().WithPayload(&v1.Error{
Code: http.StatusBadRequest,
Message: swag.String(errMsg),
})
}
ev := helpers.CloudEventFromAPI(¶ms.Body.CloudEvent)
if err := validator.Validate(ev); err != nil {
errMsg := fmt.Sprintf("Error validating event: %s", err)
span.LogKV("validation_error", errMsg)
return eventsapi.NewEmitEventBadRequest().WithPayload(&v1.Error{
Code: http.StatusBadRequest,
Message: swag.String(errMsg),
})
}
err := h.Transport.Publish(ctx, ev, ev.DefaultTopic(), params.XDispatchOrg)
if err != nil {
errMsg := fmt.Sprintf("error when publishing a message to MQ: %+v", err)
log.Error(errMsg)
span.LogKV("error", errMsg)
return eventsapi.NewEmitEventDefault(500).WithPayload(&v1.Error{
Code: http.StatusInternalServerError,
Message: swag.String("internal server error when emitting an event"),
})
}
// TODO: Store emission in time series database
return eventsapi.NewEmitEventOK().WithPayload(params.Body)
}