This repository has been archived by the owner on Mar 8, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
130 lines (108 loc) · 3.59 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package moroncloudevents
import (
"context"
"fmt"
"net/http"
"strconv"
cloudevents "github.com/cloudevents/sdk-go"
cloudeventsclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cloudeventshttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
)
// ServerConfig is a struct for options when constructing a new HTTP server.
type ServerConfig struct {
// Port is the port that serves both HTTP handlers and CloudEvent receiving.
// Defaults to 80.
// +optional
Port string
// CloudEventReceivePath is the path reserved for CloudEvents.
// If omitted, defaults to "/".
// +optional
CloudEventReceivePath string
// CloudEventTargets is a slice of targets that the client will send CloudEvents on.
// +optional
CloudEventTargets []string
// ConvertFn is a function to convert non-CloudEvent requests to the CloudEventReceivePath into CloudEvents.
// +optional
ConvertFn cloudevents.ConvertFn
// TransportOptions are forwarded directly to CloudEvent transport construction.
// +optional
TransportOptions []cloudeventshttp.Option
// ClientOptions are forwarded directly to CloudEvent client construction.
// +optional
ClientOptions []cloudeventsclient.Option
}
func (conf *ServerConfig) setDefaults() {
if conf.Port == "" {
conf.Port = "80"
}
if conf.CloudEventReceivePath == "" {
conf.CloudEventReceivePath = "/"
}
}
// Server allows you to simply serve HTTP handlers and a CloudEvent receiver side-by-side.
type Server struct {
*http.ServeMux
cetransport *cloudevents.HTTPTransport
ceclient cloudevents.Client
cehandler cloudeventsclient.ReceiveFull
shutdown context.CancelFunc
}
func NewServer(conf *ServerConfig) (*Server, error) {
conf.setDefaults()
portint, err := strconv.Atoi(conf.Port)
if err != nil {
return nil, fmt.Errorf("invalid port: %v", err)
}
tOpts := append(conf.TransportOptions, []cloudeventshttp.Option{
cloudevents.WithPath(conf.CloudEventReceivePath),
cloudevents.WithPort(portint),
}...)
for _, target := range conf.CloudEventTargets {
tOpts = append(tOpts, cloudevents.WithTarget(target))
}
transport, err := cloudevents.NewHTTPTransport(tOpts...)
if err != nil {
return nil, fmt.Errorf("failed to construct transport: %v", err)
}
transport.Client = &http.Client{
Transport: &ochttp.Transport{
Propagation: &b3.HTTPFormat{},
},
}
transport.Handler = http.NewServeMux()
cOps := append(conf.ClientOptions, []cloudeventsclient.Option{
cloudevents.WithUUIDs(),
cloudevents.WithTimeNow(),
}...)
if conf.ConvertFn != nil {
cOps = append(cOps, cloudevents.WithConverterFn(conf.ConvertFn))
}
client, err := cloudevents.NewClient(transport, cOps...)
return &Server{
ServeMux: transport.Handler,
cetransport: transport,
ceclient: client,
}, nil
}
// HandleCloudEvents sets the handler for CloudEvent receiveing. There can only be one.
func (s *Server) HandleCloudEvents(handler cloudeventsclient.ReceiveFull) {
s.cehandler = handler
}
// CloudEventClient returns the Server's client for CloudEvents.
func (s *Server) CloudEventClient() cloudevents.Client {
return s.ceclient
}
// Shutdown will call the cancel function for the server if it is already listening and serving.
func (s *Server) Shutdown() {
if s.shutdown != nil {
s.shutdown()
}
}
// ListenAndServe starts serving the HTTP handlers and CloudEvent receiver, blocking until termination.
func (s *Server) ListenAndServe() error {
ctx, shutdown := context.WithCancel(context.Background())
s.shutdown = shutdown
return s.ceclient.StartReceiver(ctx, s.cehandler)
}