Skip to content

Commit

Permalink
Merge pull request #339 from metalmatze/forward-oauth2
Browse files Browse the repository at this point in the history
Pass oauth2.Transport into forward HTTP client
  • Loading branch information
openshift-merge-robot committed Jun 11, 2020
2 parents ca9c4f2 + 3304695 commit e801ab5
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 36 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ $(UP_BIN): $(BIN_DIR)

$(MEMCACHED_BIN): $(BIN_DIR) $(LIB_DIR)
@echo "Downloading Memcached"
curl -L https://www.archlinux.org/packages/core/x86_64/zstd/download/ | tar --strip-components=2 -xJf - -C $(LIB_DIR) usr/bin/zstd
curl -L https://www.archlinux.org/packages/core/x86_64/libevent/download/ | tar -I $(LIB_DIR)/zstd --strip-components=2 -xf - -C $(LIB_DIR) usr/lib
curl -L https://www.archlinux.org/packages/extra/x86_64/memcached/download/ | tar -I $(LIB_DIR)/zstd --strip-components=2 -xf - -C $(BIN_DIR) usr/bin/memcached

Expand Down
Binary file added _output/lib/zstd
Binary file not shown.
71 changes: 40 additions & 31 deletions cmd/telemeter-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ func (o *Options) Run() error {
o.RequiredLabels[values[0]] = values[1]
}

var transport http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{Timeout: 10 * time.Second}).Dial,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
}

if o.Verbose {
transport = telemeter_http.NewDebugRoundTripper(o.Logger, transport)
}

// set up the upstream authorization
var authorizeURL *url.URL
var authorizeClient http.Client
Expand All @@ -228,44 +238,43 @@ func (o *Options) Run() error {
}
authorizeURL = u

var transport http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{Timeout: 10 * time.Second}).Dial,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
}

if o.Verbose {
transport = telemeter_http.NewDebugRoundTripper(o.Logger, transport)
}

authorizeClient = http.Client{
Timeout: 20 * time.Second,
Transport: telemeter_http.NewInstrumentedRoundTripper("authorize", transport),
}
}

if o.OIDCIssuer != "" {
provider, err := oidc.NewProvider(ctx, o.OIDCIssuer)
if err != nil {
return fmt.Errorf("OIDC provider initialization failed: %v", err)
}
forwardClient := http.Client{
Timeout: 5 * time.Second,
Transport: telemeter_http.NewInstrumentedRoundTripper("forward", transport),
}

ctx = context.WithValue(ctx, oauth2.HTTPClient,
&http.Client{
Timeout: 20 * time.Second,
Transport: telemeter_http.NewInstrumentedRoundTripper("oauth", transport),
},
)
if o.OIDCIssuer != "" {
provider, err := oidc.NewProvider(ctx, o.OIDCIssuer)
if err != nil {
return fmt.Errorf("OIDC provider initialization failed: %v", err)
}

cfg := clientcredentials.Config{
ClientID: o.ClientID,
ClientSecret: o.ClientSecret,
TokenURL: provider.Endpoint().TokenURL,
}
ctx = context.WithValue(ctx, oauth2.HTTPClient,
&http.Client{
Timeout: 20 * time.Second,
Transport: telemeter_http.NewInstrumentedRoundTripper("oauth", transport),
},
)

cfg := clientcredentials.Config{
ClientID: o.ClientID,
ClientSecret: o.ClientSecret,
TokenURL: provider.Endpoint().TokenURL,
}

authorizeClient.Transport = &oauth2.Transport{
Base: authorizeClient.Transport,
Source: cfg.TokenSource(ctx),
}
authorizeClient.Transport = &oauth2.Transport{
Base: authorizeClient.Transport,
Source: cfg.TokenSource(ctx),
}
forwardClient.Transport = &oauth2.Transport{
Base: forwardClient.Transport,
Source: cfg.TokenSource(ctx),
}
}

Expand Down Expand Up @@ -435,7 +444,7 @@ func (o *Options) Run() error {
server.Ratelimit(o.Logger, o.Ratelimit, time.Now,
server.Snappy(
server.Validate(o.Logger, transforms, 24*time.Hour, o.LimitBytes, time.Now,
server.ForwardHandler(o.Logger, forwardURL, o.TenantID),
server.ForwardHandler(o.Logger, forwardURL, o.TenantID, forwardClient),
),
),
),
Expand Down
4 changes: 1 addition & 3 deletions pkg/server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ func init() {

// ForwardHandler gets a request containing metric families and
// converts it to a remote write request forwarding it to the upstream at fowardURL.
func ForwardHandler(logger log.Logger, forwardURL *url.URL, tenantID string) http.HandlerFunc {
client := http.Client{}

func ForwardHandler(logger log.Logger, forwardURL *url.URL, tenantID string, client http.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
rlogger := log.With(logger, "request", middleware.GetReqID(r.Context()))

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestForward(t *testing.T) {
server.Ratelimit(log.NewNopLogger(), 4*time.Minute+30*time.Second, time.Now,
server.Snappy(
server.Validate(log.NewNopLogger(), metricfamily.MultiTransformer{}, 10*365*24*time.Hour, 500*1024, time.Now,
server.ForwardHandler(logger, receiveURL, "default-tenant"),
server.ForwardHandler(logger, receiveURL, "default-tenant", http.Client{}),
),
),
),
Expand Down

0 comments on commit e801ab5

Please sign in to comment.