Use context to add timeout to Cincinnati HTTP request
Change CVO wait.Until instances to wait.UntilWithContext, making the top-level
context available to the lower levels of the code. Use the added context
to add a timeout to the remote Cincinnati HTTP call, which otherwise has the
potential to hang the caller at the mercy of the target server.

The timeout has been set to an hour, which was determined to be long
enough to allow all expected well-behaved requests to complete, but short
enough to not be catastrophic if the calling thread were to hang.
lucab authored and jottofar committed Jul 30, 2020
1 parent 4b5876e commit 1d1de3b
Showing 7 changed files with 41 additions and 32 deletions.
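
The pattern described in the commit message is small enough to sketch on its own. The following is a minimal, self-contained illustration (the helper name fetchGraph, the constant, and the example URL are placeholders for this sketch, not identifiers from the CVO code): derive a child context with a deadline from the caller's context and attach it to the HTTP request, so a stalled upstream server surfaces as context.DeadlineExceeded instead of hanging the worker indefinitely.

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// getUpdatesTimeout mirrors the one-hour bound the commit places on the
// upstream Cincinnati request.
const getUpdatesTimeout = time.Minute * 60

// fetchGraph is an illustrative helper, not CVO code: it derives a child
// context with a deadline and attaches it to the request, so the call fails
// with context.DeadlineExceeded once the timeout elapses, or with
// context.Canceled if the parent context is cancelled first.
func fetchGraph(ctx context.Context, client *http.Client, url string) ([]byte, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, getUpdatesTimeout)
	defer cancel()

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req.WithContext(timeoutCtx))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Read the body before the deferred cancel fires; the deadline covers
	// the read as well as the round trip.
	return ioutil.ReadAll(resp.Body)
}

func main() {
	body, err := fetchGraph(context.Background(), &http.Client{}, "https://example.com/graph")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Printf("received %d bytes\n", len(body))
}

Because the deadline also governs reading the response body, the helper reads the body before returning, so the deferred cancel cannot interrupt a caller's read.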
11 changes: 9 additions & 2 deletions pkg/cincinnati/cincinnati.go
@@ -1,12 +1,14 @@
package cincinnati

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/blang/semver/v4"
"github.com/google/uuid"
@@ -16,6 +18,9 @@ const (
// GraphMediaType is the media-type specified in the HTTP Accept header
// of requests sent to the Cincinnati-v1 Graph API.
GraphMediaType = "application/json"

// Timeout when calling upstream Cincinnati stack.
getUpdatesTimeout = time.Minute * 60
)

// Client is a Cincinnati client which can be used to fetch update graphs from
@@ -58,7 +63,7 @@ func (err *Error) Error() string {
// finding all of the children. These children are the available updates for
// the current version and their payloads indicate from where the actual update
// image can be downloaded.
func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
transport := http.Transport{}
// Prepare parametrized cincinnati query.
queryParams := uri.Query()
@@ -83,7 +88,9 @@ func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version se
}

client := http.Client{Transport: &transport}
resp, err := client.Do(req)
timeoutCtx, cancel := context.WithTimeout(ctx, getUpdatesTimeout)
defer cancel()
resp, err := client.Do(req.WithContext(timeoutCtx))
if err != nil {
return nil, &Error{Reason: "RemoteFailed", Message: err.Error(), cause: err}
}
3 changes: 2 additions & 1 deletion pkg/cincinnati/cincinnati_test.go
@@ -1,6 +1,7 @@
package cincinnati

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
@@ -132,7 +133,7 @@ func TestGetUpdates(t *testing.T) {
t.Fatal(err)
}

updates, err := c.GetUpdates(uri, arch, channelName, semver.MustParse(test.version))
updates, err := c.GetUpdates(context.Background(), uri, arch, channelName, semver.MustParse(test.version))
if test.err == "" {
if err != nil {
t.Fatalf("expected nil error, got: %v", err)
9 changes: 5 additions & 4 deletions pkg/cvo/availableupdates.go
@@ -1,6 +1,7 @@
package cvo

import (
"context"
"crypto/tls"
"fmt"
"net/url"
@@ -23,7 +24,7 @@ const noChannel string = "NoChannel"
// syncAvailableUpdates attempts to retrieve the latest updates and update the status of the ClusterVersion
// object. It will set the RetrievedUpdates condition. Updates are only checked if it has been more than
// the minimumUpdateCheckInterval since the last check.
func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) error {
func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1.ClusterVersion) error {
usedDefaultUpstream := false
upstream := string(config.Spec.Upstream)
if len(upstream) == 0 {
@@ -45,7 +46,7 @@ func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) erro
return err
}

updates, condition := calculateAvailableUpdatesStatus(string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)
updates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)

if usedDefaultUpstream {
upstream = ""
@@ -139,7 +140,7 @@ func (optr *Operator) getAvailableUpdates() *availableUpdates {
return optr.availableUpdates
}

func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
if len(upstream) == 0 {
return nil, configv1.ClusterOperatorStatusCondition{
Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse, Reason: "NoUpstream",
@@ -193,7 +194,7 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
}
}

updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(upstreamURI, arch, channel, currentVersion)
updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(ctx, upstreamURI, arch, channel, currentVersion)
if err != nil {
klog.V(2).Infof("Upstream server %s could not return available updates: %v", upstream, err)
if updateError, ok := err.(*cincinnati.Error); ok {
25 changes: 12 additions & 13 deletions pkg/cvo/cvo.go
@@ -326,19 +326,19 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
optr.queue.Add(optr.queueKey())

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
go wait.Until(func() {
go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second)
go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
go wait.UntilWithContext(ctx, func(ctx context.Context) {
defer close(workerStopCh)

// run the worker, then when the queue is closed sync one final time to flush any pending status
optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) })
if err := optr.sync(ctx, optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
}
}, time.Second, stopCh)
}, time.Second)
if optr.signatureStore != nil {
go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
}
@@ -375,21 +375,21 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
}
}

func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error) {
func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
}
}

type syncFailingStatusFunc func(ctx context.Context, config *configv1.ClusterVersion, err error) error

func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error, syncFailingStatus syncFailingStatusFunc) bool {
func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error, syncFailingStatus syncFailingStatusFunc) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)

err := syncHandler(key.(string))
err := syncHandler(ctx, key.(string))
handleErr(ctx, queue, err, key, syncFailingStatus)
return true
}
@@ -486,7 +486,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error {

// availableUpdatesSync is triggered on cluster version change (and periodic requeues) to
// sync available updates. It only modifies cluster version.
func (optr *Operator) availableUpdatesSync(key string) error {
func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) error {
startTime := time.Now()
klog.V(4).Infof("Started syncing available updates %q (%v)", key, startTime)
defer func() {
@@ -503,13 +503,12 @@ func (optr *Operator) availableUpdatesSync(key string) error {
if errs := validation.ValidateClusterVersion(config); len(errs) > 0 {
return nil
}

return optr.syncAvailableUpdates(config)
return optr.syncAvailableUpdates(ctx, config)
}

// upgradeableSync is triggered on cluster version change (and periodic requeues) to
// sync upgradeableCondition. It only modifies cluster version.
func (optr *Operator) upgradeableSync(key string) error {
func (optr *Operator) upgradeableSync(ctx context.Context, key string) error {
startTime := time.Now()
klog.V(4).Infof("Started syncing upgradeable %q (%v)", key, startTime)
defer func() {
8 changes: 5 additions & 3 deletions pkg/cvo/cvo_test.go
@@ -256,7 +256,6 @@ func (c *fakeApiExtClient) Patch(ctx context.Context, name string, pt types.Patc
}

func TestOperator_sync(t *testing.T) {
ctx := context.Background()
id := uuid.Must(uuid.NewRandom()).String()

tests := []struct {
@@ -2271,6 +2270,7 @@ func TestOperator_sync(t *testing.T) {
}
optr.eventRecorder = record.NewFakeRecorder(100)

ctx := context.Background()
err := optr.sync(ctx, optr.queueKey())
if err != nil && tt.wantErr == nil {
t.Fatalf("Operator.sync() unexpected error: %v", err)
@@ -2651,7 +2651,8 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
}
old := optr.availableUpdates

err := optr.availableUpdatesSync(optr.queueKey())
ctx := context.Background()
err := optr.availableUpdatesSync(ctx, optr.queueKey())
if err != nil && tt.wantErr == nil {
t.Fatalf("Operator.sync() unexpected error: %v", err)
}
@@ -3143,7 +3144,8 @@ func TestOperator_upgradeableSync(t *testing.T) {
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
optr.eventRecorder = record.NewFakeRecorder(100)

err := optr.upgradeableSync(optr.queueKey())
ctx := context.Background()
err := optr.upgradeableSync(ctx, optr.queueKey())
if err != nil && tt.wantErr == nil {
t.Fatalf("Operator.sync() unexpected error: %v", err)
}
11 changes: 5 additions & 6 deletions pkg/cvo/sync_worker.go
@@ -988,21 +988,20 @@ func ownerRefModifier(config *configv1.ClusterVersion) resourcebuilder.MetaV1Obj

// runThrottledStatusNotifier invokes fn every time ch is updated, but no more often than once
// every interval. If bucket is non-zero then the channel is throttled like a rate limiter bucket.
func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
func runThrottledStatusNotifier(ctx context.Context, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
// notify the status change function fairly infrequently to avoid updating
// the caller status more frequently than is needed
throttle := rate.NewLimiter(rate.Every(interval), bucket)
wait.Until(func() {
ctx := context.Background()
wait.UntilWithContext(ctx, func(ctx context.Context) {
var last SyncWorkerStatus
for {
select {
case <-stopCh:
case <-ctx.Done():
return
case next := <-ch:
// only throttle if we aren't on an edge
if next.Generation == last.Generation && next.Actual == last.Actual && next.Reconciling == last.Reconciling && (next.Failure != nil) == (last.Failure != nil) {
if err := throttle.Wait(ctx); err != nil {
if err := throttle.Wait(ctx); err != nil && err != context.Canceled && err != context.DeadlineExceeded {
utilruntime.HandleError(fmt.Errorf("unable to throttle status notification: %v", err))
}
}
@@ -1011,5 +1010,5 @@ func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration,
fn()
}
}
}, 1*time.Second, stopCh)
}, 1*time.Second)
}
6 changes: 3 additions & 3 deletions pkg/cvo/sync_worker_test.go
@@ -1,6 +1,7 @@
package cvo

import (
"context"
"fmt"
"testing"
"time"
@@ -146,12 +147,11 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
}
}
func Test_runThrottledStatusNotifier(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
in := make(chan SyncWorkerStatus)
out := make(chan struct{}, 100)

go runThrottledStatusNotifier(stopCh, 30*time.Second, 1, in, func() { out <- struct{}{} })
ctx := context.Background()
go runThrottledStatusNotifier(ctx, 30*time.Second, 1, in, func() { out <- struct{}{} })

in <- SyncWorkerStatus{Actual: configv1.Update{Image: "test"}}
select {
