Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receive requests are required to have partitionKey label #279

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/telemeter-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ func main() {
cmd.Flags().StringVar(&opt.LogLevel, "log-level", opt.LogLevel, "Log filtering level. e.g info, debug, warn, error")

l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
lvl, err := cmd.Flags().GetString("log-level")
if err != nil {
level.Error(l).Log("msg", "could not parse log-level.")
}
l = level.NewFilter(l, logger.LogLevelFromString(lvl))
l = level.NewFilter(l, logger.LogLevelFromString(opt.LogLevel))
l = log.WithPrefix(l, "ts", log.DefaultTimestampUTC)
l = log.WithPrefix(l, "caller", log.DefaultCaller)
stdlog.SetOutput(log.NewStdlibAdapter(l))
Expand Down Expand Up @@ -432,7 +428,13 @@ func (o *Options) Run() error {
external.Handle("/metrics/v1/receive",
telemeter_http.NewInstrumentedHandler("receive",
authorize.NewHandler(o.Logger, &v2AuthorizeClient, authorizeURL, o.TenantKey,
http.HandlerFunc(receiver.Receive),
receive.LimitBodySize(receive.DefaultRequestLimit,
receive.ValidateLabels(
o.Logger,
http.HandlerFunc(receiver.Receive),
o.PartitionKey, // TODO: Enforce the same labels for v1 and v2
),
),
),
),
)
Expand Down
97 changes: 96 additions & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package receive

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"

"github.com/openshift/telemeter/pkg/authorize"
)

const forwardTimeout = 5 * time.Second

// DefaultRequestLimit is the size limit of a request body coming in
const DefaultRequestLimit = 15 * 1024 // based on historic Prometheus data with 6KB at most

// ClusterAuthorizer authorizes a cluster by its token and id, returning a subject or error
type ClusterAuthorizer interface {
AuthorizeCluster(token, cluster string) (subject string, err error)
Expand Down Expand Up @@ -58,7 +67,6 @@ func (h *Handler) Receive(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

defer r.Body.Close()
metalmatze marked this conversation as resolved.
Show resolved Hide resolved

ctx, cancel := context.WithTimeout(r.Context(), forwardTimeout)
Expand Down Expand Up @@ -91,3 +99,90 @@ func (h *Handler) Receive(w http.ResponseWriter, r *http.Request) {
h.forwardRequestsTotal.WithLabelValues("success").Inc()
w.WriteHeader(resp.StatusCode)
}

// LimitBodySize is a middleware that check that the request body is not bigger than the limit
func LimitBodySize(limit int64, next http.Handler) http.HandlerFunc {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a short comment here

return func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer r.Body.Close()

// Set body to this buffer for other handlers to read
r.Body = ioutil.NopCloser(bytes.NewBuffer(body))

if len(body) >= int(limit) {
http.Error(w, "request too big", http.StatusRequestEntityTooLarge)
return
}

next.ServeHTTP(w, r)
}
}

// ErrRequiredLabelMissing is returned if a required label is missing from a metric
var ErrRequiredLabelMissing = fmt.Errorf("a required label is missing from the metric")

// ValidateLabels by checking each enforced label to be present in every time series
func ValidateLabels(logger log.Logger, next http.Handler, labels ...string) http.HandlerFunc {
logger = log.With(logger, "component", "receive", "middleware", "validateLabels")

labelmap := make(map[string]struct{})
for _, label := range labels {
labelmap[label] = struct{}{}
}

return func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
level.Error(logger).Log("msg", "failed to read body", "err", err)
http.Error(w, "failed to read body", http.StatusInternalServerError)
return
}
defer r.Body.Close()

// Set body to this buffer for other handlers to read
r.Body = ioutil.NopCloser(bytes.NewBuffer(body))

content, err := snappy.Decode(nil, body)
if err != nil {
level.Warn(logger).Log("msg", "failed to decode request body", "err", err)
http.Error(w, "failed to decode request body", http.StatusBadRequest)
return
}

var wreq prompb.WriteRequest
if err := proto.Unmarshal(content, &wreq); err != nil {
level.Warn(logger).Log("msg", "failed to decode protobuf from body", "err", err)
http.Error(w, "failed to decode protobuf from body", http.StatusBadRequest)
return
}

for _, ts := range wreq.GetTimeseries() {
// exit early if not enough labels anyway
if len(ts.GetLabels()) < len(labels) {
level.Warn(logger).Log("msg", "request is missing required labels", "err", ErrRequiredLabelMissing)
http.Error(w, ErrRequiredLabelMissing.Error(), http.StatusBadRequest)
return
}

found := 0

for _, l := range ts.GetLabels() {
if _, ok := labelmap[l.GetName()]; ok {
found++
}
}

if len(labels) != found {
level.Warn(logger).Log("msg", "request is missing required labels", "err", ErrRequiredLabelMissing)
http.Error(w, ErrRequiredLabelMissing.Error(), http.StatusBadRequest)
return
}
}

next.ServeHTTP(w, r)
}
}
2 changes: 2 additions & 0 deletions test/e2e/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package e2e

import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -137,6 +138,7 @@ func readMetrics(m string) []*clientmodel.MetricFamily {

func fakeAuthorizeHandler(h http.Handler, client *authorize.Client) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(context.WithValue(req.Context(), authorize.TenantKey, client.ID))
req = req.WithContext(authorize.WithClient(req.Context(), client))
h.ServeHTTP(w, req)
})
Expand Down
204 changes: 204 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package e2e

import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/go-kit/kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/openshift/telemeter/pkg/authorize"
"github.com/openshift/telemeter/pkg/receive"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"
)

func TestReceiveValidateLabels(t *testing.T) {
testcases := []struct {
Name string
Timeseries []prompb.TimeSeries
ExpectStatusCode int
}{
{
Name: "NoLabels",
Timeseries: []prompb.TimeSeries{{
Labels: []prompb.Label{},
}},
ExpectStatusCode: http.StatusBadRequest,
},
{
Name: "MissingRequiredLabel",
Timeseries: []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "foo", Value: "bar"}},
}},
ExpectStatusCode: http.StatusBadRequest,
},
{
Name: "Valid",
Timeseries: []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "foo"}},
}},
ExpectStatusCode: http.StatusOK,
},
{
Name: "MultipleMissingRequiredLabel",
Timeseries: []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "foo", Value: "bar"}},
}, {
Labels: []prompb.Label{{Name: "bar", Value: "baz"}},
}},
ExpectStatusCode: http.StatusBadRequest,
},
{
Name: "OneMultipleMissingRequiredLabel",
Timeseries: []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "foo", Value: "bar"}},
}, {
Labels: []prompb.Label{{Name: "__name__", Value: "foo"}},
}},
ExpectStatusCode: http.StatusBadRequest,
},
{
Name: "MultipleValid",
Timeseries: []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "foo"}},
}, {
Labels: []prompb.Label{{Name: "__name__", Value: "bar"}},
}},
ExpectStatusCode: http.StatusOK,
},
}

var receiveServer *httptest.Server
{
receiveServer = httptest.NewServer(func() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}())
defer receiveServer.Close()
}

var telemeterServer *httptest.Server
{
receiver := receive.NewHandler(log.NewNopLogger(), receiveServer.URL, prometheus.NewRegistry())

telemeterServer = httptest.NewServer(
fakeAuthorizeHandler(
receive.ValidateLabels(
log.NewNopLogger(),
http.HandlerFunc(receiver.Receive),
"__name__",
),
&authorize.Client{ID: "test"},
),
)

defer telemeterServer.Close()
}

for _, tc := range testcases {
t.Run(tc.Name, func(t *testing.T) {
wreq := &prompb.WriteRequest{Timeseries: tc.Timeseries}
data, err := proto.Marshal(wreq)
if err != nil {
t.Error("failed to marshal proto message")
}
compressed := snappy.Encode(nil, data)

resp, err := http.Post(telemeterServer.URL+"/metrics/v1/receive", "", bytes.NewBuffer(compressed))
if err != nil {
t.Error("failed to send the receive request: %w", err)
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != tc.ExpectStatusCode {
t.Errorf("request did not return %d, but %s: %s", tc.ExpectStatusCode, resp.Status, string(body))
}
})
}
}

func TestLimitBodySize(t *testing.T) {
var receiveServer *httptest.Server
{
receiveServer = httptest.NewServer(func() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}())
defer receiveServer.Close()
}

var telemeterServer *httptest.Server
{
receiver := receive.NewHandler(log.NewNopLogger(), receiveServer.URL, prometheus.NewRegistry())

telemeterServer = httptest.NewServer(
fakeAuthorizeHandler(
receive.LimitBodySize(receive.DefaultRequestLimit,
http.HandlerFunc(receiver.Receive),
),
&authorize.Client{ID: "test"},
),
)
defer telemeterServer.Close()
}

// Test if request within limit is fine
{
ts := []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "foo"}},
}}

wreq := &prompb.WriteRequest{Timeseries: ts}
data, err := proto.Marshal(wreq)
if err != nil {
t.Error("failed to marshal proto message")
}
compressed := snappy.Encode(nil, data)

resp, err := http.Post(telemeterServer.URL+"/metrics/v1/receive", "", bytes.NewBuffer(compressed))
if err != nil {
t.Error("failed to send the receive request: %w", err)
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Errorf("request did not return 200, but %s: %s", resp.Status, string(body))
}
}
// Test if too large request is rejected
{
var timeSeries []prompb.TimeSeries
for i := 0; i < 1000; i++ {
ts := prompb.TimeSeries{Labels: []prompb.Label{{Name: "__name__", Value: "foo" + string(i)}}}
for j := 0; j < i; j++ {
ts.Samples = append(ts.Samples, prompb.Sample{
Value: float64(j),
Timestamp: int64(j),
})
}
timeSeries = append(timeSeries, ts)
}

wreq := &prompb.WriteRequest{Timeseries: timeSeries}
data, err := proto.Marshal(wreq)
if err != nil {
t.Error("failed to marshal proto message")
}
compressed := snappy.Encode(nil, data)

resp, err := http.Post(telemeterServer.URL+"/metrics/v1/receive", "", bytes.NewBuffer(compressed))
if err != nil {
t.Error("failed to send the receive request: %w", err)
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != http.StatusRequestEntityTooLarge {
t.Errorf("request did not return 413, but %s: %s", resp.Status, string(body))
}
}
}