diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml
index 0b351176338..dde605d643e 100644
--- a/config/core/configmaps/features.yaml
+++ b/config/core/configmaps/features.yaml
@@ -28,6 +28,10 @@ data:
# For more details: https://github.com/knative/eventing/issues/5086
kreference-group: "disabled"
+ # ALPHA feature: The delivery-retryafter allows you to use the RetryAfterMax field in DeliverySpec.
+ # For more details: https://github.com/knative/eventing/issues/5811
+ delivery-retryafter: "disabled"
+
# ALPHA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec.
# For more details: https://github.com/knative/eventing/issues/5148
delivery-timeout: "disabled"
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index d4e35db0acb..4c84a143474 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -392,6 +392,31 @@ More information on Duration format:
For exponential policy, backoff delay is backoffDelay*2^.
+
+
+retryAfterMax
+
+string
+
+ |
+
+(Optional)
+ RetryAfterMax provides an optional upper bound on the duration specified in a “Retry-After” header
+when calculating backoff times for retrying 429 and 503 response codes. Setting the value to
+zero (“PT0S”) can be used to opt-out of respecting “Retry-After” header values altogether. This
+value only takes effect if “Retry” is configured, and also depends on specific implementations
+(Channels, Sources, etc.) choosing to provide this capability.
+Note: This API is EXPERIMENTAL and might be changed at anytime. While this experimental
+feature is in the Alpha/Beta stage, you must provide a valid value to opt-in for
+supporting “Retry-After” headers. When the feature becomes Stable/GA “Retry-After”
+headers will be respected by default, and you can choose to specify “PT0S” to
+opt-out of supporting “Retry-After” headers.
+For more details: https://github.com/knative/eventing/issues/5811
+More information on Duration format:
+- https://www.iso.org/iso-8601-date-and-time-format.html
+- https://en.wikipedia.org/wiki/ISO_8601
+ |
+
DeliveryStatus
diff --git a/pkg/apis/duck/v1/delivery_types.go b/pkg/apis/duck/v1/delivery_types.go
index dffd0397a33..49c26ced5d5 100644
--- a/pkg/apis/duck/v1/delivery_types.go
+++ b/pkg/apis/duck/v1/delivery_types.go
@@ -20,9 +20,10 @@ import (
"context"
"github.com/rickb777/date/period"
- "knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
+
+ "knative.dev/eventing/pkg/apis/feature"
)
// DeliverySpec contains the delivery options for event senders,
@@ -60,6 +61,26 @@ type DeliverySpec struct {
// For exponential policy, backoff delay is backoffDelay*2^.
// +optional
BackoffDelay *string `json:"backoffDelay,omitempty"`
+
+ // RetryAfterMax provides an optional upper bound on the duration specified in a "Retry-After" header
+ // when calculating backoff times for retrying 429 and 503 response codes. Setting the value to
+ // zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This
+ // value only takes effect if "Retry" is configured, and also depends on specific implementations
+ // (Channels, Sources, etc.) choosing to provide this capability.
+ //
+ // Note: This API is EXPERIMENTAL and might be changed at anytime. While this experimental
+ // feature is in the Alpha/Beta stage, you must provide a valid value to opt-in for
+ // supporting "Retry-After" headers. When the feature becomes Stable/GA "Retry-After"
+ // headers will be respected by default, and you can choose to specify "PT0S" to
+ // opt-out of supporting "Retry-After" headers.
+ // For more details: https://github.com/knative/eventing/issues/5811
+ //
+ // More information on Duration format:
+ // - https://www.iso.org/iso-8601-date-and-time-format.html
+ // - https://en.wikipedia.org/wiki/ISO_8601
+ //
+ // +optional
+ RetryAfterMax *string `json:"retryAfterMax,omitempty"`
}
func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError {
@@ -101,6 +122,18 @@ func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError {
errs = errs.Also(apis.ErrInvalidValue(*ds.BackoffDelay, "backoffDelay"))
}
}
+
+ if ds.RetryAfterMax != nil {
+ if feature.FromContext(ctx).IsEnabled(feature.DeliveryRetryAfter) {
+ p, me := period.Parse(*ds.RetryAfterMax)
+ if me != nil || p.IsNegative() {
+ errs = errs.Also(apis.ErrInvalidValue(*ds.RetryAfterMax, "retryAfterMax"))
+ }
+ } else {
+ errs = errs.Also(apis.ErrDisallowedFields("retryAfterMax"))
+ }
+ }
+
return errs
}
diff --git a/pkg/apis/duck/v1/delivery_types_test.go b/pkg/apis/duck/v1/delivery_types_test.go
index ca82ed2b15b..f2b2ab50ae2 100644
--- a/pkg/apis/duck/v1/delivery_types_test.go
+++ b/pkg/apis/duck/v1/delivery_types_test.go
@@ -21,15 +21,19 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/utils/pointer"
- "knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
+
+ "knative.dev/eventing/pkg/apis/feature"
)
func TestDeliverySpecValidation(t *testing.T) {
deliveryTimeoutEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{
feature.DeliveryTimeout: feature.Enabled,
})
+ deliveryRetryAfterEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{
+ feature.DeliveryRetryAfter: feature.Enabled,
+ })
invalidString := "invalid time"
bop := BackoffPolicyExponential
@@ -102,9 +106,41 @@ func TestDeliverySpecValidation(t *testing.T) {
}, {
name: "valid retry 0",
spec: &DeliverySpec{Retry: pointer.Int32Ptr(0)},
+ want: nil,
}, {
name: "valid retry 1",
spec: &DeliverySpec{Retry: pointer.Int32Ptr(1)},
+ want: nil,
+ }, {
+ name: "valid retryAfterMax",
+ ctx: deliveryRetryAfterEnabledCtx,
+ spec: &DeliverySpec{RetryAfterMax: &validDuration},
+ want: nil,
+ }, {
+ name: "zero retryAfterMax",
+ ctx: deliveryRetryAfterEnabledCtx,
+ spec: &DeliverySpec{RetryAfterMax: pointer.StringPtr("PT0S")},
+ want: nil,
+ }, {
+ name: "empty retryAfterMax",
+ ctx: deliveryRetryAfterEnabledCtx,
+ spec: &DeliverySpec{RetryAfterMax: pointer.StringPtr("")},
+ want: func() *apis.FieldError {
+ return apis.ErrInvalidValue("", "retryAfterMax")
+ }(),
+ }, {
+ name: "invalid retryAfterMax",
+ ctx: deliveryRetryAfterEnabledCtx,
+ spec: &DeliverySpec{RetryAfterMax: &invalidDuration},
+ want: func() *apis.FieldError {
+ return apis.ErrInvalidValue(invalidDuration, "retryAfterMax")
+ }(),
+ }, {
+ name: "disabled feature with retryAfterMax",
+ spec: &DeliverySpec{RetryAfterMax: &validDuration},
+ want: func() *apis.FieldError {
+ return apis.ErrDisallowedFields("retryAfterMax")
+ }(),
}}
for _, test := range tests {
diff --git a/pkg/apis/duck/v1/zz_generated.deepcopy.go b/pkg/apis/duck/v1/zz_generated.deepcopy.go
index c22e2bc3b9c..8e30ee5cb49 100644
--- a/pkg/apis/duck/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/duck/v1/zz_generated.deepcopy.go
@@ -163,6 +163,11 @@ func (in *DeliverySpec) DeepCopyInto(out *DeliverySpec) {
*out = new(string)
**out = **in
}
+ if in.RetryAfterMax != nil {
+ in, out := &in.RetryAfterMax, &out.RetryAfterMax
+ *out = new(string)
+ **out = **in
+ }
return
}
diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go
index a404c8fdc92..e69d71959ac 100644
--- a/pkg/apis/feature/flag_names.go
+++ b/pkg/apis/feature/flag_names.go
@@ -17,9 +17,10 @@ limitations under the License.
package feature
const (
- KReferenceGroup = "kreference-group"
- DeliveryTimeout = "delivery-timeout"
- KReferenceMapping = "kreference-mapping"
- StrictSubscriber = "strict-subscriber"
- NewTriggerFilters = "new-trigger-filters"
+ KReferenceGroup = "kreference-group"
+ DeliveryRetryAfter = "delivery-retryafter"
+ DeliveryTimeout = "delivery-timeout"
+ KReferenceMapping = "kreference-mapping"
+ StrictSubscriber = "strict-subscriber"
+ NewTriggerFilters = "new-trigger-filters"
)
diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go
index 6d223f7da40..2eeecef51f1 100644
--- a/pkg/kncloudevents/message_sender.go
+++ b/pkg/kncloudevents/message_sender.go
@@ -18,12 +18,16 @@ package kncloudevents
import (
"context"
+ "fmt"
nethttp "net/http"
+ "strconv"
"time"
"github.com/hashicorp/go-retryablehttp"
)
+const RetryAfterHeader = "Retry-After"
+
// HTTPMessageSender is a wrapper for an http client that can send cloudevents.Request with retries
type HTTPMessageSender struct {
Client *nethttp.Client
@@ -73,9 +77,7 @@ func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC
RetryWaitMax: defaultRetryWaitMax,
RetryMax: config.RetryMax,
CheckRetry: retryablehttp.CheckRetry(config.CheckRetry),
- Backoff: func(_, _ time.Duration, attemptNum int, resp *nethttp.Response) time.Duration {
- return config.Backoff(attemptNum, resp)
- },
+ Backoff: generateBackoffFn(config),
ErrorHandler: func(resp *nethttp.Response, err error, numTries int) (*nethttp.Response, error) {
return resp, err
},
@@ -88,3 +90,91 @@ func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC
return retryableClient.Do(retryableReq)
}
+
+// generateBackoffFn returns a valid retryablehttp.Backoff implementation which
+// wraps the provided RetryConfig.Backoff implementation with optional "Retry-After"
+// header support.
+func generateBackoffFn(config *RetryConfig) retryablehttp.Backoff {
+	return func(_, _ time.Duration, attemptNum int, resp *nethttp.Response) time.Duration {
+
+		//
+		// NOTE - The following logic will need to be altered slightly once the "delivery-retryafter"
+		//        experimental-feature graduates from Alpha/Beta to Stable/GA.  This is according to
+		//        plan as described in https://github.com/knative/eventing/issues/5811.
+		//
+		//        During the Alpha/Beta stages the ability to respect Retry-After headers is "opt-in"
+		//        requiring the DeliverySpec.RetryAfterMax to be populated.  The Stable/GA behavior
+		//        will be "opt-out" where Retry-After headers are always respected (in the context of
+		//        calculating backoff durations for 429 / 503 responses) unless the
+		//        DeliverySpec.RetryAfterMax is set to "PT0S".
+		//
+		//        While this might seem unnecessarily complex, it achieves the following design goals...
+		//          - Does not require an explicit "enabled" flag in the DeliverySpec.
+		//          - Does not require implementations calling the message_sender to be aware of experimental-features.
+		//          - Does not modify existing Knative CRs with arbitrary default "max" values.
+		//
+		//        The intended behavior of RetryConfig.RetryAfterMaxDuration is as follows...
+		//
+		//          RetryAfterMaxDuration    Alpha/Beta                      Stable/GA
+		//          ---------------------    ----------                      ---------
+		//                 nil               Do NOT respect Retry-After      Respect Retry-After headers without Max
+		//                  0                Do NOT respect Retry-After      Do NOT respect Retry-After headers
+		//                 >0                Respect Retry-After with Max    Respect Retry-After headers with Max
+		//
+
+		// If Response is 429 / 503, Then Parse Any Retry-After Header Durations & Enforce Optional MaxDuration
+		var retryAfterDuration time.Duration
+		// TODO - Remove this check when experimental-feature moves to Stable/GA to convert behavior from opt-in to opt-out
+		if config.RetryAfterMaxDuration != nil {
+			// TODO - Keep this logic as is (no change required) when experimental-feature is Stable/GA
+			if resp != nil && (resp.StatusCode == nethttp.StatusTooManyRequests || resp.StatusCode == nethttp.StatusServiceUnavailable) {
+				retryAfterDuration = parseRetryAfterDuration(resp)
+				// Non-nil is guaranteed by the enclosing check above - only the Max cap needs enforcing here.
+				if *config.RetryAfterMaxDuration < retryAfterDuration {
+					retryAfterDuration = *config.RetryAfterMaxDuration
+				}
+			}
+		}
+
+		// Calculate The RetryConfig Backoff Duration
+		backoffDuration := config.Backoff(attemptNum, resp)
+
+		// Return The Larger Of The Two Backoff Durations
+		if retryAfterDuration > backoffDuration {
+			return retryAfterDuration
+		}
+		return backoffDuration
+	}
+}
+
+// parseRetryAfterDuration returns a Duration expressing the amount of time
+// requested to wait by a Retry-After header, or 0 if not present or invalid.
+// According to the spec (https://tools.ietf.org/html/rfc7231#section-7.1.3)
+// the Retry-After Header's value can be one of an HTTP-date or delay-seconds,
+// both of which are supported here.
+func parseRetryAfterDuration(resp *nethttp.Response) (retryAfterDuration time.Duration) {
+
+	// Return 0 Duration (via named return's zero value) If No Response / Headers
+	if resp == nil || resp.Header == nil {
+		return
+	}
+
+	// Return 0 Duration If No Retry-After Header
+	retryAfterString := resp.Header.Get(RetryAfterHeader)
+	if len(retryAfterString) <= 0 {
+		return
+	}
+
+	// Attempt To Parse Retry-After Header As Seconds (delay-seconds form) - Return If Successful
+	retryAfterInt, parseIntErr := strconv.ParseInt(retryAfterString, 10, 64)
+	if parseIntErr == nil {
+		return time.Duration(retryAfterInt) * time.Second
+	}
+
+	// Attempt To Parse Retry-After Header As HTTP-date (nethttp.ParseTime tries RFC1123, RFC850 & ANSIC) - Return If Successful
+	retryAfterTime, parseTimeErr := nethttp.ParseTime(retryAfterString)
+	if parseTimeErr != nil {
+		fmt.Printf("failed to parse Retry-After header: ParseInt Error = %v, ParseTime Error = %v\n", parseIntErr, parseTimeErr) // NOTE(review): consider a structured logger instead of stdout - confirm package conventions
+		return
+	}
+	return time.Until(retryAfterTime)
+}
diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go
index 60d572c28cf..deffa576d54 100644
--- a/pkg/kncloudevents/message_sender_test.go
+++ b/pkg/kncloudevents/message_sender_test.go
@@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
+ "strconv"
"sync/atomic"
"testing"
"time"
@@ -33,6 +34,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/pointer"
+
v1 "knative.dev/eventing/pkg/apis/duck/v1"
)
@@ -194,6 +196,308 @@ func TestHTTPMessageSenderSendWithRetriesWithSingleRequestTimeout(t *testing.T)
require.Equal(t, http.StatusOK, got.StatusCode)
}
+// RetryAfterFormat Enum
+type RetryAfterFormat int
+
+const (
+ None RetryAfterFormat = iota // RetryAfter Format Enum Values
+ Seconds
+ Date
+ Invalid
+
+ retryBackoffDuration = 500 * time.Millisecond // Constant (vs linear/exponential) backoff Duration for simpler validation.
+ paddingDuration = 250 * time.Millisecond // Buffer time to allow test execution to actually send the requests.
+)
+
+// RetryAfterValidationServer wraps a standard HTTP test server with tracking/validation logic.
+type RetryAfterValidationServer struct {
+ *httptest.Server // Wrapped Golang HTTP Test Server.
+ previousReqTime time.Time // Tracking request times to validate retry intervals.
+ requestCount int32 // Tracking total requests for external validation of retry attempts.
+ minRequestDuration time.Duration // Expected minimum request interval duration.
+ maxRequestDuration time.Duration // Expected maximum request interval duration.
+}
+
+// newRetryAfterValidationServer returns a new RetryAfterValidationServer with the
+// specified configuration. The server tracks total request counts and validates
+// inter-request durations to ensure they conform to the expected backoff behavior.
+func newRetryAfterValidationServer(t *testing.T, statusCode int, retryAfterFormat RetryAfterFormat, retryAfterDuration time.Duration, requestDuration time.Duration) *RetryAfterValidationServer {
+
+ server := &RetryAfterValidationServer{
+ minRequestDuration: requestDuration,
+ maxRequestDuration: requestDuration + paddingDuration,
+ }
+
+ server.Server = httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
+
+ // Determine Time Of Current Request
+ currentReqTime := time.Now()
+ // TODO - Remove this check when experimental-feature moves to Stable/GA to convert behavior from opt-in to opt-out
+ if server.minRequestDuration > retryBackoffDuration {
+ // TODO - Keep this logic as is (no change required) when experimental-feature moves to Stable/GA
+ if retryAfterFormat == Date && retryAfterDuration > 0 {
+ currentReqTime = currentReqTime.Round(time.Second) // Round When Using Date Format To Account For RFC850 Precision
+ }
+ }
+
+ // Count The Requests
+ atomic.AddInt32(&server.requestCount, 1)
+
+ // Add Retry-After Header To Response Based On Configured Format
+ switch retryAfterFormat {
+ case None:
+ // Don't Write Anything ;)
+ case Seconds:
+ writer.Header().Set(RetryAfterHeader, strconv.Itoa(int(retryAfterDuration.Seconds())))
+ case Date:
+ writer.Header().Set(RetryAfterHeader, currentReqTime.Add(retryAfterDuration).Format(time.RFC850)) // RFC850 Drops Millis
+ case Invalid:
+ writer.Header().Set(RetryAfterHeader, "FOO")
+ default:
+ assert.Fail(t, "TestCase with unsupported ResponseFormat '%v'", retryAfterFormat)
+ }
+
+ // Only Validate Timings Of Retries (Skip Initial Request)
+ if server.requestCount > 1 {
+
+ // Validate Inter-Request Durations Meet Or Exceed Expected Minimums
+ actualRequestDuration := currentReqTime.Sub(server.previousReqTime)
+ assert.GreaterOrEqual(t, actualRequestDuration, server.minRequestDuration, "previousReqTime =", server.previousReqTime.String(), "currentReqTime =", currentReqTime.String())
+ assert.LessOrEqual(t, actualRequestDuration, server.maxRequestDuration, "previousReqTime =", server.previousReqTime.String(), "currentReqTime =", currentReqTime.String())
+ t.Logf("Validated Request Duration %s between expected range %s - %s",
+ actualRequestDuration.String(),
+ server.minRequestDuration.String(),
+ server.maxRequestDuration.String())
+ }
+
+ // Respond With StatusCode & Cycle The Times
+ writer.WriteHeader(statusCode)
+ server.previousReqTime = currentReqTime
+ }))
+
+ return server
+}
+
+/*
+ * Test Retry-After Header Enforcement
+ *
+ * This test validates that SendWithRetries() is correctly enforcing
+ * the Retry-After headers based on RetryConfig.RetryAfterMaxDuration.
+ * It does this by creating a test HTTP Server which responds with
+ * the desired StatusCode and Retry-After header. The server also
+ * validates the retry request intervals to ensure they fall within
+ * the expected time window. The timings were chosen as a balance of
+ * stability and test execution speed, but could require adjustment.
+ */
+func TestHTTPMessageSenderSendWithRetriesWithRetryAfter(t *testing.T) {
+
+ // Test Data
+ retryMax := int32(2) // Perform a couple of retries to be sure.
+ smallRetryAfterMaxDuration := 1 * time.Second // Value must exceed retryBackoffDuration while being less than retryAfterDuration so that the retryAfterMax value is used.
+ largeRetryAfterMaxDuration := 10 * time.Second // Value must exceed retryBackoffDuration and retryAfterDuration so that Retry-After header is used.
+
+ // Define The TestCases
+ testCases := []struct {
+ name string
+ statusCode int // HTTP StatusCode which the server should return.
+ retryAfterFormat RetryAfterFormat // Format in which the server should return Retry-After headers.
+ retryAfterDuration time.Duration // Duration of the Retry-After header returned by the server.
+ retryAfterMaxDuration *time.Duration // DeliverySpec RetryAfterMax Duration used to calculate expected retry interval.
+ wantRequestDuration time.Duration // Expected minimum Request interval Duration.
+ }{
+
+ // Nil Max Tests (opt-in / opt-out)
+
+ {
+ name: "default max 429 without Retry-After",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: None,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: nil,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ {
+ name: "default max 429 with Retry-After seconds",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Seconds,
+ retryAfterDuration: 1 * time.Second,
+ retryAfterMaxDuration: nil,
+ wantRequestDuration: retryBackoffDuration, // TODO - Update when experimental-feature moves to Stable/GA
+ },
+ {
+ name: "default max 429 with Retry-After date",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Date,
+ retryAfterDuration: 2 * time.Second,
+ retryAfterMaxDuration: nil,
+ wantRequestDuration: retryBackoffDuration, // TODO - Update when experimental-feature moves to Stable/GA
+ },
+ {
+ name: "default max 429 with invalid Retry-After",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Invalid,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: nil,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ {
+ name: "default max 503 with Retry-After seconds",
+ statusCode: http.StatusServiceUnavailable,
+ retryAfterFormat: Seconds,
+ retryAfterDuration: 1 * time.Second,
+ retryAfterMaxDuration: nil,
+ wantRequestDuration: retryBackoffDuration, // TODO - Update when experimental-feature moves to Stable/GA
+ },
+ {
+ name: "default max 500 without Retry-After",
+ statusCode: http.StatusInternalServerError,
+ retryAfterFormat: None,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: nil,
+ wantRequestDuration: retryBackoffDuration,
+ },
+
+ // Large Max Tests (Greater Than Retry-After Value)
+
+ {
+ name: "large max 429 without Retry-After",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: None,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: &largeRetryAfterMaxDuration,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ {
+ name: "large max 429 with Retry-After seconds",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Seconds,
+ retryAfterDuration: 1 * time.Second,
+ retryAfterMaxDuration: &largeRetryAfterMaxDuration,
+ wantRequestDuration: 1 * time.Second,
+ },
+ {
+ name: "large max 429 with Retry-After date",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Date,
+ retryAfterDuration: 1 * time.Second,
+ retryAfterMaxDuration: &largeRetryAfterMaxDuration,
+ wantRequestDuration: 1 * time.Second,
+ },
+ {
+ name: "large max 429 with invalid Retry-After",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Invalid,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: &largeRetryAfterMaxDuration,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ {
+ name: "large max 503 with Retry-After seconds",
+ statusCode: http.StatusServiceUnavailable,
+ retryAfterFormat: Seconds,
+ retryAfterDuration: 1 * time.Second,
+ retryAfterMaxDuration: &largeRetryAfterMaxDuration,
+ wantRequestDuration: 1 * time.Second,
+ },
+ {
+ name: "large max 500 without Retry-After",
+ statusCode: http.StatusInternalServerError,
+ retryAfterFormat: None,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: &largeRetryAfterMaxDuration,
+ wantRequestDuration: retryBackoffDuration,
+ },
+
+ // Small Max Tests (Less Than Retry-After Value)
+
+ {
+ name: "small max 429 without Retry-After",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: None,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: &smallRetryAfterMaxDuration,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ {
+ name: "small max 429 with Retry-After seconds",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Seconds,
+ retryAfterDuration: 4 * time.Second,
+ retryAfterMaxDuration: &smallRetryAfterMaxDuration,
+ wantRequestDuration: smallRetryAfterMaxDuration,
+ },
+ {
+ name: "small max 429 with Retry-After date",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Date,
+ retryAfterDuration: 2 * time.Second,
+ retryAfterMaxDuration: &smallRetryAfterMaxDuration,
+ wantRequestDuration: smallRetryAfterMaxDuration,
+ },
+ {
+ name: "small max 429 with invalid Retry-After",
+ statusCode: http.StatusTooManyRequests,
+ retryAfterFormat: Invalid,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: &smallRetryAfterMaxDuration,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ {
+ name: "small max 503 with Retry-After seconds",
+ statusCode: http.StatusServiceUnavailable,
+ retryAfterFormat: Seconds,
+ retryAfterDuration: 4 * time.Second,
+ retryAfterMaxDuration: &smallRetryAfterMaxDuration,
+ wantRequestDuration: smallRetryAfterMaxDuration,
+ },
+ {
+ name: "small max 500 without Retry-After",
+ statusCode: http.StatusInternalServerError,
+ retryAfterFormat: None,
+ retryAfterDuration: 0 * time.Second,
+ retryAfterMaxDuration: &smallRetryAfterMaxDuration,
+ wantRequestDuration: retryBackoffDuration,
+ },
+ }
+
+ // Loop Over The TestCases
+ for _, testCase := range testCases {
+
+ // Capture Range Variable For Parallel Execution
+ tc := testCase
+
+ // Execute The Individual TestCase
+ t.Run(tc.name, func(t *testing.T) {
+
+ // Run TestCases In Parallel
+ t.Parallel()
+
+ // Create A RetryAfter Validation Server To Validate Retry Durations
+ server := newRetryAfterValidationServer(t, tc.statusCode, tc.retryAfterFormat, tc.retryAfterDuration, tc.wantRequestDuration)
+
+ // Create RetryConfig With RetryAfterMax From TestCase
+ retryConfig := RetryConfig{
+ RetryMax: int(retryMax),
+ CheckRetry: SelectiveRetry,
+ Backoff: func(attemptNum int, resp *http.Response) time.Duration { return retryBackoffDuration },
+ RetryAfterMaxDuration: tc.retryAfterMaxDuration,
+ }
+
+ // Perform The Test - Generate And Send The Initial Request
+ t.Logf("Testing %d Response With RetryAfter (%d) %fs & Max %+v", tc.statusCode, tc.retryAfterFormat, tc.retryAfterDuration.Seconds(), tc.retryAfterMaxDuration)
+ sender := &HTTPMessageSender{Client: http.DefaultClient}
+ request, err := http.NewRequest("POST", server.URL, nil)
+ assert.Nil(t, err)
+ response, err := sender.SendWithRetries(request, &retryConfig)
+
+ // Verify Final Results (Actual Retry Timing Validated In Server)
+ assert.Nil(t, err)
+ assert.Equal(t, response.StatusCode, tc.statusCode)
+ assert.Equal(t, retryMax+1, atomic.LoadInt32(&server.requestCount))
+ })
+ }
+}
+
func TestRetriesOnNetworkErrors(t *testing.T) {
n := int32(10)
diff --git a/pkg/kncloudevents/retries.go b/pkg/kncloudevents/retries.go
index ee816efa16a..4100077c798 100644
--- a/pkg/kncloudevents/retries.go
+++ b/pkg/kncloudevents/retries.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/rickb777/date/period"
+
v1 "knative.dev/eventing/pkg/apis/duck/v1"
)
@@ -66,6 +67,12 @@ type RetryConfig struct {
// RequestTimeout represents the timeout of the single request
RequestTimeout time.Duration
+
+ // RetryAfterMaxDuration represents an optional override for the maximum
+ // value allowed for "Retry-After" headers in 429 / 503 responses. A nil
+ // value indicates no maximum override. A value of "0" indicates "Retry-After"
+ // headers are to be ignored.
+ RetryAfterMaxDuration *time.Duration
}
func NoRetries() RetryConfig {
@@ -112,6 +119,15 @@ func RetryConfigFromDeliverySpec(spec v1.DeliverySpec) (RetryConfig, error) {
retryConfig.RequestTimeout, _ = timeout.Duration()
}
+ if spec.RetryAfterMax != nil {
+ maxPeriod, err := period.Parse(*spec.RetryAfterMax)
+ if err != nil { // Should never happen based on DeliverySpec validation
+ return retryConfig, fmt.Errorf("failed to parse Spec.RetryAfterMax: %w", err)
+ }
+ maxDuration, _ := maxPeriod.Duration()
+ retryConfig.RetryAfterMaxDuration = &maxDuration
+ }
+
return retryConfig, nil
}
diff --git a/pkg/kncloudevents/retries_test.go b/pkg/kncloudevents/retries_test.go
index 6c2e493829c..fc706d3ac93 100644
--- a/pkg/kncloudevents/retries_test.go
+++ b/pkg/kncloudevents/retries_test.go
@@ -23,19 +23,41 @@ import (
"testing"
"time"
+ "github.com/rickb777/date/period"
"github.com/stretchr/testify/assert"
"k8s.io/utils/pointer"
- v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/ptr"
+
+ v1 "knative.dev/eventing/pkg/apis/duck/v1"
)
+// Test The NoRetries() Functionality
+func TestNoRetries(t *testing.T) {
+ retryConfig := NoRetries()
+ assert.NotNil(t, retryConfig)
+ assert.Equal(t, 0, retryConfig.RetryMax)
+ assert.NotNil(t, retryConfig.CheckRetry)
+ result, err := retryConfig.CheckRetry(context.TODO(), nil, nil)
+ assert.False(t, result)
+ assert.Nil(t, err)
+ assert.NotNil(t, retryConfig.Backoff)
+ assert.Equal(t, time.Duration(0), retryConfig.Backoff(1, nil))
+ assert.Equal(t, time.Duration(0), retryConfig.Backoff(100, nil))
+ assert.Nil(t, retryConfig.RetryAfterMaxDuration)
+}
+
// Test The RetryConfigFromDeliverySpec() Functionality
func TestRetryConfigFromDeliverySpec(t *testing.T) {
const retry = 5
+ validISO8601DurationString := "PT30S"
+ invalidISO8601DurationString := "FOO"
+
testcases := []struct {
name string
backoffPolicy v1.BackoffPolicyType
backoffDelay string
+ timeout *string
+ retryAfterMax *string
expectedBackoffDurations []time.Duration
wantErr bool
}{{
@@ -76,16 +98,55 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) {
backoffPolicy: v1.BackoffPolicyLinear,
backoffDelay: "FOO",
wantErr: true,
+ }, {
+ name: "Valid Timeout",
+ backoffPolicy: v1.BackoffPolicyExponential,
+ backoffDelay: "PT0.5S",
+ timeout: &validISO8601DurationString,
+ expectedBackoffDurations: []time.Duration{
+ 1 * time.Second,
+ 2 * time.Second,
+ 4 * time.Second,
+ 8 * time.Second,
+ 16 * time.Second,
+ },
+ }, {
+ name: "Invalid Timeout",
+ backoffPolicy: v1.BackoffPolicyExponential,
+ backoffDelay: "PT0.5S",
+ timeout: &invalidISO8601DurationString,
+ wantErr: true,
+ }, {
+ name: "Valid RetryAfterMax",
+ backoffPolicy: v1.BackoffPolicyExponential,
+ backoffDelay: "PT0.5S",
+ retryAfterMax: &validISO8601DurationString,
+ expectedBackoffDurations: []time.Duration{
+ 1 * time.Second,
+ 2 * time.Second,
+ 4 * time.Second,
+ 8 * time.Second,
+ 16 * time.Second,
+ },
+ }, {
+ name: "Invalid RetryAfterMax",
+ backoffPolicy: v1.BackoffPolicyExponential,
+ backoffDelay: "PT0.5S",
+ retryAfterMax: &invalidISO8601DurationString,
+ wantErr: true,
}}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
+
// Create The DeliverySpec To Test
deliverySpec := v1.DeliverySpec{
DeadLetterSink: nil,
Retry: ptr.Int32(retry),
BackoffPolicy: &tc.backoffPolicy,
BackoffDelay: &tc.backoffDelay,
+ Timeout: tc.timeout,
+ RetryAfterMax: tc.retryAfterMax,
}
// Create the RetryConfig from the deliverySpec
@@ -95,6 +156,20 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) {
// If successful then validate the retryConfig (Max & Backoff calculations).
if err == nil {
assert.Equal(t, retry, retryConfig.RetryMax)
+ if tc.timeout != nil && *tc.timeout != "" {
+ expectedTimeoutPeriod, _ := period.Parse(*tc.timeout)
+ expectedTimeoutDuration, _ := expectedTimeoutPeriod.Duration()
+ assert.Equal(t, expectedTimeoutDuration, retryConfig.RequestTimeout)
+ }
+
+ if tc.retryAfterMax != nil && *tc.retryAfterMax != "" {
+ expectedMaxPeriod, _ := period.Parse(*tc.retryAfterMax)
+ expectedMaxDuration, _ := expectedMaxPeriod.Duration()
+ assert.Equal(t, expectedMaxDuration, *retryConfig.RetryAfterMaxDuration)
+ } else {
+ assert.Nil(t, retryConfig.RetryAfterMaxDuration)
+ }
+
for i := 1; i < retry; i++ {
expectedBackoffDuration := tc.expectedBackoffDurations[i-1]
actualBackoffDuration := retryConfig.Backoff(i, nil)
diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go
index 6bb503746b7..e4ed3705dd0 100644
--- a/pkg/reconciler/subscription/subscription.go
+++ b/pkg/reconciler/subscription/subscription.go
@@ -514,7 +514,11 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de
},
}
}
- if channel.Spec.Delivery.BackoffDelay != nil || channel.Spec.Delivery.Retry != nil || channel.Spec.Delivery.BackoffPolicy != nil || channel.Spec.Delivery.Timeout != nil {
+ if channel.Spec.Delivery.BackoffDelay != nil ||
+ channel.Spec.Delivery.Retry != nil ||
+ channel.Spec.Delivery.BackoffPolicy != nil ||
+ channel.Spec.Delivery.Timeout != nil ||
+ channel.Spec.Delivery.RetryAfterMax != nil {
if delivery == nil {
delivery = &eventingduckv1.DeliverySpec{}
}
@@ -522,6 +526,7 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de
delivery.Retry = channel.Spec.Delivery.Retry
delivery.BackoffDelay = channel.Spec.Delivery.BackoffDelay
delivery.Timeout = channel.Spec.Delivery.Timeout
+ delivery.RetryAfterMax = channel.Spec.Delivery.RetryAfterMax
}
return
}
@@ -535,7 +540,12 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de
},
}
}
- if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil || sub.Spec.Delivery.Timeout != nil) {
+ if sub.Spec.Delivery != nil &&
+ (sub.Spec.Delivery.BackoffDelay != nil ||
+ sub.Spec.Delivery.Retry != nil ||
+ sub.Spec.Delivery.BackoffPolicy != nil ||
+ sub.Spec.Delivery.Timeout != nil ||
+ sub.Spec.Delivery.RetryAfterMax != nil) {
if delivery == nil {
delivery = &eventingduckv1.DeliverySpec{}
}
@@ -543,6 +553,7 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de
delivery.Retry = sub.Spec.Delivery.Retry
delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay
delivery.Timeout = sub.Spec.Delivery.Timeout
+ delivery.RetryAfterMax = sub.Spec.Delivery.RetryAfterMax
}
return
}
diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go
index d4578c6b2f4..af5bbc68838 100644
--- a/pkg/reconciler/subscription/subscription_test.go
+++ b/pkg/reconciler/subscription/subscription_test.go
@@ -22,40 +22,37 @@ import (
"fmt"
"testing"
- apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
- "k8s.io/utils/pointer"
- "knative.dev/eventing/pkg/apis/feature"
- "knative.dev/pkg/injection/clients/dynamicclient"
- "knative.dev/pkg/kref"
- "knative.dev/pkg/network"
- "knative.dev/pkg/tracker"
-
- eventingclient "knative.dev/eventing/pkg/client/injection/client"
- "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
-
corev1 "k8s.io/api/core/v1"
+ apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgotesting "k8s.io/client-go/testing"
+ "k8s.io/utils/pointer"
+
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
+ "knative.dev/pkg/injection/clients/dynamicclient"
+ "knative.dev/pkg/kref"
logtesting "knative.dev/pkg/logging/testing"
+ "knative.dev/pkg/network"
+ . "knative.dev/pkg/reconciler/testing"
"knative.dev/pkg/resolver"
+ "knative.dev/pkg/tracker"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
+ "knative.dev/eventing/pkg/apis/feature"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
+ eventingclient "knative.dev/eventing/pkg/client/injection/client"
+ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
+ _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
+ _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
"knative.dev/eventing/pkg/duck"
eventingtesting "knative.dev/eventing/pkg/reconciler/testing"
-
- . "knative.dev/pkg/reconciler/testing"
-
- _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
- _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
)
@@ -1492,6 +1489,66 @@ func TestAllCases(t *testing.T) {
patchFinalizers(testNS, "a-"+subscriptionName),
},
},
+ {
+ Name: "v1 imc - delivery defaulting - optional features",
+ Ctx: feature.ToContext(context.TODO(), feature.Flags{
+ feature.DeliveryTimeout: feature.Enabled,
+ feature.DeliveryRetryAfter: feature.Enabled,
+ }),
+ Objects: []runtime.Object{
+ NewSubscription("a-"+subscriptionName, testNS,
+ WithSubscriptionUID("a-"+subscriptionUID),
+ WithSubscriptionChannel(imcV1GVK, channelName),
+ WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
+ ),
+ NewUnstructured(subscriberGVK, dlcName, testNS,
+ WithUnstructuredAddressable(dlcDNS),
+ ),
+ NewInMemoryChannel(channelName, testNS,
+ WithInitInMemoryChannelConditions,
+ WithInMemoryChannelSubscribers(nil),
+ WithInMemoryChannelAddress(channelDNS),
+ WithInMemoryChannelReadySubscriber("a-"+subscriptionUID),
+ WithInMemoryChannelDelivery(&eventingduck.DeliverySpec{
+ Timeout: pointer.StringPtr("PT1S"),
+ RetryAfterMax: pointer.StringPtr("PT2S"),
+ }),
+ WithInMemoryChannelStatusDLSURI(dlcURI),
+ ),
+ NewService(serviceName, testNS),
+ },
+ Key: testNS + "/" + "a-" + subscriptionName,
+ WantErr: false,
+ WantEvents: []string{
+ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName),
+ Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName),
+ },
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: NewSubscription("a-"+subscriptionName, testNS,
+ WithSubscriptionUID("a-"+subscriptionUID),
+ WithSubscriptionChannel(imcV1GVK, channelName),
+ WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
+ // The first reconciliation will initialize the status conditions.
+ WithInitSubscriptionConditions,
+ MarkReferencesResolved,
+ MarkAddedToChannel,
+ WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI),
+ ),
+ }},
+ WantPatches: []clientgotesting.PatchActionImpl{
+ patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{
+ {
+ UID: "a-" + subscriptionUID,
+ SubscriberURI: serviceURI,
+ Delivery: &eventingduck.DeliverySpec{
+ Timeout: pointer.StringPtr("PT1S"),
+ RetryAfterMax: pointer.StringPtr("PT2S"),
+ },
+ },
+ }),
+ patchFinalizers(testNS, "a-"+subscriptionName),
+ },
+ },
{
Name: "v1 imc - no dls on imc nor subscription",
Objects: []runtime.Object{
@@ -1689,6 +1746,76 @@ func TestAllCases(t *testing.T) {
patchFinalizers(testNS, "a-"+subscriptionName),
},
},
+ {
+ Name: "v1 imc - don't default delivery - optional features",
+ Ctx: feature.ToContext(context.TODO(), feature.Flags{
+ feature.DeliveryTimeout: feature.Enabled,
+ feature.DeliveryRetryAfter: feature.Enabled,
+ }),
+ Objects: []runtime.Object{
+ NewSubscription("a-"+subscriptionName, testNS,
+ WithSubscriptionUID("a-"+subscriptionUID),
+ WithSubscriptionChannel(imcV1GVK, channelName),
+ WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
+ WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{
+ Timeout: pointer.StringPtr("PT1S"),
+ RetryAfterMax: pointer.StringPtr("PT2S"),
+ }),
+ ),
+ NewUnstructured(subscriberGVK, dlsName, testNS,
+ WithUnstructuredAddressable(dlsDNS),
+ ),
+ NewUnstructured(subscriberGVK, dlc2Name, testNS,
+ WithUnstructuredAddressable(dlc2DNS),
+ ),
+ NewInMemoryChannel(channelName, testNS,
+ WithInitInMemoryChannelConditions,
+ WithInMemoryChannelSubscribers(nil),
+ WithInMemoryChannelAddress(channelDNS),
+ WithInMemoryChannelReadySubscriber("a-"+subscriptionUID),
+ WithInMemoryChannelDelivery(&eventingduck.DeliverySpec{
+ Timeout: pointer.StringPtr("PT10S"),
+ RetryAfterMax: pointer.StringPtr("PT20S"),
+ }),
+ ),
+ NewService(serviceName, testNS),
+ },
+ Key: testNS + "/" + "a-" + subscriptionName,
+ WantErr: false,
+ WantEvents: []string{
+ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName),
+ Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName),
+ },
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: NewSubscription("a-"+subscriptionName, testNS,
+ WithSubscriptionUID("a-"+subscriptionUID),
+ WithSubscriptionChannel(imcV1GVK, channelName),
+ WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS),
+ // The first reconciliation will initialize the status conditions.
+ WithInitSubscriptionConditions,
+ MarkReferencesResolved,
+ MarkAddedToChannel,
+ WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI),
+ WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{
+ Timeout: pointer.StringPtr("PT1S"),
+ RetryAfterMax: pointer.StringPtr("PT2S"),
+ }),
+ ),
+ }},
+ WantPatches: []clientgotesting.PatchActionImpl{
+ patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{
+ {
+ UID: "a-" + subscriptionUID,
+ SubscriberURI: serviceURI,
+ Delivery: &eventingduck.DeliverySpec{
+ Timeout: pointer.StringPtr("PT1S"),
+ RetryAfterMax: pointer.StringPtr("PT2S"),
+ },
+ },
+ }),
+ patchFinalizers(testNS, "a-"+subscriptionName),
+ },
+ },
{
Name: "v1 imc+deleted - channel patch succeeded",
Objects: []runtime.Object{
diff --git a/test/experimental/config/features.yaml b/test/experimental/config/features.yaml
index 76b436b94c6..45fb3ecf56f 100644
--- a/test/experimental/config/features.yaml
+++ b/test/experimental/config/features.yaml
@@ -23,5 +23,6 @@ metadata:
knative.dev/config-category: eventing
data:
kreference-group: "enabled"
+ delivery-retryafter: "enabled"
delivery-timeout: "enabled"
strict-subscriber: "disabled"
diff --git a/test/experimental/features/retry_after/channel.go b/test/experimental/features/retry_after/channel.go
new file mode 100644
index 00000000000..93385283166
--- /dev/null
+++ b/test/experimental/features/retry_after/channel.go
@@ -0,0 +1,161 @@
+/*
+Copyright 2021 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package retry_after
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strconv"
+ "testing"
+ "time"
+
+ cetest "github.com/cloudevents/sdk-go/v2/test"
+ "github.com/stretchr/testify/require"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/utils/pointer"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+ "knative.dev/reconciler-test/pkg/environment"
+ "knative.dev/reconciler-test/pkg/eventshub"
+ "knative.dev/reconciler-test/pkg/eventshub/assert"
+ "knative.dev/reconciler-test/pkg/feature"
+ "knative.dev/reconciler-test/pkg/state"
+
+ eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
+ messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
+ eventingclient "knative.dev/eventing/pkg/client/injection/client"
+ "knative.dev/eventing/test/rekt/resources/channel"
+ "knative.dev/eventing/test/rekt/resources/subscription"
+)
+
+const (
+ ChannelNameKey = "ChannelNameKey"
+ SubscriptionNameKey = "SubscriptionNameKey"
+ SenderNameKey = "SenderNameKey"
+ ReceiverNameKey = "ReceiverNameKey"
+ RetryAttemptsKey = "RetryAttemptsKey"
+ RetryAfterSecondsKey = "RetryAfterSecondsKey"
+)
+
+// ConfigureDataPlane creates a Feature which sets up the specified Channel,
+// Subscription and EventsHub Receiver so that it is ready to receive CloudEvents.
+func ConfigureDataPlane(ctx context.Context, t *testing.T) *feature.Feature {
+
+ // Get Component Names From Context
+ var retryAttempts, retryAfterSeconds int
+ channelName := state.GetStringOrFail(ctx, t, ChannelNameKey)
+ subscriptionName := state.GetStringOrFail(ctx, t, SubscriptionNameKey)
+ receiverName := state.GetStringOrFail(ctx, t, ReceiverNameKey)
+ state.GetOrFail(ctx, t, RetryAttemptsKey, &retryAttempts)
+ state.GetOrFail(ctx, t, RetryAfterSecondsKey, &retryAfterSeconds)
+
+ // Create A Feature To Configure The DataPlane (Channel, Subscription, Receiver)
+ f := feature.NewFeatureNamed("Configure Data-Plane")
+ f.Setup("Install An EventsHub Receiver", eventshub.Install(receiverName,
+ eventshub.StartReceiver,
+ eventshub.DropFirstN(uint(retryAttempts)),
+ eventshub.DropEventsResponseCode(http.StatusTooManyRequests),
+ eventshub.DropEventsResponseHeaders(map[string]string{"Retry-After": strconv.Itoa(retryAfterSeconds)})))
+ f.Setup("Install A Channel", channel.Install(channelName))
+ f.Setup("Install A Subscription", installRetryAfterSubscription(channelName, subscriptionName, receiverName, int32(retryAttempts)))
+ f.Assert("Channel Is Ready", channel.IsReady(channelName))
+ f.Assert("Subscription Is Ready", subscription.IsReady(subscriptionName))
+
+ // Return The ConfigureDataPlane Feature
+ return f
+}
+
+// SendEvent creates a Feature which sends a CloudEvent to the specified
+// Channel and verifies the timing of its receipt in the corresponding
+// EventsHub Receiver. It is assumed that the backing Channel / Subscription
+// / Receiver are in place and ready to receive the event.
+func SendEvent(ctx context.Context, t *testing.T) *feature.Feature {
+
+ // Get Component Names From Context
+ var retryAttempts, retryAfterSeconds int
+ channelName := state.GetStringOrFail(ctx, t, ChannelNameKey)
+ senderName := state.GetStringOrFail(ctx, t, SenderNameKey)
+ receiverName := state.GetStringOrFail(ctx, t, ReceiverNameKey)
+ state.GetOrFail(ctx, t, RetryAttemptsKey, &retryAttempts)
+ state.GetOrFail(ctx, t, RetryAfterSecondsKey, &retryAfterSeconds)
+
+ // Create The Base CloudEvent To Send (ID will be set by the EventsHub Sender)
+ event := cetest.FullEvent()
+
+ // Mark The Current Time Plus The Expected Retry-After Duration
+ earliestEventTime := time.Now().Add(time.Duration(retryAfterSeconds) * time.Second)
+
+ // Create A New Feature To Send An Event And Verify Retry-After Duration
+ f := feature.NewFeatureNamed("Send Events")
+ f.Setup("Install An EventsHub Sender", eventshub.Install(senderName, eventshub.StartSenderToResource(channel.GVR(), channelName), eventshub.InputEvent(event)))
+ f.Assert("Events Received", assert.OnStore(receiverName).MatchEvent(cetest.HasId(event.ID())).Exact(retryAttempts+1)) // One Successful Response
+ f.Assert("Event Timing Verified", assert.OnStore(receiverName).Match(receivedAfterEventInfoMatcher(earliestEventTime)).Exact(retryAttempts))
+
+ // Return The SendEvents Feature
+ return f
+}
+
+// installRetryAfterSubscription performs the installation of a Subscription
+// for the specified Channel / Sink in the test namespace. The Subscription
+// is configured to specify the experimental RetryAfter behavior which the
+// standard Subscription resource/yaml does not include.
+func installRetryAfterSubscription(channelName, subscriptionName, sinkName string, retryAttempts int32) func(ctx context.Context, t feature.T) {
+ return func(ctx context.Context, t feature.T) {
+ namespace := environment.FromContext(ctx).Namespace()
+ channelAPIVersion, imcKind := channel.GVK().ToAPIVersionAndKind()
+ backoffPolicy := eventingduckv1.BackoffPolicyLinear
+ retryAfterSubscription := &messagingv1.Subscription{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: subscriptionName,
+ Namespace: namespace,
+ },
+ Spec: messagingv1.SubscriptionSpec{
+ Channel: duckv1.KReference{
+ APIVersion: channelAPIVersion,
+ Kind: imcKind,
+ Name: channelName,
+ },
+ Subscriber: &duckv1.Destination{
+ Ref: &duckv1.KReference{
+ APIVersion: "v1",
+ Kind: "Service",
+ Name: sinkName,
+ },
+ },
+ Delivery: &eventingduckv1.DeliverySpec{
+ Retry: pointer.Int32Ptr(retryAttempts),
+ BackoffPolicy: &backoffPolicy,
+ BackoffDelay: pointer.StringPtr("PT0.5S"),
+ RetryAfterMax: pointer.StringPtr("PT30S"),
+ },
+ },
+ }
+ _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, retryAfterSubscription, metav1.CreateOptions{})
+ require.NoError(t, err)
+ }
+}
+
+// receivedAfterEventInfoMatcher returns an EventInfoMatcher that validates
+// the receipt time of the EventInfo is AFTER the specified minimum time.
+func receivedAfterEventInfoMatcher(earliestTime time.Time) eventshub.EventInfoMatcher {
+ return func(eventInfo eventshub.EventInfo) error {
+ if eventInfo.Time.Before(earliestTime) {
+ return fmt.Errorf("response received prior to Retry-After header duration")
+ }
+ return nil
+ }
+}
diff --git a/test/experimental/retry_after_test.go b/test/experimental/retry_after_test.go
new file mode 100644
index 00000000000..4620073b840
--- /dev/null
+++ b/test/experimental/retry_after_test.go
@@ -0,0 +1,64 @@
+//go:build e2e
+// +build e2e
+
+/*
+Copyright 2021 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package experimental
+
+import (
+ "testing"
+
+ "knative.dev/pkg/system"
+ "knative.dev/reconciler-test/pkg/environment"
+ "knative.dev/reconciler-test/pkg/feature"
+ "knative.dev/reconciler-test/pkg/k8s"
+ "knative.dev/reconciler-test/pkg/knative"
+ "knative.dev/reconciler-test/pkg/state"
+
+ "knative.dev/eventing/test/experimental/features/retry_after"
+)
+
+func TestRetryAfter(t *testing.T) {
+
+ // Run Test In Parallel With Others
+ t.Parallel()
+
+ // Create The Test Context / Environment
+ ctx, env := global.Environment(
+ knative.WithKnativeNamespace(system.Namespace()),
+ knative.WithLoggingConfig,
+ knative.WithTracingConfig,
+ k8s.WithEventListener,
+ environment.Managed(t),
+ )
+
+ // Generate A Unique K8S Safe Prefix For The Test Components
+ retryAfterPrefix := feature.MakeRandomK8sName("retryafter")
+
+ // Generate Unique Component Names And Add To Context Store
+ ctx = state.ContextWith(ctx, &state.KVStore{})
+ state.SetOrFail(ctx, t, retry_after.ChannelNameKey, retryAfterPrefix+"-channel")
+ state.SetOrFail(ctx, t, retry_after.SubscriptionNameKey, retryAfterPrefix+"-subscription")
+ state.SetOrFail(ctx, t, retry_after.SenderNameKey, retryAfterPrefix+"-sender")
+ state.SetOrFail(ctx, t, retry_after.ReceiverNameKey, retryAfterPrefix+"-receiver")
+ state.SetOrFail(ctx, t, retry_after.RetryAttemptsKey, 3)
+ state.SetOrFail(ctx, t, retry_after.RetryAfterSecondsKey, 10)
+
+ // Configure DataPlane & Send An Event
+ env.Test(ctx, t, retry_after.ConfigureDataPlane(ctx, t))
+ env.Test(ctx, t, retry_after.SendEvent(ctx, t))
+}