Refactor v1 HTTP to use middlewares and http.HandlerFuncs #325
It already passes e2e with forward.Handler. I guess we can get rid of the store.Store interface really soon
@@ -325,7 +322,12 @@ func (o *Options) Run() error {
})
}
{
external := http.NewServeMux()
external := chi.NewRouter()
nit: above we call the router `r` and the mux `internal`, and here we are following a different pattern. let's keep it consistent.
The problem is that the code above still needs `internal` to begin with. We need to refactor `telemeter_http.DebugRoutes` etc. in another PR, and then we can have consistency.
no I simply mean we should call the router `r` in both cases
cmd/telemeter-server/main.go
Outdated
@@ -403,17 +405,12 @@ func (o *Options) Run() error {
}

auth := jwt.NewAuthorizeClusterHandler(o.Logger, o.PartitionKey, o.TokenExpireSeconds, signer, o.RequiredLabels, clusterAuth)
validator := validate.New(o.PartitionKey, o.LimitBytes, 24*time.Hour, time.Now)
//validator := validate.New(o.PartitionKey, o.LimitBytes, 24*time.Hour, time.Now)
are we intentionally ignoring this? do we just want to delete the line?
commented out during refactoring for reference. The `validate.New` func doesn't even exist anymore. Will remove.
pkg/server/forward.go
Outdated
}
}
return func(w http.ResponseWriter, r *http.Request) {
partitionKey, ok := PartitionFromContext(r.Context())
let's rename this `partitionKey` thing. It's a relic from the partitioned metrics type but it's a weird name IMO. I think something like `ID`, `key`, `tenant` would be much more clear
Yeah, I had the same thought during refactoring. I would have gone for `clusterID`. That's what it ultimately is, right?
yes exactly
}

if len(timeseries) == 0 {
level.Info(s.logger).Log("msg", "no time series to forward to receive endpoint")
return nil
level.Info(logger).Log("msg", "no time series to forward to receive endpoint")
Should we also respond with an HTTP error here?
It was a transient error before. We simply ignored those requests and returned 200. I don't really care, honestly.
pkg/server/ratelimited.go
Outdated
"golang.org/x/time/rate"
)

// ErrWriteLimitReached is an error that is returned when a cluster has send too many requests.
nit: s/send/sent/
pkg/server/snappy.go
Outdated
}
defer r.Body.Close()

payload, _ := snappy.Decode(nil, body)
I suggest we handle this error just in case
Seems like I've forgotten 🙃
pkg/server/snappy.go
Outdated
reader = ioutil.NopCloser(bytes.NewBuffer(payload))
}

r.Body = reader
I think we can put this line and line 14 inside the `if` block, no? otherwise it's funny following why we are changing the body when the encoding is not snappy
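For reference, here is a minimal sketch of what the two suggestions on `pkg/server/snappy.go` could look like together: the decode error is handled, and the body is replaced only inside the `if` block. It is not the code from this PR. `decodeFunc` is a hypothetical stand-in for `snappy.Decode(nil, body)` so that the example compiles with the standard library alone, and the `Content-Encoding` header check, the status codes, and the names `Decompress`, `failingDecode`, and `run` are assumptions.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// decodeFunc stands in for snappy.Decode(nil, body). It is a hypothetical
// seam so this sketch needs nothing outside the standard library.
type decodeFunc func(src []byte) ([]byte, error)

// Decompress replaces the request body only when the request declares a
// snappy Content-Encoding (an assumed check), and rejects bad payloads.
func Decompress(decode decodeFunc, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if strings.EqualFold(r.Header.Get("Content-Encoding"), "snappy") {
			body, err := io.ReadAll(r.Body)
			r.Body.Close()
			if err != nil {
				http.Error(w, "failed to read body", http.StatusInternalServerError)
				return
			}
			payload, err := decode(body)
			if err != nil {
				// The decode error is no longer discarded.
				http.Error(w, "failed to decode body", http.StatusBadRequest)
				return
			}
			r.Body = io.NopCloser(bytes.NewReader(payload))
		}
		next(w, r)
	}
}

// failingDecode simulates a corrupt payload.
func failingDecode([]byte) ([]byte, error) {
	return nil, errors.New("corrupt input")
}

// run sends body through the middleware and reports the response status.
func run(encoding, body string, decode decodeFunc) int {
	h := Decompress(decode, func(w http.ResponseWriter, r *http.Request) {
		io.Copy(w, r.Body)
	})
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
	req.Header.Set("Content-Encoding", encoding)
	rec := httptest.NewRecorder()
	h(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(run("snappy", "garbage", failingDecode)) // decode fails
	fmt.Println(run("", "plain", failingDecode))         // body left untouched
}
```

With this shape, a request without the snappy encoding never has its body touched, which is the readability point raised above.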
test/e2e/forward_test.go
Outdated
labels := map[string]string{"cluster": "test"}
validator := validate.New("cluster", 0, 0, time.Now)
//validator := validate.New("cluster", 0, 0, time.Now)
can we simply delete this line?
Looks great overall @metalmatze good work so far!
/retest
}

if resp.StatusCode/100 != 2 {
return fmt.Errorf("response status code is %s", resp.Status)
// surfacing upstreams error to our users too
http.Error(w, fmt.Errorf("response status code is %s", resp.Status).Error(), resp.StatusCode)
This isn't correct IMO. If telemeter-server gets a 400 from the upstream then it is not the user's fault that there is an auth error, but rather an internal server error. The user should not receive a 400 for an improperly configured server. Instead the user should get a 500.
Right. That makes sense. Keep in mind that this is the request sent from Telemeter to Thanos Receivers. I guess it still holds true then too? (There's no auth in this step)
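To make the suggestion concrete, here is a minimal sketch of mapping the upstream status to what the caller of telemeter-server sees. It assumes that any non-2xx response from the receive endpoint is reported as a 500, as proposed above. `clientStatus` and `writeForwardResult` are hypothetical names, not functions from this PR.

```go
package main

import (
	"fmt"
	"net/http"
)

// clientStatus maps the status returned by the upstream receive endpoint to
// the status reported to our own caller. Hypothetical helper: a failing
// upstream is treated as a server-side problem, never as the caller's fault.
func clientStatus(upstream int) int {
	if upstream/100 == 2 {
		return http.StatusOK
	}
	return http.StatusInternalServerError
}

// writeForwardResult reports the outcome of the forwarded request to the
// caller while keeping the upstream status visible in the error message.
func writeForwardResult(w http.ResponseWriter, resp *http.Response) {
	code := clientStatus(resp.StatusCode)
	if code != http.StatusOK {
		http.Error(w, fmt.Sprintf("upstream response status code is %s", resp.Status), code)
		return
	}
	w.WriteHeader(code)
}

func main() {
	for _, upstream := range []int{200, 400, 503} {
		fmt.Printf("upstream %d -> client %d\n", upstream, clientStatus(upstream))
	}
}
```

Whether 500 or a gateway-style status such as 502 is the better fit is a design choice this sketch does not settle.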
pkg/server/instrument.go
Outdated
@@ -36,7 +36,8 @@ func init() {
prometheus.MustRegister(requestDuration, requestSize, requestsTotal)
}

func NewInstrumentedHandler(handlerName string, next http.Handler) http.Handler {
// InstrumentedHandler is a HTTP middleware that monitors HTTP requests and responses.
nit: ... is an HTTP ...
pkg/server/ratelimited.go
Outdated
type ErrWriteLimitReached string

func (e ErrWriteLimitReached) Error() string {
return fmt.Sprintf("write limit reached for key %q", string(e))
maybe we should rename `key` to `id` or `cluster` now
`write limit reached for cluster with id`? 😊
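A small sketch of the error type with the wording suggested here and the `s/send/sent/` fix applied to the doc comment. The exact message format is an assumption, since the thread does not show the final text.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrWriteLimitReached is an error that is returned when a cluster has sent
// too many requests. The string value holds the cluster ID.
type ErrWriteLimitReached string

func (e ErrWriteLimitReached) Error() string {
	// Assumed wording, following the suggestion in the review.
	return fmt.Sprintf("write limit reached for cluster with id %q", string(e))
}

func main() {
	var err error = ErrWriteLimitReached("test")

	// Callers can still match on the type, for example to answer with 429.
	var limitErr ErrWriteLimitReached
	if errors.As(err, &limitErr) {
		fmt.Println(limitErr.Error())
	}
}
```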
pkg/server/snappy_test.go
Outdated
t.Errorf("expected %d and got %d", http.StatusOK, resp.StatusCode)
}
}
//{
do we want to re-enable this test or should we remove it simply?
pkg/server/validator.go
Outdated
return p, ok
}

type clusterIDCtxType int
let's put these definitions at the top of the file to follow convention
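As an illustration of the convention, a sketch with the context key type and its constant declared first, followed by the accessors. Only `clusterIDCtxType` appears in the diff. `clusterIDCtx`, `WithClusterID`, `ClusterIDFromContext`, and `roundTrip` are hypothetical names chosen to match the `clusterID` rename discussed earlier.

```go
package main

import (
	"context"
	"fmt"
)

// clusterIDCtxType is an unexported key type, so values stored by this
// package cannot collide with context keys defined elsewhere.
type clusterIDCtxType int

// clusterIDCtx is the key under which the cluster ID is stored.
const clusterIDCtx clusterIDCtxType = iota

// WithClusterID returns a copy of ctx that carries the given cluster ID.
func WithClusterID(ctx context.Context, clusterID string) context.Context {
	return context.WithValue(ctx, clusterIDCtx, clusterID)
}

// ClusterIDFromContext returns the cluster ID stored in ctx, if any.
func ClusterIDFromContext(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(clusterIDCtx).(string)
	return id, ok
}

// roundTrip stores a cluster ID in a fresh context and reads it back.
func roundTrip(clusterID string) (string, bool) {
	return ClusterIDFromContext(WithClusterID(context.Background(), clusterID))
}

func main() {
	id, ok := roundTrip("test")
	fmt.Println(id, ok)

	_, ok = ClusterIDFromContext(context.Background())
	fmt.Println(ok) // nothing was stored in this context
}
```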
pkg/server/validator.go
Outdated
transforms.With(metricfamily.NewRequiredLabels(client.Labels))
transforms.With(metricfamily.TransformerFunc(metricfamily.DropEmptyFamilies))

if limitBytes > 0 {
hmm this is funny, shouldn't this come before the `ioutil.ReadAll` on line 66?
pkg/server/validator.go
Outdated
transforms.With(metricfamily.TransformerFunc(metricfamily.DropEmptyFamilies))

if limitBytes > 0 {
r.Body = reader.NewLimitReadCloser(r.Body, limitBytes)
what effect does this have? it seems like the entire body was already read into a new buffer called `body` on line 66
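The ordering concern can be sketched as follows: the limit has to wrap `r.Body` before anything reads it, otherwise the full payload is already in memory. This sketch swaps in the standard library's `http.MaxBytesReader` instead of the project's `reader.NewLimitReadCloser`, whose behaviour is not shown in this thread. `LimitBody` and `status` are hypothetical names, and reporting every read error as 413 is a simplification.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// LimitBody caps the request body before it is read into memory.
func LimitBody(limitBytes int64, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if limitBytes > 0 {
			// Wrap first, read second: the cap now applies to ReadAll below.
			r.Body = http.MaxBytesReader(w, r.Body, limitBytes)
		}
		body, err := io.ReadAll(r.Body)
		if err != nil {
			// Simplification: any read error, including an exceeded limit,
			// is reported as 413 in this sketch.
			http.Error(w, "could not read request body within the limit", http.StatusRequestEntityTooLarge)
			return
		}
		// Hand the already-read bytes to the next handler.
		r.Body = io.NopCloser(bytes.NewReader(body))
		next(w, r)
	}
}

// status sends body through LimitBody and reports the response status.
func status(limitBytes int64, body string) int {
	h := LimitBody(limitBytes, func(w http.ResponseWriter, r *http.Request) {})
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body)))
	return rec.Code
}

func main() {
	fmt.Println(status(4, "ok"))       // within the limit
	fmt.Println(status(4, "too long")) // exceeds the limit
}
```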
fakeAuthorizeHandler(
Validate(metricfamily.MultiTransformer{}, time.Hour, 512*1024, time.Now,
func(w http.ResponseWriter, r *http.Request) {
// TODO: Make the check proper to changing timestamps?
Should we delete or re-enable this?
/retest
/retest
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: metalmatze, squat
This is entirely code that is not shipped in OpenShift, but needed for the telemetry server infrastructure. All CI green. Merging.
Now that Telemeter doesn't have the hashring anymore, we can go ahead and get rid of the `store.Store` interface, allowing us to move to a simpler architecture.
With this PR we're moving to a `pkg/server` that contains multiple HTTP middlewares and `http.HandlerFunc`s.
/cc @squat @kakkoyun @krasi-georgiev @bwplotka @brancz
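As a rough picture of the architecture described above, here is a minimal sketch of composing middlewares around a final `http.HandlerFunc`. It is not the wiring from this PR. `Middleware`, `Chain`, `tag`, and `trace` are hypothetical names, and the stage names only echo the handlers mentioned in this thread.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Middleware wraps one http.HandlerFunc in another.
type Middleware func(http.HandlerFunc) http.HandlerFunc

// Chain applies the middlewares so that the first one listed runs first.
func Chain(final http.HandlerFunc, mws ...Middleware) http.HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		final = mws[i](final)
	}
	return final
}

// tag is a stand-in middleware that records its name in a response header
// before calling the next handler.
func tag(name string) Middleware {
	return func(next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			w.Header().Add("X-Trace", name)
			next(w, r)
		}
	}
}

// trace runs one request through the chain and returns the recorded order.
func trace() []string {
	h := Chain(
		func(w http.ResponseWriter, r *http.Request) {
			w.Header().Add("X-Trace", "forward")
		},
		tag("instrument"), tag("authorize"), tag("validate"),
	)
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodPost, "/upload", nil))
	return rec.Header().Values("X-Trace")
}

func main() {
	fmt.Println(trace())
}
```

Each stage stays a plain function, so it can be tested on its own with `httptest` and no shared store interface.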