/
handler.go
178 lines (152 loc) · 4.93 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package main
import (
"log"
"math/rand"
"net/http"
"net/http/pprof"
"net/url"
"strings"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
)
// Handler represents an HTTP handler for InfluxDB node.
// Depending on its role, it will serve many different endpoints.
type Handler struct {
Log *raft.Log
Broker *influxdb.Broker
Server *influxdb.Server
Config *Config
}
// NewHandler returns a new instance of Handler.
func NewHandler() *Handler {
return &Handler{}
}
// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Debug routes.
if h.Config.Debugging.PprofEnabled && strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
case "/debug/pprof/cmdline":
pprof.Cmdline(w, r)
case "/debug/pprof/profile":
pprof.Profile(w, r)
case "/debug/pprof/symbol":
pprof.Symbol(w, r)
default:
pprof.Index(w, r)
}
return
}
// Broker raft communication endpoints. These are called and handled by brokers
// to coordinate changes to the raft log.
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)
return
}
// Broker messaging endpoints. These are handled by brokers and called by data
// nodes to receive topic change and update replication status.
if strings.HasPrefix(r.URL.Path, "/messaging") {
h.serveMessaging(w, r)
return
}
// Data node endpoints. These are handled by data nodes and allow brokers and data
// nodes to transfer state, process queries, etc..
if strings.HasPrefix(r.URL.Path, "/data") {
h.serveData(w, r)
return
}
// These are public API endpoints
h.serveAPI(w, r)
}
// serveMessaging responds to broker requests
func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}
// If we're running a broker, handle the broker endpoints
if h.Broker != nil {
mh := &messaging.Handler{
Broker: h.Broker.Broker,
RaftHandler: &raft.Handler{Log: h.Log},
}
mh.ServeHTTP(w, r)
return
}
// Redirect to a valid broker to handle the request
h.redirect(h.Server.BrokerURLs(), w, r)
}
// serveData responds to broker requests
func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle metadata endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}
if h.Server != nil {
sh := httpd.NewClusterHandler(h.Server, h.Config.Authentication.Enabled,
h.Config.Snapshot.Enabled, h.Config.Logging.HTTPAccess, version)
sh.ServeHTTP(w, r)
return
}
t := h.Broker.Topic(influxdb.BroadcastTopicID)
if t == nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
// Redirect to a valid data URL to handle the request
h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r)
}
// serveRaft responds to raft requests.
func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) {
if h.Log == nil && h.Server == nil {
log.Println("no broker or server configured to handle raft endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}
if h.Log != nil {
rh := raft.Handler{Log: h.Log}
rh.ServeHTTP(w, r)
return
}
// Redirect to a valid broker to handle the request
h.redirect(h.Server.BrokerURLs(), w, r)
}
// serveAPI responds to data requests
func (h *Handler) serveAPI(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle data endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}
if h.Server != nil {
sh := httpd.NewAPIHandler(h.Server, h.Config.Authentication.Enabled,
h.Config.Logging.HTTPAccess, version)
sh.WriteTrace = h.Config.Logging.WriteTracing
sh.ServeHTTP(w, r)
return
}
t := h.Broker.Topic(influxdb.BroadcastTopicID)
if t == nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
// Redirect to a valid data URL to handle the request
h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r)
}
// redirect redirects a request to URL in u. If u is an empty slice,
// a 503 is returned
func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request) {
// No valid URLs, return an error
if len(u) == 0 {
http.Error(w, "service unavailable", http.StatusServiceUnavailable)
return
}
// TODO: Log to internal stats to track how frequently this is happening. If
// this is happening frequently, the clients are using a suboptimal endpoint
// Redirect the client to a valid data node that can handle the request
http.Redirect(w, r, u[rand.Intn(len(u))].String()+r.RequestURI, http.StatusTemporaryRedirect)
}