Skip to content

Commit

Permalink
Add Retry Config to EgressConfig during broker reconciliation (#263)
Browse files Browse the repository at this point in the history
* Add Retry Config to EgressConfig during broker reconciliation

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Add liveness and readiness probes

Probes use the embedded metrics server because we don't have a
server on the dispatcher side.

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Oct 7, 2020
1 parent 85e89e0 commit c0c8682
Show file tree
Hide file tree
Showing 12 changed files with 462 additions and 15 deletions.
2 changes: 2 additions & 0 deletions control-plane/config/500-controller.yaml
Expand Up @@ -86,6 +86,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: BROKER_DEFAULT_BACKOFF_DELAY
value: "PT1S"
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
Expand Down
13 changes: 13 additions & 0 deletions control-plane/pkg/config/env_config.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/kelseyhightower/envconfig"
"github.com/rickb777/date/period"
)

type Env struct {
Expand All @@ -29,6 +30,7 @@ type Env struct {
IngressName string `required:"true" split_words:"true"`
SystemNamespace string `required:"true" split_words:"true"`
DataPlaneConfigFormat string `required:"true" split_words:"true"`
DefaultBackoffDelay string `required:"false" split_words:"true"`
}

func GetEnvConfig(prefix string) (*Env, error) {
Expand All @@ -38,9 +40,20 @@ func GetEnvConfig(prefix string) (*Env, error) {
return nil, fmt.Errorf("failed to process env config: %w", err)
}

if env.DefaultBackoffDelay != "" {
if isNotValidBackoffDelay(env.DefaultBackoffDelay) {
return nil, fmt.Errorf("invalid backoff delay: %s", env.DefaultBackoffDelay)
}
}

return env, nil
}

func (c *Env) DataPlaneConfigMapAsString() string {
return fmt.Sprintf("%s/%s", c.DataPlaneConfigMapNamespace, c.DataPlaneConfigMapName)
}

func isNotValidBackoffDelay(delay string) bool {
_, err := period.Parse(delay)
return err != nil
}
16 changes: 16 additions & 0 deletions control-plane/pkg/config/env_config_test.go
Expand Up @@ -78,6 +78,22 @@ func TestGetEnvConfig(t *testing.T) {
},
wantErr: true,
},
{
name: "invalid backoff delay",
args: args{
prefix: "BROKER",
},
setEnv: func() {
_ = os.Setenv("BROKER_DATA_PLANE_CONFIG_MAP_NAMESPACE", "knative-eventing")
_ = os.Setenv("BROKER_DATA_PLANE_CONFIG_MAP_NAME", "kafka-brokers-triggers")
_ = os.Setenv("BROKER_GENERAL_CONFIG_MAP_NAME", "kafka-config")
_ = os.Setenv("BROKER_INGRESS_NAME", "kafka-broker-ingress")
_ = os.Setenv("BROKER_SYSTEM_NAMESPACE", "knative-eventing")
_ = os.Setenv("BROKER_DATA_PLANE_CONFIG_FORMAT", "json")
_ = os.Setenv("BROKER_DEFAULT_BACKOFF_DELAY", "PTT")
},
wantErr: true,
},
}

for _, tt := range tests {
Expand Down
32 changes: 32 additions & 0 deletions control-plane/pkg/core/config/utils.go
Expand Up @@ -19,6 +19,8 @@ package config
import (
"fmt"

duck "knative.dev/eventing/pkg/apis/duck/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
Expand All @@ -39,3 +41,33 @@ func ContentModeFromString(mode string) contract.ContentMode {
))
}
}

// BackoffPolicyFromString returns the BackoffPolicy from the given string.
//
// Default value is contract.BackoffPolicy_Exponential.
func BackoffPolicyFromString(backoffPolicy *duck.BackoffPolicyType) contract.BackoffPolicy {
if backoffPolicy == nil {
return contract.BackoffPolicy_Exponential
}

bp := *backoffPolicy
switch bp {
case duck.BackoffPolicyLinear:
return contract.BackoffPolicy_Linear
case duck.BackoffPolicyExponential: // The linter complains for missing case in switch
return contract.BackoffPolicy_Exponential
default:
return contract.BackoffPolicy_Exponential
}
}

// BackoffDelayFromString returns the BackoffDelay from the given string.
//
// Default value is the specified defaultDelay.
func BackoffDelayFromString(backoffDelay *string, defaultDelay string) string {
if backoffDelay == nil {
return defaultDelay
}

return *backoffDelay
}
71 changes: 71 additions & 0 deletions control-plane/pkg/core/config/utils_test.go
Expand Up @@ -19,6 +19,9 @@ package config
import (
"testing"

"k8s.io/utils/pointer"
duck "knative.dev/eventing/pkg/apis/duck/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
Expand Down Expand Up @@ -56,3 +59,71 @@ func TestContentModeFromString(t *testing.T) {
})
}
}

func TestBackoffPolicyFromString(t *testing.T) {
linerar := duck.BackoffPolicyLinear
exponential := duck.BackoffPolicyExponential
wrong := duck.BackoffPolicyType("default")
tests := []struct {
name string
backoffPolicy *duck.BackoffPolicyType
want contract.BackoffPolicy
}{
{
name: "nil",
backoffPolicy: nil,
want: contract.BackoffPolicy_Exponential,
},
{
name: "exponential",
backoffPolicy: &exponential,
want: contract.BackoffPolicy_Exponential,
},
{
name: "linear",
backoffPolicy: &linerar,
want: contract.BackoffPolicy_Linear,
},
{
name: "default",
backoffPolicy: &wrong,
want: contract.BackoffPolicy_Exponential,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := BackoffPolicyFromString(tt.backoffPolicy); got != tt.want {
t.Errorf("BackoffPolicyFromString() = %v, want %v", got, tt.want)
}
})
}
}

func TestBackoffDelayFromString(t *testing.T) {

tests := []struct {
name string
backoffDelay *string
defaultDelay string
want string
}{
{
name: "happy case",
backoffDelay: pointer.StringPtr("PT2S"),
defaultDelay: "PT1S",
want: "PT2S",
},
{
name: "default",
defaultDelay: "PT1S",
want: "PT1S",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := BackoffDelayFromString(tt.backoffDelay, tt.defaultDelay); got != tt.want {
t.Errorf("BackoffDelayFromString() = %v, want %v", got, tt.want)
}
})
}
}
30 changes: 25 additions & 5 deletions control-plane/pkg/reconciler/broker/broker.go
Expand Up @@ -317,17 +317,37 @@ func (r *Reconciler) getBrokerResource(ctx context.Context, topic string, broker
BootstrapServers: config.GetBootstrapServers(),
}

if broker.Spec.Delivery != nil && broker.Spec.Delivery.DeadLetterSink != nil {
deadLetterSinkURL, err := r.Resolver.URIFromDestinationV1(ctx, *broker.Spec.Delivery.DeadLetterSink, broker)
if err != nil {
return nil, fmt.Errorf("failed to resolve broker.Spec.Deliver.DeadLetterSink: %w", err)
delivery := broker.Spec.Delivery
if delivery != nil {

if delivery.DeadLetterSink != nil {

deadLetterSinkURL, err := r.Resolver.URIFromDestinationV1(ctx, *delivery.DeadLetterSink, broker)
if err != nil {
return nil, fmt.Errorf("failed to resolve broker.Spec.Deliver.DeadLetterSink: %w", err)
}

ensureEgressConfig(res)
res.EgressConfig.DeadLetter = deadLetterSinkURL.String()
}

if delivery.Retry != nil {
ensureEgressConfig(res)
res.EgressConfig.Retry = uint32(*delivery.Retry)
res.EgressConfig.BackoffDelay = coreconfig.BackoffDelayFromString(delivery.BackoffDelay, "PT1S")
res.EgressConfig.BackoffPolicy = coreconfig.BackoffPolicyFromString(delivery.BackoffPolicy)
}
res.EgressConfig = &contract.EgressConfig{DeadLetter: deadLetterSinkURL.String()}
}

return res, nil
}

func ensureEgressConfig(res *contract.Resource) {
if res.EgressConfig == nil {
res.EgressConfig = &contract.EgressConfig{}
}
}

func (r *Reconciler) ConfigMapUpdated(ctx context.Context) func(configMap *corev1.ConfigMap) {

logger := logging.FromContext(ctx).Desugar()
Expand Down

0 comments on commit c0c8682

Please sign in to comment.