Use context to add timeout to cincinnati HTTP request #410

Merged
11 changes: 9 additions & 2 deletions pkg/cincinnati/cincinnati.go

@@ -1,12 +1,14 @@
 package cincinnati

 import (
+    "context"
     "crypto/tls"
     "encoding/json"
     "fmt"
     "io/ioutil"
     "net/http"
     "net/url"
+    "time"

     "github.com/blang/semver/v4"
     "github.com/google/uuid"
@@ -16,6 +18,9 @@ const (
     // GraphMediaType is the media-type specified in the HTTP Accept header
     // of requests sent to the Cincinnati-v1 Graph API.
     GraphMediaType = "application/json"
+
+    // Timeout when calling upstream Cincinnati stack.
+    getUpdatesTimeout = time.Minute * 60
 )

 // Client is a Cincinnati client which can be used to fetch update graphs from
@@ -58,7 +63,7 @@ func (err *Error) Error() string {
 // finding all of the children. These children are the available updates for
 // the current version and their payloads indicate from where the actual update
 // image can be downloaded.
-func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
+func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
     transport := http.Transport{}
     // Prepare parametrized cincinnati query.
     queryParams := uri.Query()
@@ -83,7 +88,9 @@ func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version se
     }

     client := http.Client{Transport: &transport}
-    resp, err := client.Do(req)
+    timeoutCtx, cancel := context.WithTimeout(ctx, getUpdatesTimeout)
+    defer cancel()
+    resp, err := client.Do(req.WithContext(timeoutCtx))
     if err != nil {
         return nil, &Error{Reason: "RemoteFailed", Message: err.Error(), cause: err}
     }
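The heart of this change is the pattern in the hunk above: derive a deadline-bounded child context from whatever context the caller supplies, and attach it to the outgoing request so a hung upstream fails with a context error instead of blocking forever. A minimal standalone sketch of that pattern, assuming a placeholder URL and a 10-second timeout (neither comes from this PR):

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func fetch(ctx context.Context, url string) error {
    // Bound the entire request; the deferred cancel releases the
    // timer even when the request finishes early.
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return err
    }
    // Attach the bounded context, mirroring req.WithContext above.
    resp, err := http.DefaultClient.Do(req.WithContext(ctx))
    if err != nil {
        return err // on timeout this wraps context.DeadlineExceeded
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
    return nil
}

func main() {
    if err := fetch(context.Background(), "https://example.com"); err != nil {
        fmt.Println("request failed:", err)
    }
}

On Go 1.13 and later, http.NewRequestWithContext collapses request construction and context attachment into one call; the req.WithContext form matches the diff, which builds the request before the timeout context exists.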
3 changes: 2 additions & 1 deletion pkg/cincinnati/cincinnati_test.go

@@ -1,6 +1,7 @@
 package cincinnati

 import (
+    "context"
     "crypto/tls"
     "encoding/json"
     "fmt"
@@ -132,7 +133,7 @@ func TestGetUpdates(t *testing.T) {
             t.Fatal(err)
         }

         updates, err := c.GetUpdates(context.Background(), uri, arch, channelName, semver.MustParse(test.version))
-        updates, err := c.GetUpdates(uri, arch, channelName, semver.MustParse(test.version))
+        updates, err := c.GetUpdates(context.Background(), uri, arch, channelName, semver.MustParse(test.version))
         if test.err == "" {
             if err != nil {
                 t.Fatalf("expected nil error, got: %v", err)
9 changes: 5 additions & 4 deletions pkg/cvo/availableupdates.go

@@ -1,6 +1,7 @@
 package cvo

 import (
+    "context"
     "crypto/tls"
     "fmt"
     "net/url"
@@ -23,7 +24,7 @@ const noChannel string = "NoChannel"
 // syncAvailableUpdates attempts to retrieve the latest updates and update the status of the ClusterVersion
 // object. It will set the RetrievedUpdates condition. Updates are only checked if it has been more than
 // the minimumUpdateCheckInterval since the last check.
-func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) error {
+func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1.ClusterVersion) error {
     usedDefaultUpstream := false
     upstream := string(config.Spec.Upstream)
     if len(upstream) == 0 {
@@ -45,7 +46,7 @@ func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) erro
         return err
     }

-    updates, condition := calculateAvailableUpdatesStatus(string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)
+    updates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)

     if usedDefaultUpstream {
         upstream = ""
@@ -139,7 +140,7 @@ func (optr *Operator) getAvailableUpdates() *availableUpdates {
     return optr.availableUpdates
 }

-func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
+func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
     if len(upstream) == 0 {
         return nil, configv1.ClusterOperatorStatusCondition{
             Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse, Reason: "NoUpstream",
@@ -193,7 +194,7 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
         }
     }

-    updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(upstreamURI, arch, channel, currentVersion)
+    updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(ctx, upstreamURI, arch, channel, currentVersion)
     if err != nil {
         klog.V(2).Infof("Upstream server %s could not return available updates: %v", upstream, err)
         if updateError, ok := err.(*cincinnati.Error); ok {
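These hunks do no more than thread the context as the first parameter from syncAvailableUpdates through calculateAvailableUpdatesStatus to the Cincinnati client, per the usual Go convention. The payoff is that cancellation at the top propagates to the bottom without any intermediate layer doing more than passing the value along. A sketch with illustrative function names (not from the operator):

package main

import (
    "context"
    "fmt"
    "time"
)

// bottom stands in for the eventual network call.
func bottom(ctx context.Context) error {
    select {
    case <-time.After(5 * time.Second): // pretend slow upstream
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// middle only forwards the context, like calculateAvailableUpdatesStatus.
func middle(ctx context.Context) error { return bottom(ctx) }

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    fmt.Println(middle(ctx)) // prints: context deadline exceeded
}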
25 changes: 12 additions & 13 deletions pkg/cvo/cvo.go

@@ -326,19 +326,19 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
     optr.queue.Add(optr.queueKey())

     // start the config sync loop, and have it notify the queue when new status is detected
-    go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
+    go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
     go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
-    go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
-    go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
-    go wait.Until(func() {
+    go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second)
+    go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+    go wait.UntilWithContext(ctx, func(ctx context.Context) {
         defer close(workerStopCh)

         // run the worker, then when the queue is closed sync one final time to flush any pending status
-        optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
+        optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) })
         if err := optr.sync(ctx, optr.queueKey()); err != nil {
             utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
         }
-    }, time.Second, stopCh)
+    }, time.Second)
     if optr.signatureStore != nil {
         go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
     }
@@ -375,21 +375,21 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
     }
 }

-func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error) {
+func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
     for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
     }
 }

 type syncFailingStatusFunc func(ctx context.Context, config *configv1.ClusterVersion, err error) error

-func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error, syncFailingStatus syncFailingStatusFunc) bool {
+func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error, syncFailingStatus syncFailingStatusFunc) bool {
     key, quit := queue.Get()
     if quit {
         return false
     }
     defer queue.Done(key)

-    err := syncHandler(key.(string))
+    err := syncHandler(ctx, key.(string))
     handleErr(ctx, queue, err, key, syncFailingStatus)
     return true
 }
@@ -486,7 +486,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error {

 // availableUpdatesSync is triggered on cluster version change (and periodic requeues) to
 // sync available updates. It only modifies cluster version.
-func (optr *Operator) availableUpdatesSync(key string) error {
+func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) error {
     startTime := time.Now()
     klog.V(4).Infof("Started syncing available updates %q (%v)", key, startTime)
     defer func() {
@@ -503,13 +503,12 @@ func (optr *Operator) availableUpdatesSync(key string) error {
     if errs := validation.ValidateClusterVersion(config); len(errs) > 0 {
         return nil
     }
-
-    return optr.syncAvailableUpdates(config)
+    return optr.syncAvailableUpdates(ctx, config)
 }

 // upgradeableSync is triggered on cluster version change (and periodic requeues) to
 // sync upgradeableCondition. It only modifies cluster version.
-func (optr *Operator) upgradeableSync(key string) error {
+func (optr *Operator) upgradeableSync(ctx context.Context, key string) error {
     startTime := time.Now()
     klog.V(4).Infof("Started syncing upgradeable %q (%v)", key, startTime)
     defer func() {
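Two related changes land in this file: the sync-handler type gains a context parameter (func(context.Context, string) error), and the polling loops move from wait.Until, which takes a stop channel, to wait.UntilWithContext, which re-runs a func(context.Context) until the context is done. Together they let a worker observe shutdown both between queue items and inside one. A dependency-free sketch of that shape, with a plain channel standing in for workqueue.RateLimitingInterface and illustrative names throughout:

package main

import (
    "context"
    "fmt"
    "time"
)

// worker drains the queue until its context is canceled, handing the
// same context to each sync so in-flight work can be interrupted too.
func worker(ctx context.Context, queue <-chan string, sync func(context.Context, string) error) {
    for {
        select {
        case <-ctx.Done():
            return
        case key := <-queue:
            if err := sync(ctx, key); err != nil {
                fmt.Println("sync failed:", err)
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    queue := make(chan string, 1)
    queue <- "cluster-version"

    go worker(ctx, queue, func(ctx context.Context, key string) error {
        fmt.Println("syncing", key) // a real handler would pass ctx to API calls
        return nil
    })

    time.Sleep(50 * time.Millisecond)
    cancel() // the worker returns on its next select
}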
8 changes: 5 additions & 3 deletions pkg/cvo/cvo_test.go

@@ -256,7 +256,6 @@ func (c *fakeApiExtClient) Patch(ctx context.Context, name string, pt types.Patc
 }

 func TestOperator_sync(t *testing.T) {
-    ctx := context.Background()
     id := uuid.Must(uuid.NewRandom()).String()

     tests := []struct {
@@ -2271,6 +2270,7 @@ func TestOperator_sync(t *testing.T) {
             }
             optr.eventRecorder = record.NewFakeRecorder(100)

+            ctx := context.Background()
             err := optr.sync(ctx, optr.queueKey())
             if err != nil && tt.wantErr == nil {
                 t.Fatalf("Operator.sync() unexpected error: %v", err)
@@ -2651,7 +2651,8 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
             }
             old := optr.availableUpdates

-            err := optr.availableUpdatesSync(optr.queueKey())
+            ctx := context.Background()
+            err := optr.availableUpdatesSync(ctx, optr.queueKey())
             if err != nil && tt.wantErr == nil {
                 t.Fatalf("Operator.sync() unexpected error: %v", err)
             }
@@ -3143,7 +3144,8 @@ func TestOperator_upgradeableSync(t *testing.T) {
             optr.upgradeableChecks = optr.defaultUpgradeableChecks()
             optr.eventRecorder = record.NewFakeRecorder(100)

-            err := optr.upgradeableSync(optr.queueKey())
+            ctx := context.Background()
+            err := optr.upgradeableSync(ctx, optr.queueKey())
             if err != nil && tt.wantErr == nil {
                 t.Fatalf("Operator.sync() unexpected error: %v", err)
             }
11 changes: 5 additions & 6 deletions pkg/cvo/sync_worker.go

@@ -988,21 +988,20 @@ func ownerRefModifier(config *configv1.ClusterVersion) resourcebuilder.MetaV1Obj

 // runThrottledStatusNotifier invokes fn every time ch is updated, but no more often than once
 // every interval. If bucket is non-zero then the channel is throttled like a rate limiter bucket.
-func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
+func runThrottledStatusNotifier(ctx context.Context, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
     // notify the status change function fairly infrequently to avoid updating
     // the caller status more frequently than is needed
     throttle := rate.NewLimiter(rate.Every(interval), bucket)
-    wait.Until(func() {
-        ctx := context.Background()
+    wait.UntilWithContext(ctx, func(ctx context.Context) {
         var last SyncWorkerStatus
         for {
            select {
-           case <-stopCh:
+           case <-ctx.Done():
                return
            case next := <-ch:
                // only throttle if we aren't on an edge
                if next.Generation == last.Generation && next.Actual == last.Actual && next.Reconciling == last.Reconciling && (next.Failure != nil) == (last.Failure != nil) {
-                   if err := throttle.Wait(ctx); err != nil {
+                   if err := throttle.Wait(ctx); err != nil && err != context.Canceled && err != context.DeadlineExceeded {
                        utilruntime.HandleError(fmt.Errorf("unable to throttle status notification: %v", err))
                    }
                }
@@ -1011,5 +1010,5 @@ func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration,
                fn()
            }
        }
-    }, 1*time.Second, stopCh)
+    }, 1*time.Second)
 }
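The notifier's throttle is rate.Limiter.Wait, which blocks until a token is available or the context ends; when the context ends it returns ctx.Err(), so the added err != context.Canceled && err != context.DeadlineExceeded guard treats shutdown as normal instead of logging it. A small sketch of that behavior using golang.org/x/time/rate (the interval and timings are illustrative):

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // At most one notification per 100ms, with a burst of 1.
    throttle := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

    go func() {
        time.Sleep(250 * time.Millisecond)
        cancel() // simulate operator shutdown
    }()

    for i := 0; ; i++ {
        if err := throttle.Wait(ctx); err != nil {
            if err == context.Canceled || err == context.DeadlineExceeded {
                fmt.Println("shutting down cleanly")
                return
            }
            fmt.Println("unexpected throttle error:", err)
            return
        }
        fmt.Println("notify", i)
    }
}

Note that Wait can also fail with a rate-package error of its own when a context deadline is too close for a token to ever become available; the hunk above still surfaces that case through utilruntime.HandleError.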
6 changes: 3 additions & 3 deletions pkg/cvo/sync_worker_test.go

@@ -1,6 +1,7 @@
 package cvo

 import (
+    "context"
     "fmt"
     "testing"
     "time"
@@ -146,12 +147,11 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
     }
 }
 func Test_runThrottledStatusNotifier(t *testing.T) {
-    stopCh := make(chan struct{})
-    defer close(stopCh)
     in := make(chan SyncWorkerStatus)
     out := make(chan struct{}, 100)

-    go runThrottledStatusNotifier(stopCh, 30*time.Second, 1, in, func() { out <- struct{}{} })
+    ctx := context.Background()
+    go runThrottledStatusNotifier(ctx, 30*time.Second, 1, in, func() { out <- struct{}{} })

     in <- SyncWorkerStatus{Actual: configv1.Update{Image: "test"}}
     select {