Skip to content

Commit

Permalink
Receive requests are required to have partitionKey label
Browse files Browse the repository at this point in the history
  • Loading branch information
metalmatze committed Dec 12, 2019
1 parent 9b1f70d commit 13e55fd
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/telemeter-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (o *Options) Run() error {
transforms.With(metricfamily.NewElide(o.ElideLabels...))

server := httpserver.New(o.Logger, store, validator, transforms, o.TTL)
receiver := receive.NewHandler(o.Logger, o.ForwardURL)
receiver := receive.NewHandler(o.Logger, o.ForwardURL, o.PartitionKey)

internalPathJSON, _ := json.MarshalIndent(Paths{Paths: internalPaths}, "", " ")
externalPathJSON, _ := json.MarshalIndent(Paths{Paths: []string{"/", "/authorize", "/upload", "/healthz", "/healthz/ready", "/metrics/v1/receive"}}, "", " ")
Expand Down
63 changes: 57 additions & 6 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@ package receive

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"github.com/openshift/telemeter/pkg/authorize"
)

const forwardTimeout = 5 * time.Second
const requestLimit = 15 * 1024 // based on historic Prometheus data with 6KB at most

// ClusterAuthorizer authorizes a cluster by its token and id, returning a subject or error
type ClusterAuthorizer interface {
Expand All @@ -20,15 +26,17 @@ type ClusterAuthorizer interface {

// Handler knows the forwardURL for all requests
type Handler struct {
ForwardURL string
client *http.Client
logger log.Logger
ForwardURL string
PartitionKey string
client *http.Client
logger log.Logger
}

// NewHandler returns a new Handler with a http client
func NewHandler(logger log.Logger, forwardURL string) *Handler {
func NewHandler(logger log.Logger, forwardURL string, partitionKey string) *Handler {
return &Handler{
ForwardURL: forwardURL,
ForwardURL: forwardURL,
PartitionKey: partitionKey,
client: &http.Client{
Timeout: forwardTimeout,
},
Expand All @@ -43,7 +51,20 @@ func (h *Handler) Receive(w http.ResponseWriter, r *http.Request) {
return
}

defer r.Body.Close()
// Limit the request body size to a sane default
r.Body = http.MaxBytesReader(w, r.Body, requestLimit)

err := validateLabels(r, h.PartitionKey)
if err != nil {
level.Error(h.logger).Log("msg", "failed to validate labels in request", "err", err)

if err == ErrRequiredLabelMissing {
http.Error(w, err.Error(), http.StatusBadRequest)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}

ctx, cancel := context.WithTimeout(r.Context(), forwardTimeout)
defer cancel()
Expand Down Expand Up @@ -71,3 +92,33 @@ func (h *Handler) Receive(w http.ResponseWriter, r *http.Request) {
return
}
}

var ErrRequiredLabelMissing = fmt.Errorf("a required label is missing from the metric")

// TODO: Make this a middleware eventually
func validateLabels(r *http.Request, partitionKey string) error {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}

reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return err
}

var wreq prompb.WriteRequest
if err = proto.Unmarshal(reqBuf, &wreq); err != nil {
return err
}

for _, ts := range wreq.GetTimeseries() {
for _, l := range ts.Labels {
if l.Name == partitionKey {
return nil
}
}
}

return ErrRequiredLabelMissing
}

0 comments on commit 13e55fd

Please sign in to comment.