Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/telemeter-server,pkg/cache: cache auth resps #273

Merged
merged 4 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 23 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ FIRST_GOPATH:=$(firstword $(subst :, ,$(shell go env GOPATH)))
GOLANGCI_LINT_BIN=$(FIRST_GOPATH)/bin/golangci-lint
GOLANGCI_LINT_VERSION=v1.18.0
EMBEDMD_BIN=$(FIRST_GOPATH)/bin/embedmd
THANOS_BIN=$(FIRST_GOPATH)/bin/thanos
UP_BIN=$(FIRST_GOPATH)/bin/up
MEMCACHED_BIN=_output/bin/memcached
PROMETHEUS_BIN=_output/bin/prometheus
GOJSONTOYAML_BIN=$(FIRST_GOPATH)/bin/gojsontoyaml
# We need jsonnet on CI; here we default to the user's installed jsonnet binary; if nothing is installed, then install go-jsonnet.
JSONNET_BIN=$(if $(shell which jsonnet 2>/dev/null),$(shell which jsonnet 2>/dev/null),$(FIRST_GOPATH)/bin/jsonnet)
Expand Down Expand Up @@ -146,8 +150,9 @@ test-unit:
test-generate:
make --always-make && git diff --exit-code

test-integration: build
./test/integration.sh
test-integration: build $(THANOS_BIN) $(UP_BIN) $(MEMCACHED_BIN) $(PROMETHEUS_BIN)
PATH=$$PATH:$$(pwd)/_output/bin ./test/integration.sh
PATH=$$PATH:$$(pwd)/_output/bin ./test/integration-v2.sh

test-benchmark: build
./test/benchmark.sh "" "" $(BENCHMARK_GOAL) "" $(BENCHMARK_GOAL)
Expand All @@ -170,6 +175,22 @@ test/timeseries.txt:

dependencies: $(JB_BIN) $(GOLANGCI_LINT_BIN)

$(THANOS_BIN):
GO111MODULE=off go get github.com/thanos-io/thanos

$(UP_BIN):
GO111MODULE=off go get github.com/observatorium/up

$(MEMCACHED_BIN):
mkdir -p _output/bin
@echo "Downloading Memcached"
curl -L https://www.archlinux.org/packages/extra/x86_64/memcached/download/ | tar --strip-components=2 -xJf - -C _output/bin usr/bin/memcached

$(PROMETHEUS_BIN):
mkdir -p _output/bin
@echo "Downloading Prometheus"
curl -L "https://github.com/prometheus/prometheus/releases/download/v2.3.2/prometheus-2.3.2.$$(go env GOOS)-$$(go env GOARCH).tar.gz" | tar --strip-components=1 -xzf - -C _output/bin

$(EMBEDMD_BIN):
GO111MODULE=off go get -u github.com/campoy/embedmd

Expand Down
34 changes: 25 additions & 9 deletions cmd/telemeter-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

oidc "github.com/coreos/go-oidc"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
Expand All @@ -36,6 +37,8 @@ import (
"github.com/openshift/telemeter/pkg/authorize/jwt"
"github.com/openshift/telemeter/pkg/authorize/stub"
"github.com/openshift/telemeter/pkg/authorize/tollbooth"
"github.com/openshift/telemeter/pkg/cache"
"github.com/openshift/telemeter/pkg/cache/memcached"
"github.com/openshift/telemeter/pkg/cluster"
telemeter_http "github.com/openshift/telemeter/pkg/http"
httpserver "github.com/openshift/telemeter/pkg/http/server"
Expand Down Expand Up @@ -86,6 +89,7 @@ func main() {
PartitionKey: "_id",
Ratelimit: 4*time.Minute + 30*time.Second,
TTL: 10 * time.Minute,
MemcachedExpire: 24 * 60 * 60,
squat marked this conversation as resolved.
Show resolved Hide resolved
}
cmd := &cobra.Command{
Short: "Aggregate federated metrics pushes",
Expand Down Expand Up @@ -122,6 +126,8 @@ func main() {
cmd.Flags().StringVar(&opt.ClientSecret, "client-secret", opt.ClientSecret, "The OIDC client secret, see https://tools.ietf.org/html/rfc6749#section-2.3.")
cmd.Flags().StringVar(&opt.ClientID, "client-id", opt.ClientID, "The OIDC client ID, see https://tools.ietf.org/html/rfc6749#section-2.3.")
cmd.Flags().StringVar(&opt.TenantKey, "tenant-key", opt.TenantKey, "The JSON key in the bearer token whose value to use as the tenant ID.")
cmd.Flags().StringSliceVar(&opt.Memcacheds, "memcached", opt.Memcacheds, "One or more Memcached server addresses.")
cmd.Flags().Int32Var(&opt.MemcachedExpire, "memcached-expire", opt.MemcachedExpire, "Time after which keys stored in Memcached should expire, given in seconds.")

cmd.Flags().DurationVar(&opt.Ratelimit, "ratelimit", opt.Ratelimit, "The rate limit of metric uploads per cluster ID. Uploads happening more often than this limit will be rejected.")
cmd.Flags().DurationVar(&opt.TTL, "ttl", opt.TTL, "The TTL for metrics to be held in memory.")
Expand Down Expand Up @@ -174,10 +180,12 @@ type Options struct {

AuthorizeEndpoint string

OIDCIssuer string
ClientID string
ClientSecret string
TenantKey string
OIDCIssuer string
ClientID string
ClientSecret string
TenantKey string
Memcacheds []string
MemcachedExpire int32

PartitionKey string
LabelFlag []string
Expand Down Expand Up @@ -236,7 +244,7 @@ func (o *Options) Run() error {

// set up the upstream authorization
var authorizeURL *url.URL
var authorizeClient *http.Client
var authorizeClient http.Client
ctx := context.Background()
if len(o.AuthorizeEndpoint) > 0 {
u, err := url.Parse(o.AuthorizeEndpoint)
Expand All @@ -255,7 +263,7 @@ func (o *Options) Run() error {
transport = telemeter_http.NewDebugRoundTripper(o.Logger, transport)
}

authorizeClient = &http.Client{
authorizeClient = http.Client{
Timeout: 20 * time.Second,
Transport: telemeter_http.NewInstrumentedRoundTripper("authorize", transport),
}
Expand Down Expand Up @@ -389,7 +397,7 @@ func (o *Options) Run() error {
// configure the authenticator and incoming data validator
var clusterAuth authorize.ClusterAuthorizer = authorize.ClusterAuthorizerFunc(stub.Authorize)
if authorizeURL != nil {
clusterAuth = tollbooth.NewAuthorizer(o.Logger, authorizeClient, authorizeURL)
clusterAuth = tollbooth.NewAuthorizer(o.Logger, &authorizeClient, authorizeURL)
}

auth := jwt.NewAuthorizeClusterHandler(o.Logger, o.PartitionKey, o.TokenExpireSeconds, signer, o.RequiredLabels, clusterAuth)
Expand Down Expand Up @@ -501,10 +509,18 @@ func (o *Options) Run() error {
),
)

// v1 routes
// v2 routes
v2AuthorizeClient := authorizeClient

if len(o.Memcacheds) > 0 {
mc := memcached.New(o.MemcachedExpire, o.Memcacheds...)
l := log.With(o.Logger, "component", "cache")
v2AuthorizeClient.Transport = cache.NewRoundTripper(mc, tollbooth.ExtractToken, v2AuthorizeClient.Transport, l, prometheus.DefaultRegisterer)
}

external.Handle("/metrics/v1/receive",
telemeter_http.NewInstrumentedHandler("receive",
authorize.NewHandler(o.Logger, authorizeClient, authorizeURL, o.TenantKey,
authorize.NewHandler(o.Logger, &v2AuthorizeClient, authorizeURL, o.TenantKey,
http.HandlerFunc(receiver.Receive),
),
),
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/openshift/telemeter
go 1.12

require (
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/coreos/go-oidc v2.0.0+incompatible
github.com/go-kit/kit v0.8.0
github.com/go-logfmt/logfmt v0.4.0 // indirect
Expand All @@ -13,6 +14,7 @@ require (
github.com/hashicorp/memberlist v0.1.4
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/oklog/run v0.0.0-20180308005104-6934b124db28
github.com/pkg/errors v0.8.0
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/client_golang v0.9.1
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/aws/aws-sdk-go v0.0.0-20180507225419-00862f899353/go.mod h1:ZRmQr0Faj
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/biogo/store v0.0.0-20160505134755-913427a1d5e8/go.mod h1:Iev9Q3MErcn+w3UOJD/DkEzllvugfdx7bGcMOFhvr/4=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/cenk/backoff v2.0.0+incompatible/go.mod h1:7FtoeaSnHoZnmZzz47cM35Y9nSW7tNyaidugnHTaFDE=
github.com/certifi/gocertifi v0.0.0-20180905225744-ee1a9a0726d2/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
Expand Down
3 changes: 2 additions & 1 deletion jsonnet/server.jsonnet
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local t = (import 'telemeter/server.libsonnet');

{ [name]: t.telemeterServer[name] for name in std.objectFields(t.telemeterServer) }
{ [name]: t.telemeterServer[name] for name in std.objectFields(t.telemeterServer) } +
{ [name + 'Memcached']: t.memcached[name] for name in std.objectFields(t.memcached) if t.memcached.replicas > 0 }
12 changes: 11 additions & 1 deletion jsonnet/telemeter/server.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ local list = import 'lib/list.libsonnet';

(import 'server/kubernetes.libsonnet') + {
local ts = super.telemeterServer,
local m = super.memcached,
telemeterServer+:: {
list: list.asList('telemeter', ts, [])
+ list.withAuthorizeURL($._config)
+ list.withNamespace($._config)
+ list.withServerImage($._config)
+ list.withResourceRequestsAndLimits('telemeter-server', $._config.telemeterServer.resourceRequests, $._config.telemeterServer.resourceLimits)
+ list.withResourceRequestsAndLimits('telemeter-server', $._config.telemeterServer.resourceRequests, $._config.telemeterServer.resourceLimits),
},
memcached+:: {
list: list.asList('memcached', m, [
{
name: 'MEMCACHED_IMAGE',
value: m.image,
},
])
+ list.withNamespace($._config),
},
} + {
_config+:: {
Expand Down
61 changes: 60 additions & 1 deletion jsonnet/telemeter/server/kubernetes.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ local clusterPort = 8082;
$._config.telemeterServer.elideLabels
);

local memcachedReplicas = std.range(1, $.memcached.replicas);
local memcached = [
'--memcached=%s-%d.%s.%s.svc.cluster.local:%d' % [
$.memcached.statefulSet.metadata.name,
i,
$.memcached.service.metadata.name,
$.memcached.service.metadata.namespace,
$.memcached.service.spec.ports[0].port,
]
for i in memcachedReplicas
];


local telemeterServer =
container.new('telemeter-server', $._config.imageRepos.telemeterServer + ':' + $._config.versions.telemeterServer) +
container.withCommand([
Expand All @@ -79,7 +92,7 @@ local clusterPort = 8082;
'--oidc-issuer=$(OIDC_ISSUER)',
'--client-id=$(CLIENT_ID)',
'--client-secret=$(CLIENT_SECRET)',
] + whitelist + elide) +
] + memcached + whitelist + elide) +
container.withPorts([
containerPort.newNamed('external', externalPort),
containerPort.newNamed('internal', internalPort),
Expand Down Expand Up @@ -224,4 +237,50 @@ local clusterPort = 8082;
},
},
},

memcached+:: {
image:: 'docker.io/memcached:1.5.20-alpine',
replicas:: 3,

service:
local service = k.core.v1.service;
local ports = service.mixin.spec.portsType;

service.new(
'memcached',
$.memcached.statefulSet.metadata.labels,
[
ports.newNamed('memcached', 11211, 11211),
]
) +
service.mixin.metadata.withNamespace($._config.namespace) +
service.mixin.metadata.withLabels({ 'app.kubernetes.io/name': $.memcached.service.metadata.name }) +
service.mixin.spec.withClusterIp('None'),

statefulSet:
local sts = k.apps.v1beta2.statefulSet;
local volume = sts.mixin.spec.template.spec.volumesType;
local container = sts.mixin.spec.template.spec.containersType;
local containerEnv = container.envType;
local containerVolumeMount = container.volumeMountsType;

local c =
container.new($.memcached.statefulSet.metadata.name, $.memcached.image) +
container.withPorts([
{ name: 'memcached', containerPort: $.memcached.service.spec.ports[0].port },
]) +
container.mixin.resources.withRequests({ cpu: '100m', memory: '512Mi' }) +
container.mixin.resources.withLimits({ cpu: '1', memory: '1Gi' });

sts.new('memcached', $.memcached.replicas, c, [], $.memcached.statefulSet.metadata.labels) +
sts.mixin.metadata.withNamespace($._config.namespace) +
sts.mixin.metadata.withLabels({ 'app.kubernetes.io/name': $.memcached.statefulSet.metadata.name }) +
sts.mixin.spec.withServiceName($.memcached.service.metadata.name) +
sts.mixin.spec.selector.withMatchLabels($.memcached.statefulSet.metadata.labels) +
{
spec+: {
volumeClaimTemplates:: null,
},
},
},
}
12 changes: 9 additions & 3 deletions pkg/authorize/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,33 +117,39 @@ func AgainstEndpoint(logger log.Logger, client *http.Client, endpoint *url.URL,
}
}

// NewHandler returns an http.HandlerFunc that is able to authorize requests against Tollbooth.
// The handler function expects a bearer token in the Authorization header consisting of a
// base64-encoded JSON object containing "authorization_token" and "cluster_id" fields.
func NewHandler(logger log.Logger, client *http.Client, endpoint *url.URL, tenantKey string, next http.Handler) http.HandlerFunc {
logger = log.With(logger, "component", "authorize")
return func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
authParts := strings.Split(string(authHeader), " ")
if len(authParts) != 2 || strings.ToLower(authParts[0]) != "bearer" {
http.Error(w, "bad authorization header", http.StatusBadRequest)
level.Warn(logger).Log("msg", "bad authorization header")
w.WriteHeader(http.StatusBadRequest)
return
}

token, err := base64.StdEncoding.DecodeString(authParts[1])
if err != nil {
http.Error(w, "bad authorization header", http.StatusBadRequest)
level.Warn(logger).Log("msg", "failed to extract token", "err", err)
w.WriteHeader(http.StatusBadRequest)
squat marked this conversation as resolved.
Show resolved Hide resolved
return
}
var tenant string
if tenantKey != "" {
fields := make(map[string]string)
if err := json.Unmarshal(token, &fields); err != nil {
level.Warn(logger).Log("msg", "failed to read token", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
tenant = fields[tenantKey]
}
if _, err := AgainstEndpoint(logger, client, endpoint, token, tenant, nil); err != nil {
level.Warn(logger).Log("msg", "unauthorized request made:", "err", err)
http.Error(w, "unauthorized", http.StatusUnauthorized)
w.WriteHeader(http.StatusUnauthorized)
return
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/authorize/tollbooth/tollbooth.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package tollbooth

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"mime"
"net/http"
"net/url"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"

"github.com/openshift/telemeter/pkg/authorize"
)
Expand Down Expand Up @@ -75,3 +78,15 @@ func (a *authorizer) AuthorizeCluster(token, cluster string) (string, error) {

return response.AccountID, nil
}

// ExtractToken extracts the token from an auth request.
// In the case of a request to Tollbooth, the token
// is the entire contents of the request body.
func ExtractToken(r *http.Request) (string, error) {
body, err := ioutil.ReadAll(r.Body)
squat marked this conversation as resolved.
Show resolved Hide resolved
if err := r.Body.Close(); err != nil {
return "", errors.Wrap(err, "failed to close body")
}
r.Body = ioutil.NopCloser(bytes.NewBuffer(body))
return string(body), err
}